From ca6ee91c5e1167b1929eedbb96c76dfa24e7d026 Mon Sep 17 00:00:00 2001 From: Adam Malczewski Date: Wed, 27 May 2026 18:35:18 +0900 Subject: refactor: ChatMessage.chunks[] union — interleaved thinking, tool batching, error/system chunks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bun.lock | 1 + packages/api/src/agent-manager.ts | 221 ++++++++--- packages/api/tests/agent-manager.test.ts | 15 +- packages/api/tests/routes.test.ts | 15 +- packages/core/src/agent/agent.ts | 298 +++++++++----- packages/core/src/chunks/append.ts | 265 +++++++++++++ packages/core/src/db/index.ts | 1 - packages/core/src/db/messages.ts | 68 +++- packages/core/src/index.ts | 7 + packages/core/src/tools/list-files.ts | 5 +- packages/core/src/tools/write-file.ts | 5 +- packages/core/src/types/index.ts | 57 ++- packages/core/tests/agent/agent.test.ts | 6 +- packages/core/tests/chunks/append.test.ts | 437 +++++++++++++++++++++ packages/core/tests/tools/write-file.test.ts | 2 +- packages/frontend/package.json | 1 + .../frontend/src/lib/components/ChatMessage.svelte | 76 ++-- .../src/lib/components/ToolCallDisplay.svelte | 4 +- packages/frontend/src/lib/tabs.svelte.ts | 364 +++++++++-------- packages/frontend/src/lib/types.ts | 61 ++- packages/frontend/tests/chat-store.test.ts | 408 +++++++++---------- plan-chunk-refactor.md | 245 ++++++++++++ 22 files changed, 1967 insertions(+), 595 deletions(-) create mode 100644 packages/core/src/chunks/append.ts create mode 100644 packages/core/tests/chunks/append.test.ts create mode 100644 plan-chunk-refactor.md diff --git a/bun.lock b/bun.lock index 92f730c..9518892 100644 --- a/bun.lock +++ b/bun.lock @@ -42,6 +42,7 @@ "name": "@dispatch/frontend", "version": "0.0.1", "dependencies": { + "@dispatch/core": "workspace:*", "dompurify": "^3.4.5", "highlight.js": "^11.11.1", "marked": "^18.0.4", diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts index 487b09f..0f06683 100644 --- a/packages/api/src/agent-manager.ts +++ b/packages/api/src/agent-manager.ts @@ -3,9 +3,12 @@ import { type AgentEvent, type AgentSkillMapping, type AgentStatus, + appendEventToChunks, appendMessage, + applySystemEvent, BackgroundShellStore, BackgroundTranscriptStore, + type Chunk, type ClaudeAccount, clearSpillForTab, configToRuleset, @@ -23,6 +26,7 @@ import { createYoutubeTranscribeTool, type DispatchConfig, getClaudeAccountsFromDB, + getMessagesForTab, getSetting, loadConfig, loadSkills, @@ -32,7 +36,9 @@ import { refreshAccountCredentialsAsync, resolveApiKey, type SkillDefinition, + type SystemChunkKind, TaskList, + updateMessage, validateConfig, } from "@dispatch/core"; import type { PermissionManager } from "./permission-manager.js"; @@ -160,6 +166,14 @@ interface TabAgent { shellStore: BackgroundShellStore; /** Store for transcript requests backgrounded due to user interrupt. */ transcriptStore: BackgroundTranscriptStore; + /** + * In-flight assistant message chunks for the active turn. `null` when + * no turn is running. Out-of-band system events (config-reload, + * cancel, etc.) target this list when present. + */ + currentChunks: Chunk[] | null; + /** DB id of the in-flight assistant message (if persisted yet). */ + currentAssistantId: string | null; } export class AgentManager { @@ -222,9 +236,10 @@ export class AgentManager { for (const tabAgent of this.tabAgents.values()) { tabAgent.agent = null; } - // Emit config-reload to all tabs + // Emit config-reload to all tabs (and persist as a system chunk) for (const tabId of this.tabAgents.keys()) { this.emit({ type: "config-reload" }, tabId); + this.routeSystemEventToTab(tabId, "config-reload", "Configuration reloaded"); } }); @@ -234,9 +249,10 @@ export class AgentManager { for (const tabAgent of this.tabAgents.values()) { tabAgent.agent = null; } - // Emit config-reload to all tabs + // Emit config-reload to all tabs (and persist as a system chunk) for (const tabId of this.tabAgents.keys()) { this.emit({ type: "config-reload" }, tabId); + this.routeSystemEventToTab(tabId, "config-reload", "Skills reloaded"); } }); } @@ -297,6 +313,8 @@ export class AgentManager { queueListeners: [], shellStore: new BackgroundShellStore(), transcriptStore: new BackgroundTranscriptStore(), + currentChunks: null, + currentAssistantId: null, }; this.tabAgents.set(tabId, tabAgent); } @@ -671,9 +689,80 @@ export class AgentManager { } } + /** + * Persist a system chunk to a tab's message history. + * + * If an assistant turn is in flight (`currentChunks` is non-null), the + * chunk is appended to the in-flight assistant message's chunk list — + * the final `appendMessage` / `updateMessage` call at end-of-turn picks + * it up automatically. + * + * Otherwise we load the tab's persisted messages, run them through + * `applySystemEvent` (which either appends to an existing trailing + * `role: "system"` message or creates a new one), then persist the + * delta via `appendMessage` / `updateMessage`. + */ + private routeSystemEventToTab(tabId: string, kind: SystemChunkKind, text: string): void { + const tabAgent = this.tabAgents.get(tabId); + + // Turn in flight → append directly to the in-flight chunk list. + // The chunk lands on the assistant message when it's persisted at + // turn-end (or the assistant message is updated mid-turn elsewhere). + if (tabAgent?.currentChunks) { + tabAgent.currentChunks.push({ type: "system", kind, text }); + if (tabAgent.currentAssistantId) { + try { + updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks)); + } catch { + // Best-effort — the final persistence in processMessage will + // flush the same chunks again. + } + } + return; + } + + // No turn in flight → route via applySystemEvent against the + // persisted message list. Either appends to a trailing system + // message or creates a fresh one. + try { + const rows = getMessagesForTab(tabId); + const messages = rows.map((r) => ({ id: r.id, role: r.role, chunks: r.chunks })); + const before = messages[messages.length - 1]; + const { messageId } = applySystemEvent(messages, { kind, text }); + const target = messages.find((m) => m.id === messageId); + if (!target) return; + if (before && before.id === messageId) { + // Appended to existing trailing system message. + updateMessage(messageId, JSON.stringify(target.chunks)); + } else { + // Newly created system message. + appendMessage(tabId, messageId, "system", JSON.stringify(target.chunks)); + } + } catch { + // DB not available (e.g. tab not yet created) — drop silently. + } + } + stopTab(tabId: string): void { const tabAgent = this.tabAgents.get(tabId); if (tabAgent) { + // If a turn is in flight, drop a `cancelled` system chunk into + // the in-flight assistant message so the user sees an explicit + // "Generation cancelled by user" marker at the cancellation point. + if (tabAgent.currentChunks) { + tabAgent.currentChunks.push({ + type: "system", + kind: "cancelled", + text: "Generation cancelled by user", + }); + if (tabAgent.currentAssistantId) { + try { + updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks)); + } catch { + // best-effort + } + } + } tabAgent.abortController?.abort(); tabAgent.status = "idle"; tabAgent.agent = null; @@ -891,15 +980,28 @@ export class AgentManager { currentKeyId = entry.key_id || undefined; currentModelId = entry.model_id || undefined; allOutput = ""; - let assistantText = ""; - let assistantThinking = ""; - const assistantToolCalls: Array<{ - id: string; - name: string; - arguments: Record; - result?: string; - isError?: boolean; - }> = []; + + // Single ordered chunk list for the assistant turn — replaces the + // previous (text + toolCalls + thinking) tri-accumulator pattern. + // Persisted progressively (insert on first chunk, update thereafter) + // so out-of-band routes (config-reload, cancel) see real DB rows. + const chunks: Chunk[] = []; + const assistantId = crypto.randomUUID(); + let assistantPersisted = false; + tabAgent.currentChunks = chunks; + tabAgent.currentAssistantId = assistantId; + + const flushAssistant = (): void => { + if (chunks.length === 0) return; + const json = JSON.stringify(chunks); + if (!assistantPersisted) { + appendMessage(tabId, assistantId, "assistant", json); + assistantPersisted = true; + } else { + updateMessage(assistantId, json); + } + }; + let attemptError: string | null = null; try { @@ -925,11 +1027,17 @@ export class AgentManager { message, reasoningEffort ? { reasoningEffort } : undefined, )) { - // Stop processing if the tab was aborted (closed/stopped) + // Stop processing if the tab was aborted (closed/stopped). + // stopTab() already injected a `cancelled` system chunk into + // `chunks` before flipping the abort flag, so we just need + // to flush and exit. if (tabAgent.abortController?.signal.aborted) break; if (event.type === "error") { attemptError = event.error; + // Record the error as a chunk so it's part of the + // persisted turn history. + appendEventToChunks(chunks, event); break; } @@ -938,68 +1046,41 @@ export class AgentManager { } this.emit(event, tabId); - // Accumulate content for DB persistence + // For diagnostics / child agent result harvesting, keep a + // flat string copy of plain text output. if (event.type === "text-delta") { - assistantText += event.delta; allOutput += event.delta; - } else if (event.type === "reasoning-delta") { - assistantThinking += event.delta; - } else if (event.type === "tool-call") { - assistantToolCalls.push({ - id: event.toolCall.id, - name: event.toolCall.name, - arguments: event.toolCall.arguments, - }); - } else if (event.type === "tool-result") { - const tc = assistantToolCalls.find((t) => t.id === event.toolResult.toolCallId); - if (tc) { - tc.result = event.toolResult.result; - tc.isError = event.toolResult.isError; - } - } else if (event.type === "done") { - // Persist assistant message to DB - const contentSegments: Array> = []; - if (assistantText) contentSegments.push({ type: "text", text: assistantText }); - for (const tc of assistantToolCalls) { - contentSegments.push({ type: "tool-call", ...tc }); - } - if (contentSegments.length > 0) { - appendMessage( - tabId, - crypto.randomUUID(), - "assistant", - JSON.stringify(contentSegments), - assistantThinking || undefined, - ); - } - // Reset for next turn - assistantText = ""; - assistantThinking = ""; - assistantToolCalls.length = 0; } + + if (event.type === "done") { + // End of turn — flush the accumulated chunks. Reset the + // in-flight pointers so out-of-band system events route + // through `applySystemEvent` against the persisted list + // instead of mutating a stale array. + flushAssistant(); + chunks.length = 0; + assistantPersisted = true; // suppress post-loop flush + tabAgent.currentChunks = null; + tabAgent.currentAssistantId = null; + continue; + } + + // Route every content-bearing event through the shared helper. + // `appendEventToChunks` ignores lifecycle events (status / done + // / task-list-update / tab-created / message-* / etc), so it's + // safe to call unconditionally. + appendEventToChunks(chunks, event); } } catch (err) { console.error(`[dispatch] processMessage error for tab ${tabId}:`, err); attemptError = err instanceof Error ? err.message : String(err); } - // Flush any accumulated assistant content from this attempt - if (assistantText || assistantToolCalls.length > 0) { - const contentSegments: Array> = []; - if (assistantText) contentSegments.push({ type: "text", text: assistantText }); - for (const tc of assistantToolCalls) { - contentSegments.push({ type: "tool-call", ...tc }); - } - if (contentSegments.length > 0) { - appendMessage( - tabId, - crypto.randomUUID(), - "assistant", - JSON.stringify(contentSegments), - assistantThinking || undefined, - ); - } - } + // Flush any accumulated assistant content from this attempt (covers + // the abort/error/exception paths where we never saw a `done`). + flushAssistant(); + tabAgent.currentChunks = null; + tabAgent.currentAssistantId = null; // No error — success if (!attemptError) { @@ -1024,11 +1105,21 @@ export class AgentManager { `Key "${tabAgent.keyId}" rate limited. ` + `Falling back to "${nextEntry.key_id}" (model: ${nextEntry.model_id})...`; console.warn(`[dispatch] ${fallbackMsg}`); + // Persist the notice + model-change as system chunks. We're + // between turns here (just flushed the previous assistant + // message), so the helper routes them into a `role: "system"` + // message via `applySystemEvent`. this.emit({ type: "notice", message: fallbackMsg }, tabId); + this.routeSystemEventToTab(tabId, "notice", fallbackMsg); this.emit( { type: "model-changed", keyId: nextEntry.key_id, modelId: nextEntry.model_id }, tabId, ); + this.routeSystemEventToTab( + tabId, + "model-changed", + `Switched to ${nextEntry.model_id} (${nextEntry.key_id})`, + ); tabAgent.agent = null; continue; } diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts index a1f2d75..28fa044 100644 --- a/packages/api/tests/agent-manager.test.ts +++ b/packages/api/tests/agent-manager.test.ts @@ -13,7 +13,10 @@ vi.mock("@dispatch/core", () => ({ yield { type: "text-delta", delta: "world" } as const; yield { type: "done", - message: { role: "assistant", content: "Hello world" }, + message: { + role: "assistant", + chunks: [{ type: "text", text: "Hello world" }], + }, } as const; yield { type: "status", status: "idle" } as const; } @@ -178,6 +181,16 @@ vi.mock("@dispatch/core", () => ({ return null; }, appendMessage() {}, + updateMessage() {}, + getMessagesForTab() { + return []; + }, + appendEventToChunks(_chunks: unknown[], _event: unknown) { + // no-op stub; chunk accumulation isn't exercised in these unit tests + }, + applySystemEvent(_messages: unknown[], _event: unknown) { + return { messageId: "mock-system-msg" }; + }, BackgroundShellStore: class MockBackgroundShellStore { has() { return false; diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts index 69a6676..49bae49 100644 --- a/packages/api/tests/routes.test.ts +++ b/packages/api/tests/routes.test.ts @@ -14,7 +14,10 @@ vi.mock("@dispatch/core", () => ({ yield { type: "text-delta", delta: "world" } as const; yield { type: "done", - message: { role: "assistant", content: "Hello world" }, + message: { + role: "assistant", + chunks: [{ type: "text", text: "Hello world" }], + }, } as const; yield { type: "status", status: "idle" } as const; } @@ -179,6 +182,16 @@ vi.mock("@dispatch/core", () => ({ return null; }, appendMessage() {}, + updateMessage() {}, + getMessagesForTab() { + return []; + }, + appendEventToChunks(_chunks: unknown[], _event: unknown) { + // no-op stub + }, + applySystemEvent(_messages: unknown[], _event: unknown) { + return { messageId: "mock-system-msg" }; + }, BackgroundShellStore: class MockBackgroundShellStore { has() { return false; diff --git a/packages/core/src/agent/agent.ts b/packages/core/src/agent/agent.ts index ec83cad..4e58378 100644 --- a/packages/core/src/agent/agent.ts +++ b/packages/core/src/agent/agent.ts @@ -1,6 +1,7 @@ import { dirname } from "node:path"; import type { CoreMessage, CoreSystemMessage } from "ai"; import { streamText } from "ai"; +import { appendEventToChunks } from "../chunks/append.js"; import { buildBillingHeaderValue, SYSTEM_IDENTITY } from "../credentials/claude.js"; import { createProvider, prefixToolName, unprefixToolName } from "../llm/provider.js"; import { canonicalize } from "../tools/path-utils.js"; @@ -12,35 +13,118 @@ import type { AgentEvent, AgentStatus, ChatMessage, + Chunk, QueueCallbacks, ToolCall, ToolResult, } from "../types/index.js"; +/** + * Rebuild AI SDK `CoreMessage[]` from our internal `ChatMessage[]`. + * + * Strip rules (see plan-chunk-refactor.md): + * - `role: "system"` messages are skipped wholesale (they're display-only + * standalone-system bubbles that exist outside any model turn). + * - `error` chunks are skipped — the turn ended; the LLM doesn't need them. + * - `system` chunks are skipped — display-only notices. + * - `text` chunks → `{ type: "text", text }` parts. + * - `thinking` chunks → `{ type: "reasoning", text }` parts (handles + * Claude's `interleaved-thinking-2025-05-14` round-trip). + * - `tool-batch` chunks → one `{ type: "tool-call" }` part per entry + * inside the current assistant message, followed by a separate + * `{ role: "tool", content: [{ type: "tool-result", ... }] }` message + * for every entry that has a result. This mirrors what the AI SDK + * expects on the wire. + * + * Mixed-resolution tool batches (some entries with `result`, some without): + * we emit tool-results only for the entries that have them. In practice + * the agent resolves every tool call in a step before looping back to the + * LLM, so this case only arises mid-step (where the message hasn't been + * round-tripped to the LLM yet) and is benign. + */ function toCoreMessages(messages: ChatMessage[], useToolPrefix?: boolean): CoreMessage[] { const result: CoreMessage[] = []; for (const msg of messages) { + if (msg.role === "system") continue; + if (msg.role === "user") { - result.push({ role: "user", content: msg.content }); - } else if (msg.role === "assistant") { - const parts: Array< - | { type: "text"; text: string } - | { type: "tool-call"; toolCallId: string; toolName: string; args: Record } - > = [{ type: "text", text: msg.content }]; - for (const tc of msg.toolCalls ?? []) { - const toolName = useToolPrefix ? prefixToolName(tc.name) : tc.name; - parts.push({ type: "tool-call", toolCallId: tc.id, toolName, args: tc.arguments }); + // User messages in our model can in theory contain non-text chunks, + // but in practice the UI only produces text. Concatenate any text + // chunks; ignore anything else. + const text = msg.chunks + .filter((c): c is Extract => c.type === "text") + .map((c) => c.text) + .join(""); + result.push({ role: "user", content: text }); + continue; + } + + // role === "assistant" + const parts: Array< + | { type: "text"; text: string } + | { type: "reasoning"; text: string } + | { type: "tool-call"; toolCallId: string; toolName: string; args: Record } + > = []; + const trailingToolResults: Array<{ + toolCallId: string; + toolName: string; + result: string; + }> = []; + + for (const chunk of msg.chunks) { + switch (chunk.type) { + case "text": + parts.push({ type: "text", text: chunk.text }); + break; + case "thinking": + parts.push({ type: "reasoning", text: chunk.text }); + break; + case "tool-batch": + for (const entry of chunk.calls) { + const toolName = useToolPrefix ? prefixToolName(entry.name) : entry.name; + parts.push({ + type: "tool-call", + toolCallId: entry.id, + toolName, + args: entry.arguments, + }); + if (entry.result !== undefined) { + trailingToolResults.push({ + toolCallId: entry.id, + toolName, + result: entry.result, + }); + } + } + break; + case "error": + case "system": + // Strip — not sent back to the LLM. + break; } + } + + // Skip the assistant message entirely if it has no parts (e.g., a + // turn that consisted solely of system/error chunks). Emitting an + // assistant message with empty content can confuse some providers. + if (parts.length === 0 && trailingToolResults.length === 0) continue; + + if (parts.length > 0) { result.push({ role: "assistant", content: parts }); - for (const tr of msg.toolResults ?? []) { - const toolName = useToolPrefix ? prefixToolName(tr.toolName) : tr.toolName; - result.push({ - role: "tool", - content: [ - { type: "tool-result", toolCallId: tr.toolCallId, toolName, result: tr.result }, - ], - }); - } + } + + for (const tr of trailingToolResults) { + result.push({ + role: "tool", + content: [ + { + type: "tool-result", + toolCallId: tr.toolCallId, + toolName: tr.toolName, + result: tr.result, + }, + ], + }); } } return result; @@ -204,8 +288,7 @@ export class Agent { // to read it without prompting the user. Bypassing the external- // directory check here keeps the inspection flow frictionless. const isSpillPath = - resolvedPath === resolvedSpillRoot || - resolvedPath.startsWith(`${resolvedSpillRoot}/`); + resolvedPath === resolvedSpillRoot || resolvedPath.startsWith(`${resolvedSpillRoot}/`); if (!isUnderWorkdir && !isSpillPath) { const permissionType = @@ -288,7 +371,7 @@ export class Agent { this.status = "running"; yield { type: "status", status: "running" }; - this.messages.push({ role: "user", content: userMessage }); + this.messages.push({ role: "user", chunks: [{ type: "text", text: userMessage }] }); const registry = createToolRegistry(this.config.tools); // `isClaudeOAuth` gates Claude-Code-CLI-specific behavior: billing-header @@ -322,19 +405,36 @@ export class Agent { // request as coming from the official CLI. let systemPrompt = this.config.systemPrompt; if (isClaudeOAuth) { - const billingHeader = buildBillingHeaderValue(this.messages); + // `buildBillingHeaderValue` historically took `{ role, content: string }[]`. + // Project the new chunk-based ChatMessage shape onto that legacy shape + // by stringifying text chunks. Only the first user message's text is + // actually used (see extractFirstUserMessageText) — non-text chunks + // safely contribute nothing. + const legacyShape = this.messages.map((m) => ({ + role: m.role, + content: m.chunks + .filter((c): c is Extract => c.type === "text") + .map((c) => c.text) + .join(""), + })); + const billingHeader = buildBillingHeaderValue(legacyShape); systemPrompt = `${billingHeader}\n${SYSTEM_IDENTITY}\n\n${systemPrompt}`; } try { - // Track the final assistant message across all steps - let finalText = ""; - const allToolCalls: ToolCall[] = []; - const allToolResults: ToolResult[] = []; + // Single chunk accumulator for the entire assistant turn (all steps). + // All event-driven mutations go through `appendEventToChunks`. + const chunks: Chunk[] = []; // We build up a local message list for multi-turn within one run() call - // that includes tool results fed back to the LLM + // that includes tool results fed back to the LLM. Each step appends + // the assistant's evolving chunk list as one ChatMessage; subsequent + // steps see prior tool calls and their results via the chunks. const stepMessages: ChatMessage[] = [...this.messages]; + // The assistant ChatMessage for the current turn, shared across steps + // so its `chunks` reference matches the accumulator above. We push + // it once when the first step has actual output to record. + let assistantTurnMessage: ChatMessage | null = null; for (let step = 0; step < MAX_STEPS; step++) { const effort = options?.reasoningEffort ?? this.config.reasoningEffort ?? "max"; @@ -397,17 +497,35 @@ export class Agent { const result = streamText(streamOptions); - let stepText = ""; + // Per-step tool-call tracking — needed because we only loop back + // to the LLM if this step produced tool calls. The actual chunk + // state for the turn lives in `chunks`. const stepToolCalls: ToolCall[] = []; + // Ensure we have an assistant message in stepMessages whose + // `chunks` reference is shared with our accumulator. Only push + // once; subsequent steps mutate the same chunks array. + if (assistantTurnMessage === null) { + assistantTurnMessage = { role: "assistant", chunks }; + stepMessages.push(assistantTurnMessage); + } + try { for await (const event of result.fullStream) { if (event.type === "text-delta") { - stepText += event.textDelta; - finalText += event.textDelta; - yield { type: "text-delta", delta: event.textDelta }; + const internalEvent: AgentEvent = { + type: "text-delta", + delta: event.textDelta, + }; + appendEventToChunks(chunks, internalEvent); + yield internalEvent; } else if (event.type === "reasoning") { - yield { type: "reasoning-delta", delta: event.textDelta }; + const internalEvent: AgentEvent = { + type: "reasoning-delta", + delta: event.textDelta, + }; + appendEventToChunks(chunks, internalEvent); + yield internalEvent; } else if (event.type === "tool-call") { const rawName = event.toolName; const toolName = isClaudeOAuth ? unprefixToolName(rawName) : rawName; @@ -417,18 +535,21 @@ export class Agent { arguments: event.args as Record, }; stepToolCalls.push(toolCall); - allToolCalls.push(toolCall); - yield { type: "tool-call", toolCall }; + const internalEvent: AgentEvent = { type: "tool-call", toolCall }; + appendEventToChunks(chunks, internalEvent); + yield internalEvent; } else if (event.type === "error") { const errRecord = event.error as unknown as Record; const statusCode = typeof errRecord.statusCode === "number" ? errRecord.statusCode : undefined; const errorMsg = formatError(event.error, this.config); - yield { + const internalEvent: AgentEvent = { type: "error", error: errorMsg, ...(statusCode !== undefined ? { statusCode } : {}), }; + appendEventToChunks(chunks, internalEvent); + yield internalEvent; this.status = "error"; yield { type: "status", status: "error" }; return; @@ -452,8 +573,9 @@ export class Agent { arguments: {}, }; stepToolCalls.push(badToolCall); - allToolCalls.push(badToolCall); - yield { type: "tool-call", toolCall: badToolCall }; + const tcEvent: AgentEvent = { type: "tool-call", toolCall: badToolCall }; + appendEventToChunks(chunks, tcEvent); + yield tcEvent; const badToolResult: ToolResult = { toolCallId: fakeId, @@ -461,39 +583,33 @@ export class Agent { result: errorResult, isError: true, }; - allToolResults.push(badToolResult); - yield { type: "tool-result", toolResult: badToolResult }; + const trEvent: AgentEvent = { type: "tool-result", toolResult: badToolResult }; + appendEventToChunks(chunks, trEvent); + yield trEvent; } - // No tool calls means the agent is done + // No tool calls means the agent is done — the assistant message + // already exists in stepMessages with up-to-date chunks. if (stepToolCalls.length === 0) { - // Add the final assistant message to step messages (for history) - stepMessages.push({ - role: "assistant", - content: stepText, - }); break; } - // Add assistant message with tool calls to step messages - stepMessages.push({ - role: "assistant", - content: stepText, - toolCalls: stepToolCalls, - }); - - // Execute tool calls manually - const stepToolResults: ToolResult[] = []; - // Track tool calls that already have results (e.g. synthetic unavailable-tool errors) - const alreadyResolved = new Set(allToolResults.map((r) => r.toolCallId)); + // Execute tool calls manually. Their results merge back into the + // `chunks` accumulator via `appendEventToChunks`, which routes + // each result to the correct entry inside its tool-batch chunk. + // Track which calls already have a recorded result (from the + // synthetic unavailable-tool error path above) so we don't + // re-execute them. + const alreadyResolved = new Set(); + for (const c of chunks) { + if (c.type !== "tool-batch") continue; + for (const entry of c.calls) { + if (entry.result !== undefined) alreadyResolved.add(entry.id); + } + } for (const tc of stepToolCalls) { - // Skip execution for tool calls that already have synthetic results - if (alreadyResolved.has(tc.id)) { - const existing = allToolResults.find((r) => r.toolCallId === tc.id); - if (existing) stepToolResults.push(existing); - continue; - } + if (alreadyResolved.has(tc.id)) continue; const shellOutputQueue: Array<{ data: string; stream: "stdout" | "stderr" }> = []; @@ -506,7 +622,15 @@ export class Agent { while (toolResult === undefined) { if (shellOutputQueue.length > 0) { const item = shellOutputQueue.shift(); - if (item) yield { type: "shell-output", data: item.data, stream: item.stream }; + if (item) { + const shellEvent: AgentEvent = { + type: "shell-output", + data: item.data, + stream: item.stream, + }; + appendEventToChunks(chunks, shellEvent); + yield shellEvent; + } continue; } const raceResult = await Promise.race([ @@ -523,7 +647,15 @@ export class Agent { // Drain any remaining shell output emitted before we read the result while (shellOutputQueue.length > 0) { const item = shellOutputQueue.shift(); - if (item) yield { type: "shell-output", data: item.data, stream: item.stream }; + if (item) { + const shellEvent: AgentEvent = { + type: "shell-output", + data: item.data, + stream: item.stream, + }; + appendEventToChunks(chunks, shellEvent); + yield shellEvent; + } } // Check for queued user messages and append them to the tool result @@ -539,36 +671,22 @@ export class Agent { } } - stepToolResults.push(finalToolResult); - allToolResults.push(finalToolResult); - yield { type: "tool-result", toolResult: finalToolResult }; - } - - // Add tool results back to step messages so LLM can see them - // We append them to the last assistant message's toolResults - const lastMsg = stepMessages[stepMessages.length - 1]; - if (lastMsg) { - lastMsg.toolResults = stepToolResults; - } - } - - // If we exhausted MAX_STEPS and there were pending tool calls, surface an error - if (stepMessages.length > 0) { - const lastMsg = stepMessages[stepMessages.length - 1]; - if (lastMsg?.toolCalls && lastMsg.toolCalls.length > 0 && !lastMsg.toolResults) { - yield { - type: "error", - error: `Agent reached MAX_STEPS (${MAX_STEPS}) with unresolved tool calls`, - }; + const trEvent: AgentEvent = { type: "tool-result", toolResult: finalToolResult }; + appendEventToChunks(chunks, trEvent); + yield trEvent; } } - const assistantMessage: ChatMessage = { + // Build the final assistant message from the accumulated chunks. + // If no assistant turn message was ever created (e.g., the model + // produced nothing — unusual but possible), synthesize an empty one. + const assistantMessage: ChatMessage = assistantTurnMessage ?? { role: "assistant", - content: finalText, - toolCalls: allToolCalls.length > 0 ? allToolCalls : undefined, - toolResults: allToolResults.length > 0 ? allToolResults : undefined, + chunks, }; + // `assistantTurnMessage` was pushed into `stepMessages` but not into + // `this.messages` — push it now so the agent's outward-facing + // history reflects the turn. this.messages.push(assistantMessage); // Drain any remaining queued messages that arrived after the last tool call @@ -580,7 +698,7 @@ export class Agent { const userMessages = remaining.map((m) => m.message).join("\n---\n"); this.messages.push({ role: "user", - content: userMessages, + chunks: [{ type: "text", text: userMessages }], }); } } diff --git a/packages/core/src/chunks/append.ts b/packages/core/src/chunks/append.ts new file mode 100644 index 0000000..9c2a367 --- /dev/null +++ b/packages/core/src/chunks/append.ts @@ -0,0 +1,265 @@ +/** + * Chunk-builder helper. + * + * `appendEventToChunks` is the single source of truth for how a stream of + * `AgentEvent`s collapses into an ordered `Chunk[]` on a message. Both the + * backend (agent + agent-manager) and the frontend store call this helper + * so the wire format stays in lockstep across the boundary. + * + * Open/close rules — see plan-chunk-refactor.md for the full table. + * + * | Chunk | Opens on | Coalesces | + * |---------------|-------------------------------------------------------|------------------------------------------------------------| + * | `text` | first `text-delta` after a non-text chunk | consecutive `text-delta` events append to `.text` | + * | `thinking` | first `reasoning-delta` after a non-thinking chunk | consecutive `reasoning-delta` events append to `.text` | + * | `tool-batch` | first `tool-call` after a non-tool-batch chunk | consecutive `tool-call` events push a new entry to `.calls`| + * | `error` | every `error` event | NEVER (always single-event) | + * | `system` | every `notice`/`model-changed`/`config-reload`/... | NEVER (two consecutive system events → two chunks) | + * + * Side-effect events (no new chunk): + * - `tool-result` → finds the call by `id` across all `tool-batch` chunks (most-recent first) + * and updates its `result` / `isError`. + * - `shell-output` → appends to the most recent entry of the most recent `tool-batch` chunk. + * + * Ignored events: + * - `status`, `done`, `task-list-update`, `tab-created`, `message-queued`, + * `message-consumed`, `message-cancelled` — these are control / lifecycle + * events, not message content. + */ + +import type { + AgentEvent, + ChatMessage, + Chunk, + MessageRole, + SystemChunk, + SystemChunkKind, + ToolBatchChunk, +} from "../types/index.js"; + +/** + * Mutates `chunks` in place based on `event`. + * + * Returns void; the array is the output channel. + */ +export function appendEventToChunks(chunks: Chunk[], event: AgentEvent): void { + switch (event.type) { + case "text-delta": { + // Open or extend the current text chunk. + const last = chunks[chunks.length - 1]; + if (last && last.type === "text") { + last.text += event.delta; + } else { + chunks.push({ type: "text", text: event.delta }); + } + return; + } + + case "reasoning-delta": { + // Open or extend the current thinking chunk. + const last = chunks[chunks.length - 1]; + if (last && last.type === "thinking") { + last.text += event.delta; + } else { + chunks.push({ type: "thinking", text: event.delta }); + } + return; + } + + case "tool-call": { + // Open or extend the current tool-batch chunk. + const last = chunks[chunks.length - 1]; + const entry = { + id: event.toolCall.id, + name: event.toolCall.name, + arguments: event.toolCall.arguments, + }; + if (last && last.type === "tool-batch") { + last.calls.push(entry); + } else { + chunks.push({ type: "tool-batch", calls: [entry] }); + } + return; + } + + case "tool-result": { + // Find the matching call (by id) across all tool-batch chunks, + // most-recent first. Tool results can arrive after subsequent + // text-deltas, so we cannot rely on the *last* chunk being the + // tool-batch — we have to search. + for (let i = chunks.length - 1; i >= 0; i--) { + const c = chunks[i]; + if (!c || c.type !== "tool-batch") continue; + const call = c.calls.find((e) => e.id === event.toolResult.toolCallId); + if (call) { + call.result = event.toolResult.result; + call.isError = event.toolResult.isError; + return; + } + } + // Orphan result with no matching call — drop silently. + return; + } + + case "shell-output": { + // Append to the most recent entry of the most recent tool-batch. + // Walk back through chunks to find the latest tool-batch; if there + // are intervening text/thinking/etc chunks (which can happen if + // the model streams text while a shell tool is still running), + // we still want the most recent tool-batch. + for (let i = chunks.length - 1; i >= 0; i--) { + const c = chunks[i]; + if (!c || c.type !== "tool-batch") continue; + const entry = c.calls[c.calls.length - 1]; + if (!entry) return; + const prev = entry.shellOutput ?? { stdout: "", stderr: "" }; + entry.shellOutput = { + stdout: prev.stdout + (event.stream === "stdout" ? event.data : ""), + stderr: prev.stderr + (event.stream === "stderr" ? event.data : ""), + }; + return; + } + // Orphan shell-output with no tool-batch in scope — drop silently. + return; + } + + case "error": { + // Always a fresh single-event chunk — no coalescing. + chunks.push({ + type: "error", + message: event.error, + ...(event.statusCode !== undefined ? { statusCode: event.statusCode } : {}), + }); + return; + } + + case "notice": { + chunks.push({ type: "system", kind: "notice", text: event.message }); + return; + } + + case "model-changed": { + chunks.push({ + type: "system", + kind: "model-changed", + text: `Switched to ${event.modelId} (${event.keyId})`, + }); + return; + } + + case "config-reload": { + chunks.push({ + type: "system", + kind: "config-reload", + text: "Configuration reloaded", + }); + return; + } + + // Lifecycle / control events — no chunk emitted. + case "status": + case "done": + case "task-list-update": + case "tab-created": + case "message-queued": + case "message-consumed": + case "message-cancelled": + return; + + default: { + // Exhaustiveness check — if a new event variant is added to + // AgentEvent, TypeScript will complain here. + const _exhaustive: never = event; + void _exhaustive; + return; + } + } +} + +// ─── System event routing across messages ──────────────────────── + +/** + * Minimal shape needed by `applySystemEvent`. + * + * The caller (agent-manager / persistence layer) typically tracks message + * id alongside the wire-format `ChatMessage`. This generic constraint + * lets us keep core `ChatMessage` clean while still letting downstream + * pass anything with an `id`. + */ +export interface IdentifiedMessage { + id: string; + role: MessageRole; + chunks: Chunk[]; +} + +/** + * Describes the system event in caller-controlled terms. We let the caller + * decide both the `kind` (so the same helper can record cancellations, + * notices, model swaps, etc.) and the `text` (so the caller controls + * formatting / localization). + */ +export interface SystemEventLike { + kind: SystemChunkKind; + text: string; +} + +/** + * Routes a system event to the right message when *no assistant turn is + * in flight*. (When a turn IS in flight, the caller should instead use + * `appendEventToChunks` against the in-flight message's chunks directly.) + * + * Routing rules (from plan-chunk-refactor.md): + * + * 1. Most recent message is `role: "system"` → append a `system` chunk + * to it. (Note: a second consecutive system event creates a second + * system chunk inside the same system message — chunks themselves + * never coalesce.) + * 2. Otherwise → create a fresh `role: "system"` message containing one + * `system` chunk and push it. + * + * Returns the `messageId` that was used (either the existing system + * message's id or the newly-created one) so the caller can persist / + * emit a diff to subscribers. + * + * `idFactory` defaults to `crypto.randomUUID()`; tests inject a + * deterministic factory. + */ +export function applySystemEvent( + messages: M[], + event: SystemEventLike, + idFactory: () => string = defaultIdFactory, +): { messageId: string } { + const chunk: SystemChunk = { type: "system", kind: event.kind, text: event.text }; + + const last = messages[messages.length - 1]; + if (last && last.role === "system") { + last.chunks.push(chunk); + return { messageId: last.id }; + } + + const id = idFactory(); + // We can't fabricate the full `M` shape without knowing its extra + // fields, but `IdentifiedMessage` is the minimum we need to push. + // Callers that extend the shape with extra fields are responsible for + // initializing them via post-hoc patching, or by passing in their own + // message-creation logic. In practice callers either: + // (a) use `ChatMessage` itself (no extra fields beyond IdentifiedMessage), or + // (b) construct messages and look them up by id after this call returns. + const newMessage = { id, role: "system" as const, chunks: [chunk] } as unknown as M; + messages.push(newMessage); + return { messageId: id }; +} + +function defaultIdFactory(): string { + // In Node 19+ / modern browsers, `crypto.randomUUID` is available globally. + if (typeof crypto !== "undefined" && typeof crypto.randomUUID === "function") { + return crypto.randomUUID(); + } + // Fallback: pseudo-random; not cryptographically secure, but adequate for + // in-memory message identifiers when randomUUID is unavailable. + return `sysmsg-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 10)}`; +} + +// ─── Re-exports for convenience ────────────────────────────────── + +export type { ChatMessage, Chunk, SystemChunk, SystemChunkKind, ToolBatchChunk }; diff --git a/packages/core/src/db/index.ts b/packages/core/src/db/index.ts index 81b474e..e63b266 100644 --- a/packages/core/src/db/index.ts +++ b/packages/core/src/db/index.ts @@ -99,7 +99,6 @@ export function getDatabase(): Database { seq INTEGER NOT NULL, role TEXT NOT NULL, content_json TEXT NOT NULL, - thinking TEXT, created_at INTEGER NOT NULL )`); diff --git a/packages/core/src/db/messages.ts b/packages/core/src/db/messages.ts index 80a1f22..34b69e5 100644 --- a/packages/core/src/db/messages.ts +++ b/packages/core/src/db/messages.ts @@ -1,21 +1,29 @@ +import type { Chunk, MessageRole } from "../types/index.js"; import { getDatabase } from "./index.js"; +/** + * A persisted message row, with `content_json` already parsed into a `Chunk[]`. + * Mirrors the new schema (no `thinking` column — that lived under the old + * `content + toolCalls + toolResults + thinking` model). + */ export interface MessageRow { id: string; tabId: string; seq: number; - role: string; - contentJson: string; - thinking: string | null; + role: MessageRole; + chunks: Chunk[]; createdAt: number; } +/** + * Append a new message to the tab. Caller passes the already-serialized + * chunk list as `contentJson` (i.e. `JSON.stringify(chunks)`). + */ export function appendMessage( tabId: string, id: string, - role: string, + role: MessageRole, contentJson: string, - thinking?: string, ): void { const db = getDatabase(); const maxSeq = db @@ -23,40 +31,58 @@ export function appendMessage( .get({ $tabId: tabId }) as { max_seq: number }; const seq = (maxSeq?.max_seq ?? -1) + 1; db.query( - `INSERT INTO messages (id, tab_id, seq, role, content_json, thinking, created_at) - VALUES ($id, $tabId, $seq, $role, $contentJson, $thinking, $now)`, + `INSERT INTO messages (id, tab_id, seq, role, content_json, created_at) + VALUES ($id, $tabId, $seq, $role, $contentJson, $now)`, ).run({ $id: id, $tabId: tabId, $seq: seq, $role: role, $contentJson: contentJson, - $thinking: thinking ?? null, $now: Date.now(), }); } -export function updateMessage(id: string, contentJson: string, thinking?: string): void { +/** + * Replace the persisted chunks for an existing message. `contentJson` is + * the already-serialized chunk list. + */ +export function updateMessage(id: string, contentJson: string): void { const db = getDatabase(); - db.query( - "UPDATE messages SET content_json = $contentJson, thinking = $thinking WHERE id = $id", - ).run({ $id: id, $contentJson: contentJson, $thinking: thinking ?? null }); + db.query("UPDATE messages SET content_json = $contentJson WHERE id = $id").run({ + $id: id, + $contentJson: contentJson, + }); } +/** + * Read all messages for a tab in seq order. `content_json` is parsed into + * `Chunk[]` here so callers don't have to. If a row's JSON is malformed, + * the message is returned with an empty chunk list rather than throwing. + */ export function getMessagesForTab(tabId: string): MessageRow[] { const db = getDatabase(); const rows = db .query("SELECT * FROM messages WHERE tab_id = $tabId ORDER BY seq ASC") .all({ $tabId: tabId }) as Array>; - return rows.map((row) => ({ - id: row.id as string, - tabId: row.tab_id as string, - seq: row.seq as number, - role: row.role as string, - contentJson: row.content_json as string, - thinking: row.thinking as string | null, - createdAt: row.created_at as number, - })); + return rows.map((row) => { + const rawJson = row.content_json as string; + let chunks: Chunk[]; + try { + const parsed = JSON.parse(rawJson); + chunks = Array.isArray(parsed) ? (parsed as Chunk[]) : []; + } catch { + chunks = []; + } + return { + id: row.id as string, + tabId: row.tab_id as string, + seq: row.seq as number, + role: row.role as MessageRole, + chunks, + createdAt: row.created_at as number, + }; + }); } export function clearMessagesForTab(tabId: string): void { diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index a68bfb9..1453a01 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -3,6 +3,13 @@ // Agent & LLM export { Agent } from "./agent/agent.js"; export { deleteAgent, getAgentDirs, loadAgents, saveAgent } from "./agents/index.js"; +// Chunk helpers +export { + appendEventToChunks, + applySystemEvent, + type IdentifiedMessage, + type SystemEventLike, +} from "./chunks/append.js"; // Config export { configToRuleset, diff --git a/packages/core/src/tools/list-files.ts b/packages/core/src/tools/list-files.ts index bf21046..e003099 100644 --- a/packages/core/src/tools/list-files.ts +++ b/packages/core/src/tools/list-files.ts @@ -20,10 +20,7 @@ export function createListFilesTool(workingDirectory: string): ToolDefinition { const absolutePath = await canonicalize(workingDirectory, relPath); const absoluteWorkDir = await canonicalize(workingDirectory); - if ( - absolutePath !== absoluteWorkDir && - !absolutePath.startsWith(`${absoluteWorkDir}/`) - ) { + if (absolutePath !== absoluteWorkDir && !absolutePath.startsWith(`${absoluteWorkDir}/`)) { return `Error: Path "${relPath}" is outside the working directory.`; } diff --git a/packages/core/src/tools/write-file.ts b/packages/core/src/tools/write-file.ts index 763b083..aa69c86 100644 --- a/packages/core/src/tools/write-file.ts +++ b/packages/core/src/tools/write-file.ts @@ -24,10 +24,7 @@ export function createWriteFileTool(workingDirectory: string): ToolDefinition { const absolutePath = await canonicalize(workingDirectory, filePath); const absoluteWorkDir = await canonicalize(workingDirectory); - if ( - absolutePath !== absoluteWorkDir && - !absolutePath.startsWith(`${absoluteWorkDir}/`) - ) { + if (absolutePath !== absoluteWorkDir && !absolutePath.startsWith(`${absoluteWorkDir}/`)) { return `Error: Path "${filePath}" is outside the working directory.`; } diff --git a/packages/core/src/types/index.ts b/packages/core/src/types/index.ts index 65576cf..8985bd9 100644 --- a/packages/core/src/types/index.ts +++ b/packages/core/src/types/index.ts @@ -3,13 +3,62 @@ import type { PermissionChecker, Ruleset } from "../permission/index.js"; // ─── Message Types ─────────────────────────────────────────────── -export type MessageRole = "user" | "assistant" | "tool"; +export type MessageRole = "user" | "assistant" | "system"; + +/** + * A single ordered chunk of content inside a message. The chunk list + * preserves the actual temporal ordering of text, reasoning, tool calls, + * system notices, and errors as they arrived from the model. + * + * Coalescing rules (see plan-chunk-refactor.md): + * - `text` and `thinking` coalesce on consecutive same-type deltas. + * - `tool-batch` coalesces on consecutive `tool-call` events + * (appends a new entry to `calls`). + * - `error` and `system` are always single-event chunks (no coalescing). + */ +export type Chunk = TextChunk | ThinkingChunk | ToolBatchChunk | ErrorChunk | SystemChunk; + +export interface TextChunk { + type: "text"; + text: string; +} + +export interface ThinkingChunk { + type: "thinking"; + text: string; +} + +export interface ToolBatchChunk { + type: "tool-batch"; + calls: ToolBatchEntry[]; +} + +export interface ToolBatchEntry { + id: string; + name: string; + arguments: Record; + result?: string; + isError?: boolean; + shellOutput?: { stdout: string; stderr: string }; +} + +export interface ErrorChunk { + type: "error"; + message: string; + statusCode?: number; +} + +export type SystemChunkKind = "notice" | "model-changed" | "config-reload" | "cancelled"; + +export interface SystemChunk { + type: "system"; + kind: SystemChunkKind; + text: string; +} export interface ChatMessage { role: MessageRole; - content: string; - toolCalls?: ToolCall[]; - toolResults?: ToolResult[]; + chunks: Chunk[]; } export interface ToolCall { diff --git a/packages/core/tests/agent/agent.test.ts b/packages/core/tests/agent/agent.test.ts index be5272f..3d11fc5 100644 --- a/packages/core/tests/agent/agent.test.ts +++ b/packages/core/tests/agent/agent.test.ts @@ -149,11 +149,11 @@ describe("Agent", () => { expect(agent.messages).toHaveLength(2); expect(agent.messages[0]).toMatchObject({ role: "user", - content: "my question", + chunks: [{ type: "text", text: "my question" }], }); expect(agent.messages[1]).toMatchObject({ role: "assistant", - content: "Response", + chunks: [{ type: "text", text: "Response" }], }); }); @@ -182,7 +182,7 @@ describe("Agent", () => { expect(doneEvent).toBeDefined(); expect(doneEvent).toMatchObject({ type: "done", - message: { role: "assistant", content: "Done!" }, + message: { role: "assistant", chunks: [{ type: "text", text: "Done!" }] }, }); }); diff --git a/packages/core/tests/chunks/append.test.ts b/packages/core/tests/chunks/append.test.ts new file mode 100644 index 0000000..c8917c9 --- /dev/null +++ b/packages/core/tests/chunks/append.test.ts @@ -0,0 +1,437 @@ +import { describe, expect, it } from "vitest"; +import { appendEventToChunks, applySystemEvent } from "../../src/chunks/append.js"; +import type { AgentEvent, ChatMessage, Chunk } from "../../src/types/index.js"; + +// ─── helpers ───────────────────────────────────────────────────── + +const td = (delta: string): AgentEvent => ({ type: "text-delta", delta }); +const rd = (delta: string): AgentEvent => ({ type: "reasoning-delta", delta }); +const tc = (id: string, name = "fake_tool", args: Record = {}): AgentEvent => ({ + type: "tool-call", + toolCall: { id, name, arguments: args }, +}); +const tr = (toolCallId: string, result: string, isError = false): AgentEvent => ({ + type: "tool-result", + toolResult: { toolCallId, toolName: "fake_tool", result, isError }, +}); +const so = (data: string, stream: "stdout" | "stderr" = "stdout"): AgentEvent => ({ + type: "shell-output", + data, + stream, +}); +const err = (error: string, statusCode?: number): AgentEvent => ({ + type: "error", + error, + ...(statusCode !== undefined ? { statusCode } : {}), +}); +const notice = (message: string): AgentEvent => ({ type: "notice", message }); +const modelChanged = (keyId: string, modelId: string): AgentEvent => ({ + type: "model-changed", + keyId, + modelId, +}); +const configReload: AgentEvent = { type: "config-reload" }; + +function run(events: AgentEvent[]): Chunk[] { + const chunks: Chunk[] = []; + for (const e of events) appendEventToChunks(chunks, e); + return chunks; +} + +// ─── Required cases from the plan ──────────────────────────────── + +describe("appendEventToChunks — required cases from plan", () => { + it("empty chunks + text-delta → one text chunk with the delta", () => { + const chunks = run([td("Hello")]); + expect(chunks).toEqual([{ type: "text", text: "Hello" }]); + }); + + it("two consecutive text-deltas → one text chunk with concatenated text", () => { + const chunks = run([td("Hello, "), td("world!")]); + expect(chunks).toEqual([{ type: "text", text: "Hello, world!" }]); + }); + + it("text-delta then reasoning-delta → two chunks (text, thinking)", () => { + const chunks = run([td("ans: 42"), rd("I should explain")]); + expect(chunks).toEqual([ + { type: "text", text: "ans: 42" }, + { type: "thinking", text: "I should explain" }, + ]); + }); + + it("text-delta then tool-call → two chunks (text, tool-batch with one entry)", () => { + const chunks = run([td("Looking..."), tc("t1", "read_file", { path: "x" })]); + expect(chunks).toEqual([ + { type: "text", text: "Looking..." }, + { + type: "tool-batch", + calls: [{ id: "t1", name: "read_file", arguments: { path: "x" } }], + }, + ]); + }); + + it("two consecutive tool-calls → one tool-batch with two entries", () => { + const chunks = run([tc("t1", "read_file"), tc("t2", "list_files")]); + expect(chunks).toHaveLength(1); + expect(chunks[0]).toMatchObject({ + type: "tool-batch", + calls: [ + { id: "t1", name: "read_file" }, + { id: "t2", name: "list_files" }, + ], + }); + }); + + it("tool-call then tool-call then text → two chunks (tool-batch with 2 entries, text)", () => { + const chunks = run([tc("t1"), tc("t2"), td("done")]); + expect(chunks).toHaveLength(2); + expect(chunks[0]).toMatchObject({ + type: "tool-batch", + calls: [{ id: "t1" }, { id: "t2" }], + }); + expect(chunks[1]).toEqual({ type: "text", text: "done" }); + }); + + it("tool-result arrives → updates matching tool-call entry in the latest tool-batch chunk by id", () => { + const chunks = run([ + tc("t1"), + tc("t2"), + tr("t1", "first-result"), + tr("t2", "second-result", true), + ]); + expect(chunks).toHaveLength(1); + const batch = chunks[0]; + expect(batch.type).toBe("tool-batch"); + if (batch.type !== "tool-batch") throw new Error("type guard"); + expect(batch.calls[0]).toMatchObject({ id: "t1", result: "first-result", isError: false }); + expect(batch.calls[1]).toMatchObject({ id: "t2", result: "second-result", isError: true }); + }); + + it("shell-output arrives → appends to the most recent tool-call's shellOutput", () => { + const chunks = run([ + tc("t1", "run_shell"), + so("hello\n", "stdout"), + so("world\n", "stdout"), + so("err!\n", "stderr"), + ]); + expect(chunks).toHaveLength(1); + const batch = chunks[0]; + if (batch.type !== "tool-batch") throw new Error("type guard"); + expect(batch.calls[0]?.shellOutput).toEqual({ + stdout: "hello\nworld\n", + stderr: "err!\n", + }); + }); + + it("error event → opens an error chunk; subsequent events go to new chunks", () => { + const chunks = run([td("partial..."), err("network failed", 503), td("recovery")]); + expect(chunks).toEqual([ + { type: "text", text: "partial..." }, + { type: "error", message: "network failed", statusCode: 503 }, + { type: "text", text: "recovery" }, + ]); + }); + + it("system event during text run → closes text, opens system, would re-open text on next text-delta", () => { + const chunks = run([td("first "), notice("model swap"), td("second")]); + expect(chunks).toEqual([ + { type: "text", text: "first " }, + { type: "system", kind: "notice", text: "model swap" }, + { type: "text", text: "second" }, + ]); + }); + + it("two consecutive system events → two separate system chunks (no coalescing)", () => { + const chunks = run([notice("a"), notice("b")]); + expect(chunks).toEqual([ + { type: "system", kind: "notice", text: "a" }, + { type: "system", kind: "notice", text: "b" }, + ]); + }); + + it("interleaved think → text → think → tool → think → text → 6 chunks in order", () => { + const chunks = run([ + rd("planning..."), + td("here goes:"), + rd("hmm, actually"), + tc("t1", "read_file"), + rd("ok now"), + td("and so..."), + ]); + expect(chunks.map((c) => c.type)).toEqual([ + "thinking", + "text", + "thinking", + "tool-batch", + "thinking", + "text", + ]); + expect(chunks[0]).toEqual({ type: "thinking", text: "planning..." }); + expect(chunks[1]).toEqual({ type: "text", text: "here goes:" }); + expect(chunks[2]).toEqual({ type: "thinking", text: "hmm, actually" }); + expect(chunks[3]).toMatchObject({ type: "tool-batch", calls: [{ id: "t1" }] }); + expect(chunks[4]).toEqual({ type: "thinking", text: "ok now" }); + expect(chunks[5]).toEqual({ type: "text", text: "and so..." }); + }); +}); + +// ─── Additional transition coverage ────────────────────────────── + +describe("appendEventToChunks — transition matrix", () => { + it("thinking → thinking coalesces", () => { + const chunks = run([rd("a"), rd("b")]); + expect(chunks).toEqual([{ type: "thinking", text: "ab" }]); + }); + + it("thinking → text opens a new text chunk", () => { + const chunks = run([rd("think"), td("speak")]); + expect(chunks).toEqual([ + { type: "thinking", text: "think" }, + { type: "text", text: "speak" }, + ]); + }); + + it("tool-batch → text opens a new text chunk", () => { + const chunks = run([tc("t1"), td("after tool")]); + expect(chunks).toHaveLength(2); + expect(chunks[1]).toEqual({ type: "text", text: "after tool" }); + }); + + it("text → reasoning-delta after a multi-delta text run still splits cleanly", () => { + const chunks = run([td("a"), td("b"), rd("x"), rd("y"), td("c")]); + expect(chunks).toEqual([ + { type: "text", text: "ab" }, + { type: "thinking", text: "xy" }, + { type: "text", text: "c" }, + ]); + }); + + it("error → text opens a fresh text chunk after the error", () => { + const chunks = run([err("boom"), td("recovered")]); + expect(chunks).toEqual([ + { type: "error", message: "boom" }, + { type: "text", text: "recovered" }, + ]); + }); + + it("two consecutive errors stay as two error chunks (no coalescing)", () => { + const chunks = run([err("first"), err("second", 429)]); + expect(chunks).toEqual([ + { type: "error", message: "first" }, + { type: "error", message: "second", statusCode: 429 }, + ]); + }); + + it("system → tool-call opens a new tool-batch (does not extend the system chunk)", () => { + const chunks = run([notice("info"), tc("t1")]); + expect(chunks).toHaveLength(2); + expect(chunks[1]).toMatchObject({ type: "tool-batch", calls: [{ id: "t1" }] }); + }); + + it("tool-result with no matching call is silently dropped", () => { + const chunks = run([td("hi"), tr("no-such-id", "ignored")]); + expect(chunks).toEqual([{ type: "text", text: "hi" }]); + }); + + it("shell-output with no tool-batch in scope is silently dropped", () => { + const chunks = run([td("hi"), so("orphan")]); + expect(chunks).toEqual([{ type: "text", text: "hi" }]); + }); + + it("tool-result for an earlier batch still updates the right call (results can arrive late)", () => { + // Order: tc -> td -> tc(new batch) -> tr(for first batch's id) + const chunks = run([ + tc("t1", "read_file"), + td("midstream text"), + tc("t2", "list_files"), + tr("t1", "late result for first"), + ]); + // Two tool-batches, separated by the text chunk. The result must land + // inside the FIRST batch (the one containing t1), not the most-recent. + expect(chunks.map((c) => c.type)).toEqual(["tool-batch", "text", "tool-batch"]); + const first = chunks[0]; + if (first?.type !== "tool-batch") throw new Error("type guard"); + expect(first.calls[0]).toMatchObject({ id: "t1", result: "late result for first" }); + const second = chunks[2]; + if (second?.type !== "tool-batch") throw new Error("type guard"); + // t2 in the second batch has no result yet. + expect(second.calls[0]?.result).toBeUndefined(); + }); + + it("shell-output goes to the most recent tool-batch's most recent entry, even with intervening chunks", () => { + // First batch's tool runs, emits output, then later a second batch starts and emits output. + const chunks = run([ + tc("t1", "run_shell"), + so("first-stdout\n"), + td("interlude"), + tc("t2", "run_shell"), + so("second-stdout\n"), + ]); + expect(chunks.map((c) => c.type)).toEqual(["tool-batch", "text", "tool-batch"]); + const first = chunks[0]; + const second = chunks[2]; + if (first?.type !== "tool-batch" || second?.type !== "tool-batch") { + throw new Error("type guard"); + } + expect(first.calls[0]?.shellOutput).toEqual({ stdout: "first-stdout\n", stderr: "" }); + expect(second.calls[0]?.shellOutput).toEqual({ stdout: "second-stdout\n", stderr: "" }); + }); + + it("model-changed event opens a system chunk with kind=model-changed", () => { + const chunks = run([modelChanged("anthropic-1", "claude-sonnet-4")]); + expect(chunks).toEqual([ + { + type: "system", + kind: "model-changed", + text: "Switched to claude-sonnet-4 (anthropic-1)", + }, + ]); + }); + + it("config-reload event opens a system chunk with kind=config-reload", () => { + const chunks = run([configReload]); + expect(chunks).toEqual([ + { type: "system", kind: "config-reload", text: "Configuration reloaded" }, + ]); + }); + + it("non-content events (status / done / task-list-update / message-queued etc.) are no-ops", () => { + const chunks = run([ + td("hello"), + { type: "status", status: "running" }, + { type: "task-list-update", tasks: [] }, + { + type: "tab-created", + id: "tab1", + title: "x", + keyId: null, + modelId: null, + parentTabId: null, + workingDirectory: null, + }, + { type: "message-queued", tabId: "t", messageId: "m", message: "queued" }, + { type: "message-consumed", tabId: "t", messageIds: ["m"] }, + { type: "message-cancelled", tabId: "t", messageId: "m" }, + { + type: "done", + message: { role: "assistant", chunks: [] }, + }, + td(" world"), + ]); + expect(chunks).toEqual([{ type: "text", text: "hello world" }]); + }); + + it("error chunk omits statusCode when not provided", () => { + const chunks = run([err("boom")]); + expect(chunks).toEqual([{ type: "error", message: "boom" }]); + // And no stray statusCode key: + expect(Object.hasOwn(chunks[0], "statusCode")).toBe(false); + }); + + it("tool-result updates isError=false correctly (default success path)", () => { + const chunks = run([tc("t1"), tr("t1", "ok", false)]); + const batch = chunks[0]; + if (batch?.type !== "tool-batch") throw new Error("type guard"); + expect(batch.calls[0]).toMatchObject({ result: "ok", isError: false }); + }); +}); + +// ─── applySystemEvent routing ──────────────────────────────────── + +describe("applySystemEvent", () => { + type Msg = { id: string; role: "user" | "assistant" | "system"; chunks: Chunk[] }; + + let counter = 0; + const idFactory = () => `gen-${++counter}`; + + it("creates a new role:system message when message list is empty", () => { + counter = 0; + const messages: Msg[] = []; + const result = applySystemEvent(messages, { kind: "notice", text: "hello" }, idFactory); + expect(result.messageId).toBe("gen-1"); + expect(messages).toEqual([ + { + id: "gen-1", + role: "system", + chunks: [{ type: "system", kind: "notice", text: "hello" }], + }, + ]); + }); + + it("creates a new role:system message when last message is user", () => { + counter = 0; + const messages: Msg[] = [{ id: "u1", role: "user", chunks: [{ type: "text", text: "hi" }] }]; + const result = applySystemEvent(messages, { kind: "model-changed", text: "swap" }, idFactory); + expect(result.messageId).toBe("gen-1"); + expect(messages).toHaveLength(2); + expect(messages[1]).toMatchObject({ + id: "gen-1", + role: "system", + chunks: [{ type: "system", kind: "model-changed", text: "swap" }], + }); + }); + + it("creates a new role:system message when last message is assistant", () => { + counter = 0; + const messages: Msg[] = [ + { id: "a1", role: "assistant", chunks: [{ type: "text", text: "done" }] }, + ]; + applySystemEvent(messages, { kind: "config-reload", text: "reloaded" }, idFactory); + expect(messages).toHaveLength(2); + expect(messages[1]?.role).toBe("system"); + }); + + it("appends a chunk to the existing system message when last message is role:system", () => { + counter = 0; + const messages: Msg[] = [ + { + id: "s1", + role: "system", + chunks: [{ type: "system", kind: "notice", text: "first" }], + }, + ]; + const result = applySystemEvent(messages, { kind: "notice", text: "second" }, idFactory); + expect(result.messageId).toBe("s1"); + expect(messages).toHaveLength(1); + expect(messages[0]?.chunks).toEqual([ + { type: "system", kind: "notice", text: "first" }, + { type: "system", kind: "notice", text: "second" }, + ]); + }); + + it("multiple consecutive calls accumulate in the same system message", () => { + counter = 0; + const messages: Msg[] = [{ id: "u1", role: "user", chunks: [{ type: "text", text: "hi" }] }]; + applySystemEvent(messages, { kind: "notice", text: "a" }, idFactory); + applySystemEvent(messages, { kind: "notice", text: "b" }, idFactory); + applySystemEvent(messages, { kind: "model-changed", text: "c" }, idFactory); + expect(messages).toHaveLength(2); + const sys = messages[1]; + expect(sys?.role).toBe("system"); + expect(sys?.chunks).toEqual([ + { type: "system", kind: "notice", text: "a" }, + { type: "system", kind: "notice", text: "b" }, + { type: "system", kind: "model-changed", text: "c" }, + ]); + }); + + it("returns the same messageId across appends to the same system message", () => { + counter = 0; + const messages: Msg[] = []; + const first = applySystemEvent(messages, { kind: "notice", text: "a" }, idFactory); + const second = applySystemEvent(messages, { kind: "notice", text: "b" }, idFactory); + expect(first.messageId).toBe(second.messageId); + }); + + it("works against the core ChatMessage shape (with id added by caller)", () => { + // Sanity: ChatMessage has {role, chunks}; the caller layers id on top. + // This test exists to prove the generic constraint doesn't reject the + // real persistence/in-memory shape we'll see in Phase 5. + counter = 0; + const messages: Array = []; + const result = applySystemEvent(messages, { kind: "cancelled", text: "user stop" }, idFactory); + expect(result.messageId).toBe("gen-1"); + expect(messages[0]?.role).toBe("system"); + expect(messages[0]?.chunks[0]).toMatchObject({ kind: "cancelled", text: "user stop" }); + }); +}); diff --git a/packages/core/tests/tools/write-file.test.ts b/packages/core/tests/tools/write-file.test.ts index 6853a8e..f071e12 100644 --- a/packages/core/tests/tools/write-file.test.ts +++ b/packages/core/tests/tools/write-file.test.ts @@ -1,4 +1,4 @@ -import { access, mkdtemp, readFile, readdir, rm, symlink } from "node:fs/promises"; +import { access, mkdtemp, readdir, readFile, rm, symlink } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; diff --git a/packages/frontend/package.json b/packages/frontend/package.json index 5228e4a..453df07 100644 --- a/packages/frontend/package.json +++ b/packages/frontend/package.json @@ -31,6 +31,7 @@ } }, "dependencies": { + "@dispatch/core": "workspace:*", "dompurify": "^3.4.5", "highlight.js": "^11.11.1", "marked": "^18.0.4", diff --git a/packages/frontend/src/lib/components/ChatMessage.svelte b/packages/frontend/src/lib/components/ChatMessage.svelte index c6b6034..0c85349 100644 --- a/packages/frontend/src/lib/components/ChatMessage.svelte +++ b/packages/frontend/src/lib/components/ChatMessage.svelte @@ -1,7 +1,7 @@ -{#if isSystem} -
-
- {#each message.content as segment} - {#if segment.type === "text"} - {segment.text} - {/if} - {/each} -
-
-{:else} -
-
- {#if message.thinking} +{#snippet renderChunks(chunks: Chunk[], streaming: boolean | undefined)} + {#each chunks as chunk, i (chunkKey(chunk, i))} + {#if chunk.type === "text"} + + {:else if chunk.type === "thinking"}
Thinking...
-

{message.thinking}

+

{chunk.text}

+ {:else if chunk.type === "tool-batch"} + {#each chunk.calls as call (call.id)} + + {/each} + {:else if chunk.type === "error"} +
+
+ {chunk.message} + {#if chunk.statusCode !== undefined} + status {chunk.statusCode} + {/if} +
+
+ {:else if chunk.type === "system"} +
+ {SYSTEM_KIND_LABEL[chunk.kind]}: + {chunk.text} +
{/if} - {#each message.content as segment, i (segment.type === "tool-call" ? segment.id : i)} - {#if segment.type === "text"} - - {:else if segment.type === "tool-call"} - - {/if} - {/each} + {/each} +{/snippet} + +{#if isSystem} +
+
+ {@render renderChunks(message.chunks, false)} +
+
+{:else} +
+
+ {@render renderChunks(message.chunks, message.isStreaming)} {#if message.isStreaming} {/if} diff --git a/packages/frontend/src/lib/components/ToolCallDisplay.svelte b/packages/frontend/src/lib/components/ToolCallDisplay.svelte index 7c7aef6..1b4ebca 100644 --- a/packages/frontend/src/lib/components/ToolCallDisplay.svelte +++ b/packages/frontend/src/lib/components/ToolCallDisplay.svelte @@ -1,8 +1,8 @@