diff options
22 files changed, 1967 insertions, 595 deletions
@@ -42,6 +42,7 @@ "name": "@dispatch/frontend", "version": "0.0.1", "dependencies": { + "@dispatch/core": "workspace:*", "dompurify": "^3.4.5", "highlight.js": "^11.11.1", "marked": "^18.0.4", diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts index 487b09f..0f06683 100644 --- a/packages/api/src/agent-manager.ts +++ b/packages/api/src/agent-manager.ts @@ -3,9 +3,12 @@ import { type AgentEvent, type AgentSkillMapping, type AgentStatus, + appendEventToChunks, appendMessage, + applySystemEvent, BackgroundShellStore, BackgroundTranscriptStore, + type Chunk, type ClaudeAccount, clearSpillForTab, configToRuleset, @@ -23,6 +26,7 @@ import { createYoutubeTranscribeTool, type DispatchConfig, getClaudeAccountsFromDB, + getMessagesForTab, getSetting, loadConfig, loadSkills, @@ -32,7 +36,9 @@ import { refreshAccountCredentialsAsync, resolveApiKey, type SkillDefinition, + type SystemChunkKind, TaskList, + updateMessage, validateConfig, } from "@dispatch/core"; import type { PermissionManager } from "./permission-manager.js"; @@ -160,6 +166,14 @@ interface TabAgent { shellStore: BackgroundShellStore; /** Store for transcript requests backgrounded due to user interrupt. */ transcriptStore: BackgroundTranscriptStore; + /** + * In-flight assistant message chunks for the active turn. `null` when + * no turn is running. Out-of-band system events (config-reload, + * cancel, etc.) target this list when present. + */ + currentChunks: Chunk[] | null; + /** DB id of the in-flight assistant message (if persisted yet). */ + currentAssistantId: string | null; } export class AgentManager { @@ -222,9 +236,10 @@ export class AgentManager { for (const tabAgent of this.tabAgents.values()) { tabAgent.agent = null; } - // Emit config-reload to all tabs + // Emit config-reload to all tabs (and persist as a system chunk) for (const tabId of this.tabAgents.keys()) { this.emit({ type: "config-reload" }, tabId); + this.routeSystemEventToTab(tabId, "config-reload", "Configuration reloaded"); } }); @@ -234,9 +249,10 @@ export class AgentManager { for (const tabAgent of this.tabAgents.values()) { tabAgent.agent = null; } - // Emit config-reload to all tabs + // Emit config-reload to all tabs (and persist as a system chunk) for (const tabId of this.tabAgents.keys()) { this.emit({ type: "config-reload" }, tabId); + this.routeSystemEventToTab(tabId, "config-reload", "Skills reloaded"); } }); } @@ -297,6 +313,8 @@ export class AgentManager { queueListeners: [], shellStore: new BackgroundShellStore(), transcriptStore: new BackgroundTranscriptStore(), + currentChunks: null, + currentAssistantId: null, }; this.tabAgents.set(tabId, tabAgent); } @@ -671,9 +689,80 @@ export class AgentManager { } } + /** + * Persist a system chunk to a tab's message history. + * + * If an assistant turn is in flight (`currentChunks` is non-null), the + * chunk is appended to the in-flight assistant message's chunk list — + * the final `appendMessage` / `updateMessage` call at end-of-turn picks + * it up automatically. + * + * Otherwise we load the tab's persisted messages, run them through + * `applySystemEvent` (which either appends to an existing trailing + * `role: "system"` message or creates a new one), then persist the + * delta via `appendMessage` / `updateMessage`. + */ + private routeSystemEventToTab(tabId: string, kind: SystemChunkKind, text: string): void { + const tabAgent = this.tabAgents.get(tabId); + + // Turn in flight → append directly to the in-flight chunk list. + // The chunk lands on the assistant message when it's persisted at + // turn-end (or the assistant message is updated mid-turn elsewhere). + if (tabAgent?.currentChunks) { + tabAgent.currentChunks.push({ type: "system", kind, text }); + if (tabAgent.currentAssistantId) { + try { + updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks)); + } catch { + // Best-effort — the final persistence in processMessage will + // flush the same chunks again. + } + } + return; + } + + // No turn in flight → route via applySystemEvent against the + // persisted message list. Either appends to a trailing system + // message or creates a fresh one. + try { + const rows = getMessagesForTab(tabId); + const messages = rows.map((r) => ({ id: r.id, role: r.role, chunks: r.chunks })); + const before = messages[messages.length - 1]; + const { messageId } = applySystemEvent(messages, { kind, text }); + const target = messages.find((m) => m.id === messageId); + if (!target) return; + if (before && before.id === messageId) { + // Appended to existing trailing system message. + updateMessage(messageId, JSON.stringify(target.chunks)); + } else { + // Newly created system message. + appendMessage(tabId, messageId, "system", JSON.stringify(target.chunks)); + } + } catch { + // DB not available (e.g. tab not yet created) — drop silently. + } + } + stopTab(tabId: string): void { const tabAgent = this.tabAgents.get(tabId); if (tabAgent) { + // If a turn is in flight, drop a `cancelled` system chunk into + // the in-flight assistant message so the user sees an explicit + // "Generation cancelled by user" marker at the cancellation point. + if (tabAgent.currentChunks) { + tabAgent.currentChunks.push({ + type: "system", + kind: "cancelled", + text: "Generation cancelled by user", + }); + if (tabAgent.currentAssistantId) { + try { + updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks)); + } catch { + // best-effort + } + } + } tabAgent.abortController?.abort(); tabAgent.status = "idle"; tabAgent.agent = null; @@ -891,15 +980,28 @@ export class AgentManager { currentKeyId = entry.key_id || undefined; currentModelId = entry.model_id || undefined; allOutput = ""; - let assistantText = ""; - let assistantThinking = ""; - const assistantToolCalls: Array<{ - id: string; - name: string; - arguments: Record<string, unknown>; - result?: string; - isError?: boolean; - }> = []; + + // Single ordered chunk list for the assistant turn — replaces the + // previous (text + toolCalls + thinking) tri-accumulator pattern. + // Persisted progressively (insert on first chunk, update thereafter) + // so out-of-band routes (config-reload, cancel) see real DB rows. + const chunks: Chunk[] = []; + const assistantId = crypto.randomUUID(); + let assistantPersisted = false; + tabAgent.currentChunks = chunks; + tabAgent.currentAssistantId = assistantId; + + const flushAssistant = (): void => { + if (chunks.length === 0) return; + const json = JSON.stringify(chunks); + if (!assistantPersisted) { + appendMessage(tabId, assistantId, "assistant", json); + assistantPersisted = true; + } else { + updateMessage(assistantId, json); + } + }; + let attemptError: string | null = null; try { @@ -925,11 +1027,17 @@ export class AgentManager { message, reasoningEffort ? { reasoningEffort } : undefined, )) { - // Stop processing if the tab was aborted (closed/stopped) + // Stop processing if the tab was aborted (closed/stopped). + // stopTab() already injected a `cancelled` system chunk into + // `chunks` before flipping the abort flag, so we just need + // to flush and exit. if (tabAgent.abortController?.signal.aborted) break; if (event.type === "error") { attemptError = event.error; + // Record the error as a chunk so it's part of the + // persisted turn history. + appendEventToChunks(chunks, event); break; } @@ -938,68 +1046,41 @@ export class AgentManager { } this.emit(event, tabId); - // Accumulate content for DB persistence + // For diagnostics / child agent result harvesting, keep a + // flat string copy of plain text output. if (event.type === "text-delta") { - assistantText += event.delta; allOutput += event.delta; - } else if (event.type === "reasoning-delta") { - assistantThinking += event.delta; - } else if (event.type === "tool-call") { - assistantToolCalls.push({ - id: event.toolCall.id, - name: event.toolCall.name, - arguments: event.toolCall.arguments, - }); - } else if (event.type === "tool-result") { - const tc = assistantToolCalls.find((t) => t.id === event.toolResult.toolCallId); - if (tc) { - tc.result = event.toolResult.result; - tc.isError = event.toolResult.isError; - } - } else if (event.type === "done") { - // Persist assistant message to DB - const contentSegments: Array<Record<string, unknown>> = []; - if (assistantText) contentSegments.push({ type: "text", text: assistantText }); - for (const tc of assistantToolCalls) { - contentSegments.push({ type: "tool-call", ...tc }); - } - if (contentSegments.length > 0) { - appendMessage( - tabId, - crypto.randomUUID(), - "assistant", - JSON.stringify(contentSegments), - assistantThinking || undefined, - ); - } - // Reset for next turn - assistantText = ""; - assistantThinking = ""; - assistantToolCalls.length = 0; } + + if (event.type === "done") { + // End of turn — flush the accumulated chunks. Reset the + // in-flight pointers so out-of-band system events route + // through `applySystemEvent` against the persisted list + // instead of mutating a stale array. + flushAssistant(); + chunks.length = 0; + assistantPersisted = true; // suppress post-loop flush + tabAgent.currentChunks = null; + tabAgent.currentAssistantId = null; + continue; + } + + // Route every content-bearing event through the shared helper. + // `appendEventToChunks` ignores lifecycle events (status / done + // / task-list-update / tab-created / message-* / etc), so it's + // safe to call unconditionally. + appendEventToChunks(chunks, event); } } catch (err) { console.error(`[dispatch] processMessage error for tab ${tabId}:`, err); attemptError = err instanceof Error ? err.message : String(err); } - // Flush any accumulated assistant content from this attempt - if (assistantText || assistantToolCalls.length > 0) { - const contentSegments: Array<Record<string, unknown>> = []; - if (assistantText) contentSegments.push({ type: "text", text: assistantText }); - for (const tc of assistantToolCalls) { - contentSegments.push({ type: "tool-call", ...tc }); - } - if (contentSegments.length > 0) { - appendMessage( - tabId, - crypto.randomUUID(), - "assistant", - JSON.stringify(contentSegments), - assistantThinking || undefined, - ); - } - } + // Flush any accumulated assistant content from this attempt (covers + // the abort/error/exception paths where we never saw a `done`). + flushAssistant(); + tabAgent.currentChunks = null; + tabAgent.currentAssistantId = null; // No error — success if (!attemptError) { @@ -1024,11 +1105,21 @@ export class AgentManager { `Key "${tabAgent.keyId}" rate limited. ` + `Falling back to "${nextEntry.key_id}" (model: ${nextEntry.model_id})...`; console.warn(`[dispatch] ${fallbackMsg}`); + // Persist the notice + model-change as system chunks. We're + // between turns here (just flushed the previous assistant + // message), so the helper routes them into a `role: "system"` + // message via `applySystemEvent`. this.emit({ type: "notice", message: fallbackMsg }, tabId); + this.routeSystemEventToTab(tabId, "notice", fallbackMsg); this.emit( { type: "model-changed", keyId: nextEntry.key_id, modelId: nextEntry.model_id }, tabId, ); + this.routeSystemEventToTab( + tabId, + "model-changed", + `Switched to ${nextEntry.model_id} (${nextEntry.key_id})`, + ); tabAgent.agent = null; continue; } diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts index a1f2d75..28fa044 100644 --- a/packages/api/tests/agent-manager.test.ts +++ b/packages/api/tests/agent-manager.test.ts @@ -13,7 +13,10 @@ vi.mock("@dispatch/core", () => ({ yield { type: "text-delta", delta: "world" } as const; yield { type: "done", - message: { role: "assistant", content: "Hello world" }, + message: { + role: "assistant", + chunks: [{ type: "text", text: "Hello world" }], + }, } as const; yield { type: "status", status: "idle" } as const; } @@ -178,6 +181,16 @@ vi.mock("@dispatch/core", () => ({ return null; }, appendMessage() {}, + updateMessage() {}, + getMessagesForTab() { + return []; + }, + appendEventToChunks(_chunks: unknown[], _event: unknown) { + // no-op stub; chunk accumulation isn't exercised in these unit tests + }, + applySystemEvent(_messages: unknown[], _event: unknown) { + return { messageId: "mock-system-msg" }; + }, BackgroundShellStore: class MockBackgroundShellStore { has() { return false; diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts index 69a6676..49bae49 100644 --- a/packages/api/tests/routes.test.ts +++ b/packages/api/tests/routes.test.ts @@ -14,7 +14,10 @@ vi.mock("@dispatch/core", () => ({ yield { type: "text-delta", delta: "world" } as const; yield { type: "done", - message: { role: "assistant", content: "Hello world" }, + message: { + role: "assistant", + chunks: [{ type: "text", text: "Hello world" }], + }, } as const; yield { type: "status", status: "idle" } as const; } @@ -179,6 +182,16 @@ vi.mock("@dispatch/core", () => ({ return null; }, appendMessage() {}, + updateMessage() {}, + getMessagesForTab() { + return []; + }, + appendEventToChunks(_chunks: unknown[], _event: unknown) { + // no-op stub + }, + applySystemEvent(_messages: unknown[], _event: unknown) { + return { messageId: "mock-system-msg" }; + }, BackgroundShellStore: class MockBackgroundShellStore { has() { return false; diff --git a/packages/core/src/agent/agent.ts b/packages/core/src/agent/agent.ts index ec83cad..4e58378 100644 --- a/packages/core/src/agent/agent.ts +++ b/packages/core/src/agent/agent.ts @@ -1,6 +1,7 @@ import { dirname } from "node:path"; import type { CoreMessage, CoreSystemMessage } from "ai"; import { streamText } from "ai"; +import { appendEventToChunks } from "../chunks/append.js"; import { buildBillingHeaderValue, SYSTEM_IDENTITY } from "../credentials/claude.js"; import { createProvider, prefixToolName, unprefixToolName } from "../llm/provider.js"; import { canonicalize } from "../tools/path-utils.js"; @@ -12,35 +13,118 @@ import type { AgentEvent, AgentStatus, ChatMessage, + Chunk, QueueCallbacks, ToolCall, ToolResult, } from "../types/index.js"; +/** + * Rebuild AI SDK `CoreMessage[]` from our internal `ChatMessage[]`. + * + * Strip rules (see plan-chunk-refactor.md): + * - `role: "system"` messages are skipped wholesale (they're display-only + * standalone-system bubbles that exist outside any model turn). + * - `error` chunks are skipped — the turn ended; the LLM doesn't need them. + * - `system` chunks are skipped — display-only notices. + * - `text` chunks → `{ type: "text", text }` parts. + * - `thinking` chunks → `{ type: "reasoning", text }` parts (handles + * Claude's `interleaved-thinking-2025-05-14` round-trip). + * - `tool-batch` chunks → one `{ type: "tool-call" }` part per entry + * inside the current assistant message, followed by a separate + * `{ role: "tool", content: [{ type: "tool-result", ... }] }` message + * for every entry that has a result. This mirrors what the AI SDK + * expects on the wire. + * + * Mixed-resolution tool batches (some entries with `result`, some without): + * we emit tool-results only for the entries that have them. In practice + * the agent resolves every tool call in a step before looping back to the + * LLM, so this case only arises mid-step (where the message hasn't been + * round-tripped to the LLM yet) and is benign. + */ function toCoreMessages(messages: ChatMessage[], useToolPrefix?: boolean): CoreMessage[] { const result: CoreMessage[] = []; for (const msg of messages) { + if (msg.role === "system") continue; + if (msg.role === "user") { - result.push({ role: "user", content: msg.content }); - } else if (msg.role === "assistant") { - const parts: Array< - | { type: "text"; text: string } - | { type: "tool-call"; toolCallId: string; toolName: string; args: Record<string, unknown> } - > = [{ type: "text", text: msg.content }]; - for (const tc of msg.toolCalls ?? []) { - const toolName = useToolPrefix ? prefixToolName(tc.name) : tc.name; - parts.push({ type: "tool-call", toolCallId: tc.id, toolName, args: tc.arguments }); + // User messages in our model can in theory contain non-text chunks, + // but in practice the UI only produces text. Concatenate any text + // chunks; ignore anything else. + const text = msg.chunks + .filter((c): c is Extract<Chunk, { type: "text" }> => c.type === "text") + .map((c) => c.text) + .join(""); + result.push({ role: "user", content: text }); + continue; + } + + // role === "assistant" + const parts: Array< + | { type: "text"; text: string } + | { type: "reasoning"; text: string } + | { type: "tool-call"; toolCallId: string; toolName: string; args: Record<string, unknown> } + > = []; + const trailingToolResults: Array<{ + toolCallId: string; + toolName: string; + result: string; + }> = []; + + for (const chunk of msg.chunks) { + switch (chunk.type) { + case "text": + parts.push({ type: "text", text: chunk.text }); + break; + case "thinking": + parts.push({ type: "reasoning", text: chunk.text }); + break; + case "tool-batch": + for (const entry of chunk.calls) { + const toolName = useToolPrefix ? prefixToolName(entry.name) : entry.name; + parts.push({ + type: "tool-call", + toolCallId: entry.id, + toolName, + args: entry.arguments, + }); + if (entry.result !== undefined) { + trailingToolResults.push({ + toolCallId: entry.id, + toolName, + result: entry.result, + }); + } + } + break; + case "error": + case "system": + // Strip — not sent back to the LLM. + break; } + } + + // Skip the assistant message entirely if it has no parts (e.g., a + // turn that consisted solely of system/error chunks). Emitting an + // assistant message with empty content can confuse some providers. + if (parts.length === 0 && trailingToolResults.length === 0) continue; + + if (parts.length > 0) { result.push({ role: "assistant", content: parts }); - for (const tr of msg.toolResults ?? []) { - const toolName = useToolPrefix ? prefixToolName(tr.toolName) : tr.toolName; - result.push({ - role: "tool", - content: [ - { type: "tool-result", toolCallId: tr.toolCallId, toolName, result: tr.result }, - ], - }); - } + } + + for (const tr of trailingToolResults) { + result.push({ + role: "tool", + content: [ + { + type: "tool-result", + toolCallId: tr.toolCallId, + toolName: tr.toolName, + result: tr.result, + }, + ], + }); } } return result; @@ -204,8 +288,7 @@ export class Agent { // to read it without prompting the user. Bypassing the external- // directory check here keeps the inspection flow frictionless. const isSpillPath = - resolvedPath === resolvedSpillRoot || - resolvedPath.startsWith(`${resolvedSpillRoot}/`); + resolvedPath === resolvedSpillRoot || resolvedPath.startsWith(`${resolvedSpillRoot}/`); if (!isUnderWorkdir && !isSpillPath) { const permissionType = @@ -288,7 +371,7 @@ export class Agent { this.status = "running"; yield { type: "status", status: "running" }; - this.messages.push({ role: "user", content: userMessage }); + this.messages.push({ role: "user", chunks: [{ type: "text", text: userMessage }] }); const registry = createToolRegistry(this.config.tools); // `isClaudeOAuth` gates Claude-Code-CLI-specific behavior: billing-header @@ -322,19 +405,36 @@ export class Agent { // request as coming from the official CLI. let systemPrompt = this.config.systemPrompt; if (isClaudeOAuth) { - const billingHeader = buildBillingHeaderValue(this.messages); + // `buildBillingHeaderValue` historically took `{ role, content: string }[]`. + // Project the new chunk-based ChatMessage shape onto that legacy shape + // by stringifying text chunks. Only the first user message's text is + // actually used (see extractFirstUserMessageText) — non-text chunks + // safely contribute nothing. + const legacyShape = this.messages.map((m) => ({ + role: m.role, + content: m.chunks + .filter((c): c is Extract<Chunk, { type: "text" }> => c.type === "text") + .map((c) => c.text) + .join(""), + })); + const billingHeader = buildBillingHeaderValue(legacyShape); systemPrompt = `${billingHeader}\n${SYSTEM_IDENTITY}\n\n${systemPrompt}`; } try { - // Track the final assistant message across all steps - let finalText = ""; - const allToolCalls: ToolCall[] = []; - const allToolResults: ToolResult[] = []; + // Single chunk accumulator for the entire assistant turn (all steps). + // All event-driven mutations go through `appendEventToChunks`. + const chunks: Chunk[] = []; // We build up a local message list for multi-turn within one run() call - // that includes tool results fed back to the LLM + // that includes tool results fed back to the LLM. Each step appends + // the assistant's evolving chunk list as one ChatMessage; subsequent + // steps see prior tool calls and their results via the chunks. const stepMessages: ChatMessage[] = [...this.messages]; + // The assistant ChatMessage for the current turn, shared across steps + // so its `chunks` reference matches the accumulator above. We push + // it once when the first step has actual output to record. + let assistantTurnMessage: ChatMessage | null = null; for (let step = 0; step < MAX_STEPS; step++) { const effort = options?.reasoningEffort ?? this.config.reasoningEffort ?? "max"; @@ -397,17 +497,35 @@ export class Agent { const result = streamText(streamOptions); - let stepText = ""; + // Per-step tool-call tracking — needed because we only loop back + // to the LLM if this step produced tool calls. The actual chunk + // state for the turn lives in `chunks`. const stepToolCalls: ToolCall[] = []; + // Ensure we have an assistant message in stepMessages whose + // `chunks` reference is shared with our accumulator. Only push + // once; subsequent steps mutate the same chunks array. + if (assistantTurnMessage === null) { + assistantTurnMessage = { role: "assistant", chunks }; + stepMessages.push(assistantTurnMessage); + } + try { for await (const event of result.fullStream) { if (event.type === "text-delta") { - stepText += event.textDelta; - finalText += event.textDelta; - yield { type: "text-delta", delta: event.textDelta }; + const internalEvent: AgentEvent = { + type: "text-delta", + delta: event.textDelta, + }; + appendEventToChunks(chunks, internalEvent); + yield internalEvent; } else if (event.type === "reasoning") { - yield { type: "reasoning-delta", delta: event.textDelta }; + const internalEvent: AgentEvent = { + type: "reasoning-delta", + delta: event.textDelta, + }; + appendEventToChunks(chunks, internalEvent); + yield internalEvent; } else if (event.type === "tool-call") { const rawName = event.toolName; const toolName = isClaudeOAuth ? unprefixToolName(rawName) : rawName; @@ -417,18 +535,21 @@ export class Agent { arguments: event.args as Record<string, unknown>, }; stepToolCalls.push(toolCall); - allToolCalls.push(toolCall); - yield { type: "tool-call", toolCall }; + const internalEvent: AgentEvent = { type: "tool-call", toolCall }; + appendEventToChunks(chunks, internalEvent); + yield internalEvent; } else if (event.type === "error") { const errRecord = event.error as unknown as Record<string, unknown>; const statusCode = typeof errRecord.statusCode === "number" ? errRecord.statusCode : undefined; const errorMsg = formatError(event.error, this.config); - yield { + const internalEvent: AgentEvent = { type: "error", error: errorMsg, ...(statusCode !== undefined ? { statusCode } : {}), }; + appendEventToChunks(chunks, internalEvent); + yield internalEvent; this.status = "error"; yield { type: "status", status: "error" }; return; @@ -452,8 +573,9 @@ export class Agent { arguments: {}, }; stepToolCalls.push(badToolCall); - allToolCalls.push(badToolCall); - yield { type: "tool-call", toolCall: badToolCall }; + const tcEvent: AgentEvent = { type: "tool-call", toolCall: badToolCall }; + appendEventToChunks(chunks, tcEvent); + yield tcEvent; const badToolResult: ToolResult = { toolCallId: fakeId, @@ -461,39 +583,33 @@ export class Agent { result: errorResult, isError: true, }; - allToolResults.push(badToolResult); - yield { type: "tool-result", toolResult: badToolResult }; + const trEvent: AgentEvent = { type: "tool-result", toolResult: badToolResult }; + appendEventToChunks(chunks, trEvent); + yield trEvent; } - // No tool calls means the agent is done + // No tool calls means the agent is done — the assistant message + // already exists in stepMessages with up-to-date chunks. if (stepToolCalls.length === 0) { - // Add the final assistant message to step messages (for history) - stepMessages.push({ - role: "assistant", - content: stepText, - }); break; } - // Add assistant message with tool calls to step messages - stepMessages.push({ - role: "assistant", - content: stepText, - toolCalls: stepToolCalls, - }); - - // Execute tool calls manually - const stepToolResults: ToolResult[] = []; - // Track tool calls that already have results (e.g. synthetic unavailable-tool errors) - const alreadyResolved = new Set(allToolResults.map((r) => r.toolCallId)); + // Execute tool calls manually. Their results merge back into the + // `chunks` accumulator via `appendEventToChunks`, which routes + // each result to the correct entry inside its tool-batch chunk. + // Track which calls already have a recorded result (from the + // synthetic unavailable-tool error path above) so we don't + // re-execute them. + const alreadyResolved = new Set<string>(); + for (const c of chunks) { + if (c.type !== "tool-batch") continue; + for (const entry of c.calls) { + if (entry.result !== undefined) alreadyResolved.add(entry.id); + } + } for (const tc of stepToolCalls) { - // Skip execution for tool calls that already have synthetic results - if (alreadyResolved.has(tc.id)) { - const existing = allToolResults.find((r) => r.toolCallId === tc.id); - if (existing) stepToolResults.push(existing); - continue; - } + if (alreadyResolved.has(tc.id)) continue; const shellOutputQueue: Array<{ data: string; stream: "stdout" | "stderr" }> = []; @@ -506,7 +622,15 @@ export class Agent { while (toolResult === undefined) { if (shellOutputQueue.length > 0) { const item = shellOutputQueue.shift(); - if (item) yield { type: "shell-output", data: item.data, stream: item.stream }; + if (item) { + const shellEvent: AgentEvent = { + type: "shell-output", + data: item.data, + stream: item.stream, + }; + appendEventToChunks(chunks, shellEvent); + yield shellEvent; + } continue; } const raceResult = await Promise.race([ @@ -523,7 +647,15 @@ export class Agent { // Drain any remaining shell output emitted before we read the result while (shellOutputQueue.length > 0) { const item = shellOutputQueue.shift(); - if (item) yield { type: "shell-output", data: item.data, stream: item.stream }; + if (item) { + const shellEvent: AgentEvent = { + type: "shell-output", + data: item.data, + stream: item.stream, + }; + appendEventToChunks(chunks, shellEvent); + yield shellEvent; + } } // Check for queued user messages and append them to the tool result @@ -539,36 +671,22 @@ export class Agent { } } - stepToolResults.push(finalToolResult); - allToolResults.push(finalToolResult); - yield { type: "tool-result", toolResult: finalToolResult }; - } - - // Add tool results back to step messages so LLM can see them - // We append them to the last assistant message's toolResults - const lastMsg = stepMessages[stepMessages.length - 1]; - if (lastMsg) { - lastMsg.toolResults = stepToolResults; - } - } - - // If we exhausted MAX_STEPS and there were pending tool calls, surface an error - if (stepMessages.length > 0) { - const lastMsg = stepMessages[stepMessages.length - 1]; - if (lastMsg?.toolCalls && lastMsg.toolCalls.length > 0 && !lastMsg.toolResults) { - yield { - type: "error", - error: `Agent reached MAX_STEPS (${MAX_STEPS}) with unresolved tool calls`, - }; + const trEvent: AgentEvent = { type: "tool-result", toolResult: finalToolResult }; + appendEventToChunks(chunks, trEvent); + yield trEvent; } } - const assistantMessage: ChatMessage = { + // Build the final assistant message from the accumulated chunks. + // If no assistant turn message was ever created (e.g., the model + // produced nothing — unusual but possible), synthesize an empty one. + const assistantMessage: ChatMessage = assistantTurnMessage ?? { role: "assistant", - content: finalText, - toolCalls: allToolCalls.length > 0 ? allToolCalls : undefined, - toolResults: allToolResults.length > 0 ? allToolResults : undefined, + chunks, }; + // `assistantTurnMessage` was pushed into `stepMessages` but not into + // `this.messages` — push it now so the agent's outward-facing + // history reflects the turn. this.messages.push(assistantMessage); // Drain any remaining queued messages that arrived after the last tool call @@ -580,7 +698,7 @@ export class Agent { const userMessages = remaining.map((m) => m.message).join("\n---\n"); this.messages.push({ role: "user", - content: userMessages, + chunks: [{ type: "text", text: userMessages }], }); } } diff --git a/packages/core/src/chunks/append.ts b/packages/core/src/chunks/append.ts new file mode 100644 index 0000000..9c2a367 --- /dev/null +++ b/packages/core/src/chunks/append.ts @@ -0,0 +1,265 @@ +/** + * Chunk-builder helper. + * + * `appendEventToChunks` is the single source of truth for how a stream of + * `AgentEvent`s collapses into an ordered `Chunk[]` on a message. Both the + * backend (agent + agent-manager) and the frontend store call this helper + * so the wire format stays in lockstep across the boundary. + * + * Open/close rules — see plan-chunk-refactor.md for the full table. + * + * | Chunk | Opens on | Coalesces | + * |---------------|-------------------------------------------------------|------------------------------------------------------------| + * | `text` | first `text-delta` after a non-text chunk | consecutive `text-delta` events append to `.text` | + * | `thinking` | first `reasoning-delta` after a non-thinking chunk | consecutive `reasoning-delta` events append to `.text` | + * | `tool-batch` | first `tool-call` after a non-tool-batch chunk | consecutive `tool-call` events push a new entry to `.calls`| + * | `error` | every `error` event | NEVER (always single-event) | + * | `system` | every `notice`/`model-changed`/`config-reload`/... | NEVER (two consecutive system events → two chunks) | + * + * Side-effect events (no new chunk): + * - `tool-result` → finds the call by `id` across all `tool-batch` chunks (most-recent first) + * and updates its `result` / `isError`. + * - `shell-output` → appends to the most recent entry of the most recent `tool-batch` chunk. + * + * Ignored events: + * - `status`, `done`, `task-list-update`, `tab-created`, `message-queued`, + * `message-consumed`, `message-cancelled` — these are control / lifecycle + * events, not message content. + */ + +import type { + AgentEvent, + ChatMessage, + Chunk, + MessageRole, + SystemChunk, + SystemChunkKind, + ToolBatchChunk, +} from "../types/index.js"; + +/** + * Mutates `chunks` in place based on `event`. + * + * Returns void; the array is the output channel. + */ +export function appendEventToChunks(chunks: Chunk[], event: AgentEvent): void { + switch (event.type) { + case "text-delta": { + // Open or extend the current text chunk. + const last = chunks[chunks.length - 1]; + if (last && last.type === "text") { + last.text += event.delta; + } else { + chunks.push({ type: "text", text: event.delta }); + } + return; + } + + case "reasoning-delta": { + // Open or extend the current thinking chunk. + const last = chunks[chunks.length - 1]; + if (last && last.type === "thinking") { + last.text += event.delta; + } else { + chunks.push({ type: "thinking", text: event.delta }); + } + return; + } + + case "tool-call": { + // Open or extend the current tool-batch chunk. + const last = chunks[chunks.length - 1]; + const entry = { + id: event.toolCall.id, + name: event.toolCall.name, + arguments: event.toolCall.arguments, + }; + if (last && last.type === "tool-batch") { + last.calls.push(entry); + } else { + chunks.push({ type: "tool-batch", calls: [entry] }); + } + return; + } + + case "tool-result": { + // Find the matching call (by id) across all tool-batch chunks, + // most-recent first. Tool results can arrive after subsequent + // text-deltas, so we cannot rely on the *last* chunk being the + // tool-batch — we have to search. + for (let i = chunks.length - 1; i >= 0; i--) { + const c = chunks[i]; + if (!c || c.type !== "tool-batch") continue; + const call = c.calls.find((e) => e.id === event.toolResult.toolCallId); + if (call) { + call.result = event.toolResult.result; + call.isError = event.toolResult.isError; + return; + } + } + // Orphan result with no matching call — drop silently. + return; + } + + case "shell-output": { + // Append to the most recent entry of the most recent tool-batch. + // Walk back through chunks to find the latest tool-batch; if there + // are intervening text/thinking/etc chunks (which can happen if + // the model streams text while a shell tool is still running), + // we still want the most recent tool-batch. + for (let i = chunks.length - 1; i >= 0; i--) { + const c = chunks[i]; + if (!c || c.type !== "tool-batch") continue; + const entry = c.calls[c.calls.length - 1]; + if (!entry) return; + const prev = entry.shellOutput ?? { stdout: "", stderr: "" }; + entry.shellOutput = { + stdout: prev.stdout + (event.stream === "stdout" ? event.data : ""), + stderr: prev.stderr + (event.stream === "stderr" ? event.data : ""), + }; + return; + } + // Orphan shell-output with no tool-batch in scope — drop silently. + return; + } + + case "error": { + // Always a fresh single-event chunk — no coalescing. + chunks.push({ + type: "error", + message: event.error, + ...(event.statusCode !== undefined ? { statusCode: event.statusCode } : {}), + }); + return; + } + + case "notice": { + chunks.push({ type: "system", kind: "notice", text: event.message }); + return; + } + + case "model-changed": { + chunks.push({ + type: "system", + kind: "model-changed", + text: `Switched to ${event.modelId} (${event.keyId})`, + }); + return; + } + + case "config-reload": { + chunks.push({ + type: "system", + kind: "config-reload", + text: "Configuration reloaded", + }); + return; + } + + // Lifecycle / control events — no chunk emitted. + case "status": + case "done": + case "task-list-update": + case "tab-created": + case "message-queued": + case "message-consumed": + case "message-cancelled": + return; + + default: { + // Exhaustiveness check — if a new event variant is added to + // AgentEvent, TypeScript will complain here. + const _exhaustive: never = event; + void _exhaustive; + return; + } + } +} + +// ─── System event routing across messages ──────────────────────── + +/** + * Minimal shape needed by `applySystemEvent`. + * + * The caller (agent-manager / persistence layer) typically tracks message + * id alongside the wire-format `ChatMessage`. This generic constraint + * lets us keep core `ChatMessage` clean while still letting downstream + * pass anything with an `id`. + */ +export interface IdentifiedMessage { + id: string; + role: MessageRole; + chunks: Chunk[]; +} + +/** + * Describes the system event in caller-controlled terms. We let the caller + * decide both the `kind` (so the same helper can record cancellations, + * notices, model swaps, etc.) and the `text` (so the caller controls + * formatting / localization). + */ +export interface SystemEventLike { + kind: SystemChunkKind; + text: string; +} + +/** + * Routes a system event to the right message when *no assistant turn is + * in flight*. (When a turn IS in flight, the caller should instead use + * `appendEventToChunks` against the in-flight message's chunks directly.) + * + * Routing rules (from plan-chunk-refactor.md): + * + * 1. Most recent message is `role: "system"` → append a `system` chunk + * to it. (Note: a second consecutive system event creates a second + * system chunk inside the same system message — chunks themselves + * never coalesce.) + * 2. Otherwise → create a fresh `role: "system"` message containing one + * `system` chunk and push it. + * + * Returns the `messageId` that was used (either the existing system + * message's id or the newly-created one) so the caller can persist / + * emit a diff to subscribers. + * + * `idFactory` defaults to `crypto.randomUUID()`; tests inject a + * deterministic factory. + */ +export function applySystemEvent<M extends IdentifiedMessage>( + messages: M[], + event: SystemEventLike, + idFactory: () => string = defaultIdFactory, +): { messageId: string } { + const chunk: SystemChunk = { type: "system", kind: event.kind, text: event.text }; + + const last = messages[messages.length - 1]; + if (last && last.role === "system") { + last.chunks.push(chunk); + return { messageId: last.id }; + } + + const id = idFactory(); + // We can't fabricate the full `M` shape without knowing its extra + // fields, but `IdentifiedMessage` is the minimum we need to push. + // Callers that extend the shape with extra fields are responsible for + // initializing them via post-hoc patching, or by passing in their own + // message-creation logic. In practice callers either: + // (a) use `ChatMessage` itself (no extra fields beyond IdentifiedMessage), or + // (b) construct messages and look them up by id after this call returns. + const newMessage = { id, role: "system" as const, chunks: [chunk] } as unknown as M; + messages.push(newMessage); + return { messageId: id }; +} + +function defaultIdFactory(): string { + // In Node 19+ / modern browsers, `crypto.randomUUID` is available globally. + if (typeof crypto !== "undefined" && typeof crypto.randomUUID === "function") { + return crypto.randomUUID(); + } + // Fallback: pseudo-random; not cryptographically secure, but adequate for + // in-memory message identifiers when randomUUID is unavailable. + return `sysmsg-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 10)}`; +} + +// ─── Re-exports for convenience ────────────────────────────────── + +export type { ChatMessage, Chunk, SystemChunk, SystemChunkKind, ToolBatchChunk }; diff --git a/packages/core/src/db/index.ts b/packages/core/src/db/index.ts index 81b474e..e63b266 100644 --- a/packages/core/src/db/index.ts +++ b/packages/core/src/db/index.ts @@ -99,7 +99,6 @@ export function getDatabase(): Database { seq INTEGER NOT NULL, role TEXT NOT NULL, content_json TEXT NOT NULL, - thinking TEXT, created_at INTEGER NOT NULL )`); diff --git a/packages/core/src/db/messages.ts b/packages/core/src/db/messages.ts index 80a1f22..34b69e5 100644 --- a/packages/core/src/db/messages.ts +++ b/packages/core/src/db/messages.ts @@ -1,21 +1,29 @@ +import type { Chunk, MessageRole } from "../types/index.js"; import { getDatabase } from "./index.js"; +/** + * A persisted message row, with `content_json` already parsed into a `Chunk[]`. + * Mirrors the new schema (no `thinking` column — that lived under the old + * `content + toolCalls + toolResults + thinking` model). + */ export interface MessageRow { id: string; tabId: string; seq: number; - role: string; - contentJson: string; - thinking: string | null; + role: MessageRole; + chunks: Chunk[]; createdAt: number; } +/** + * Append a new message to the tab. Caller passes the already-serialized + * chunk list as `contentJson` (i.e. `JSON.stringify(chunks)`). + */ export function appendMessage( tabId: string, id: string, - role: string, + role: MessageRole, contentJson: string, - thinking?: string, ): void { const db = getDatabase(); const maxSeq = db @@ -23,40 +31,58 @@ export function appendMessage( .get({ $tabId: tabId }) as { max_seq: number }; const seq = (maxSeq?.max_seq ?? -1) + 1; db.query( - `INSERT INTO messages (id, tab_id, seq, role, content_json, thinking, created_at) - VALUES ($id, $tabId, $seq, $role, $contentJson, $thinking, $now)`, + `INSERT INTO messages (id, tab_id, seq, role, content_json, created_at) + VALUES ($id, $tabId, $seq, $role, $contentJson, $now)`, ).run({ $id: id, $tabId: tabId, $seq: seq, $role: role, $contentJson: contentJson, - $thinking: thinking ?? null, $now: Date.now(), }); } -export function updateMessage(id: string, contentJson: string, thinking?: string): void { +/** + * Replace the persisted chunks for an existing message. `contentJson` is + * the already-serialized chunk list. + */ +export function updateMessage(id: string, contentJson: string): void { const db = getDatabase(); - db.query( - "UPDATE messages SET content_json = $contentJson, thinking = $thinking WHERE id = $id", - ).run({ $id: id, $contentJson: contentJson, $thinking: thinking ?? null }); + db.query("UPDATE messages SET content_json = $contentJson WHERE id = $id").run({ + $id: id, + $contentJson: contentJson, + }); } +/** + * Read all messages for a tab in seq order. `content_json` is parsed into + * `Chunk[]` here so callers don't have to. If a row's JSON is malformed, + * the message is returned with an empty chunk list rather than throwing. + */ export function getMessagesForTab(tabId: string): MessageRow[] { const db = getDatabase(); const rows = db .query("SELECT * FROM messages WHERE tab_id = $tabId ORDER BY seq ASC") .all({ $tabId: tabId }) as Array<Record<string, unknown>>; - return rows.map((row) => ({ - id: row.id as string, - tabId: row.tab_id as string, - seq: row.seq as number, - role: row.role as string, - contentJson: row.content_json as string, - thinking: row.thinking as string | null, - createdAt: row.created_at as number, - })); + return rows.map((row) => { + const rawJson = row.content_json as string; + let chunks: Chunk[]; + try { + const parsed = JSON.parse(rawJson); + chunks = Array.isArray(parsed) ? (parsed as Chunk[]) : []; + } catch { + chunks = []; + } + return { + id: row.id as string, + tabId: row.tab_id as string, + seq: row.seq as number, + role: row.role as MessageRole, + chunks, + createdAt: row.created_at as number, + }; + }); } export function clearMessagesForTab(tabId: string): void { diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index a68bfb9..1453a01 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -3,6 +3,13 @@ // Agent & LLM export { Agent } from "./agent/agent.js"; export { deleteAgent, getAgentDirs, loadAgents, saveAgent } from "./agents/index.js"; +// Chunk helpers +export { + appendEventToChunks, + applySystemEvent, + type IdentifiedMessage, + type SystemEventLike, +} from "./chunks/append.js"; // Config export { configToRuleset, diff --git a/packages/core/src/tools/list-files.ts b/packages/core/src/tools/list-files.ts index bf21046..e003099 100644 --- a/packages/core/src/tools/list-files.ts +++ b/packages/core/src/tools/list-files.ts @@ -20,10 +20,7 @@ export function createListFilesTool(workingDirectory: string): ToolDefinition { const absolutePath = await canonicalize(workingDirectory, relPath); const absoluteWorkDir = await canonicalize(workingDirectory); - if ( - absolutePath !== absoluteWorkDir && - !absolutePath.startsWith(`${absoluteWorkDir}/`) - ) { + if (absolutePath !== absoluteWorkDir && !absolutePath.startsWith(`${absoluteWorkDir}/`)) { return `Error: Path "${relPath}" is outside the working directory.`; } diff --git a/packages/core/src/tools/write-file.ts b/packages/core/src/tools/write-file.ts index 763b083..aa69c86 100644 --- a/packages/core/src/tools/write-file.ts +++ b/packages/core/src/tools/write-file.ts @@ -24,10 +24,7 @@ export function createWriteFileTool(workingDirectory: string): ToolDefinition { const absolutePath = await canonicalize(workingDirectory, filePath); const absoluteWorkDir = await canonicalize(workingDirectory); - if ( - absolutePath !== absoluteWorkDir && - !absolutePath.startsWith(`${absoluteWorkDir}/`) - ) { + if (absolutePath !== absoluteWorkDir && !absolutePath.startsWith(`${absoluteWorkDir}/`)) { return `Error: Path "${filePath}" is outside the working directory.`; } diff --git a/packages/core/src/types/index.ts b/packages/core/src/types/index.ts index 65576cf..8985bd9 100644 --- a/packages/core/src/types/index.ts +++ b/packages/core/src/types/index.ts @@ -3,13 +3,62 @@ import type { PermissionChecker, Ruleset } from "../permission/index.js"; // ─── Message Types ─────────────────────────────────────────────── -export type MessageRole = "user" | "assistant" | "tool"; +export type MessageRole = "user" | "assistant" | "system"; + +/** + * A single ordered chunk of content inside a message. The chunk list + * preserves the actual temporal ordering of text, reasoning, tool calls, + * system notices, and errors as they arrived from the model. + * + * Coalescing rules (see plan-chunk-refactor.md): + * - `text` and `thinking` coalesce on consecutive same-type deltas. + * - `tool-batch` coalesces on consecutive `tool-call` events + * (appends a new entry to `calls`). + * - `error` and `system` are always single-event chunks (no coalescing). + */ +export type Chunk = TextChunk | ThinkingChunk | ToolBatchChunk | ErrorChunk | SystemChunk; + +export interface TextChunk { + type: "text"; + text: string; +} + +export interface ThinkingChunk { + type: "thinking"; + text: string; +} + +export interface ToolBatchChunk { + type: "tool-batch"; + calls: ToolBatchEntry[]; +} + +export interface ToolBatchEntry { + id: string; + name: string; + arguments: Record<string, unknown>; + result?: string; + isError?: boolean; + shellOutput?: { stdout: string; stderr: string }; +} + +export interface ErrorChunk { + type: "error"; + message: string; + statusCode?: number; +} + +export type SystemChunkKind = "notice" | "model-changed" | "config-reload" | "cancelled"; + +export interface SystemChunk { + type: "system"; + kind: SystemChunkKind; + text: string; +} export interface ChatMessage { role: MessageRole; - content: string; - toolCalls?: ToolCall[]; - toolResults?: ToolResult[]; + chunks: Chunk[]; } export interface ToolCall { diff --git a/packages/core/tests/agent/agent.test.ts b/packages/core/tests/agent/agent.test.ts index be5272f..3d11fc5 100644 --- a/packages/core/tests/agent/agent.test.ts +++ b/packages/core/tests/agent/agent.test.ts @@ -149,11 +149,11 @@ describe("Agent", () => { expect(agent.messages).toHaveLength(2); expect(agent.messages[0]).toMatchObject({ role: "user", - content: "my question", + chunks: [{ type: "text", text: "my question" }], }); expect(agent.messages[1]).toMatchObject({ role: "assistant", - content: "Response", + chunks: [{ type: "text", text: "Response" }], }); }); @@ -182,7 +182,7 @@ describe("Agent", () => { expect(doneEvent).toBeDefined(); expect(doneEvent).toMatchObject({ type: "done", - message: { role: "assistant", content: "Done!" }, + message: { role: "assistant", chunks: [{ type: "text", text: "Done!" }] }, }); }); diff --git a/packages/core/tests/chunks/append.test.ts b/packages/core/tests/chunks/append.test.ts new file mode 100644 index 0000000..c8917c9 --- /dev/null +++ b/packages/core/tests/chunks/append.test.ts @@ -0,0 +1,437 @@ +import { describe, expect, it } from "vitest"; +import { appendEventToChunks, applySystemEvent } from "../../src/chunks/append.js"; +import type { AgentEvent, ChatMessage, Chunk } from "../../src/types/index.js"; + +// ─── helpers ───────────────────────────────────────────────────── + +const td = (delta: string): AgentEvent => ({ type: "text-delta", delta }); +const rd = (delta: string): AgentEvent => ({ type: "reasoning-delta", delta }); +const tc = (id: string, name = "fake_tool", args: Record<string, unknown> = {}): AgentEvent => ({ + type: "tool-call", + toolCall: { id, name, arguments: args }, +}); +const tr = (toolCallId: string, result: string, isError = false): AgentEvent => ({ + type: "tool-result", + toolResult: { toolCallId, toolName: "fake_tool", result, isError }, +}); +const so = (data: string, stream: "stdout" | "stderr" = "stdout"): AgentEvent => ({ + type: "shell-output", + data, + stream, +}); +const err = (error: string, statusCode?: number): AgentEvent => ({ + type: "error", + error, + ...(statusCode !== undefined ? { statusCode } : {}), +}); +const notice = (message: string): AgentEvent => ({ type: "notice", message }); +const modelChanged = (keyId: string, modelId: string): AgentEvent => ({ + type: "model-changed", + keyId, + modelId, +}); +const configReload: AgentEvent = { type: "config-reload" }; + +function run(events: AgentEvent[]): Chunk[] { + const chunks: Chunk[] = []; + for (const e of events) appendEventToChunks(chunks, e); + return chunks; +} + +// ─── Required cases from the plan ──────────────────────────────── + +describe("appendEventToChunks — required cases from plan", () => { + it("empty chunks + text-delta → one text chunk with the delta", () => { + const chunks = run([td("Hello")]); + expect(chunks).toEqual([{ type: "text", text: "Hello" }]); + }); + + it("two consecutive text-deltas → one text chunk with concatenated text", () => { + const chunks = run([td("Hello, "), td("world!")]); + expect(chunks).toEqual([{ type: "text", text: "Hello, world!" }]); + }); + + it("text-delta then reasoning-delta → two chunks (text, thinking)", () => { + const chunks = run([td("ans: 42"), rd("I should explain")]); + expect(chunks).toEqual([ + { type: "text", text: "ans: 42" }, + { type: "thinking", text: "I should explain" }, + ]); + }); + + it("text-delta then tool-call → two chunks (text, tool-batch with one entry)", () => { + const chunks = run([td("Looking..."), tc("t1", "read_file", { path: "x" })]); + expect(chunks).toEqual([ + { type: "text", text: "Looking..." }, + { + type: "tool-batch", + calls: [{ id: "t1", name: "read_file", arguments: { path: "x" } }], + }, + ]); + }); + + it("two consecutive tool-calls → one tool-batch with two entries", () => { + const chunks = run([tc("t1", "read_file"), tc("t2", "list_files")]); + expect(chunks).toHaveLength(1); + expect(chunks[0]).toMatchObject({ + type: "tool-batch", + calls: [ + { id: "t1", name: "read_file" }, + { id: "t2", name: "list_files" }, + ], + }); + }); + + it("tool-call then tool-call then text → two chunks (tool-batch with 2 entries, text)", () => { + const chunks = run([tc("t1"), tc("t2"), td("done")]); + expect(chunks).toHaveLength(2); + expect(chunks[0]).toMatchObject({ + type: "tool-batch", + calls: [{ id: "t1" }, { id: "t2" }], + }); + expect(chunks[1]).toEqual({ type: "text", text: "done" }); + }); + + it("tool-result arrives → updates matching tool-call entry in the latest tool-batch chunk by id", () => { + const chunks = run([ + tc("t1"), + tc("t2"), + tr("t1", "first-result"), + tr("t2", "second-result", true), + ]); + expect(chunks).toHaveLength(1); + const batch = chunks[0]; + expect(batch.type).toBe("tool-batch"); + if (batch.type !== "tool-batch") throw new Error("type guard"); + expect(batch.calls[0]).toMatchObject({ id: "t1", result: "first-result", isError: false }); + expect(batch.calls[1]).toMatchObject({ id: "t2", result: "second-result", isError: true }); + }); + + it("shell-output arrives → appends to the most recent tool-call's shellOutput", () => { + const chunks = run([ + tc("t1", "run_shell"), + so("hello\n", "stdout"), + so("world\n", "stdout"), + so("err!\n", "stderr"), + ]); + expect(chunks).toHaveLength(1); + const batch = chunks[0]; + if (batch.type !== "tool-batch") throw new Error("type guard"); + expect(batch.calls[0]?.shellOutput).toEqual({ + stdout: "hello\nworld\n", + stderr: "err!\n", + }); + }); + + it("error event → opens an error chunk; subsequent events go to new chunks", () => { + const chunks = run([td("partial..."), err("network failed", 503), td("recovery")]); + expect(chunks).toEqual([ + { type: "text", text: "partial..." }, + { type: "error", message: "network failed", statusCode: 503 }, + { type: "text", text: "recovery" }, + ]); + }); + + it("system event during text run → closes text, opens system, would re-open text on next text-delta", () => { + const chunks = run([td("first "), notice("model swap"), td("second")]); + expect(chunks).toEqual([ + { type: "text", text: "first " }, + { type: "system", kind: "notice", text: "model swap" }, + { type: "text", text: "second" }, + ]); + }); + + it("two consecutive system events → two separate system chunks (no coalescing)", () => { + const chunks = run([notice("a"), notice("b")]); + expect(chunks).toEqual([ + { type: "system", kind: "notice", text: "a" }, + { type: "system", kind: "notice", text: "b" }, + ]); + }); + + it("interleaved think → text → think → tool → think → text → 6 chunks in order", () => { + const chunks = run([ + rd("planning..."), + td("here goes:"), + rd("hmm, actually"), + tc("t1", "read_file"), + rd("ok now"), + td("and so..."), + ]); + expect(chunks.map((c) => c.type)).toEqual([ + "thinking", + "text", + "thinking", + "tool-batch", + "thinking", + "text", + ]); + expect(chunks[0]).toEqual({ type: "thinking", text: "planning..." }); + expect(chunks[1]).toEqual({ type: "text", text: "here goes:" }); + expect(chunks[2]).toEqual({ type: "thinking", text: "hmm, actually" }); + expect(chunks[3]).toMatchObject({ type: "tool-batch", calls: [{ id: "t1" }] }); + expect(chunks[4]).toEqual({ type: "thinking", text: "ok now" }); + expect(chunks[5]).toEqual({ type: "text", text: "and so..." }); + }); +}); + +// ─── Additional transition coverage ────────────────────────────── + +describe("appendEventToChunks — transition matrix", () => { + it("thinking → thinking coalesces", () => { + const chunks = run([rd("a"), rd("b")]); + expect(chunks).toEqual([{ type: "thinking", text: "ab" }]); + }); + + it("thinking → text opens a new text chunk", () => { + const chunks = run([rd("think"), td("speak")]); + expect(chunks).toEqual([ + { type: "thinking", text: "think" }, + { type: "text", text: "speak" }, + ]); + }); + + it("tool-batch → text opens a new text chunk", () => { + const chunks = run([tc("t1"), td("after tool")]); + expect(chunks).toHaveLength(2); + expect(chunks[1]).toEqual({ type: "text", text: "after tool" }); + }); + + it("text → reasoning-delta after a multi-delta text run still splits cleanly", () => { + const chunks = run([td("a"), td("b"), rd("x"), rd("y"), td("c")]); + expect(chunks).toEqual([ + { type: "text", text: "ab" }, + { type: "thinking", text: "xy" }, + { type: "text", text: "c" }, + ]); + }); + + it("error → text opens a fresh text chunk after the error", () => { + const chunks = run([err("boom"), td("recovered")]); + expect(chunks).toEqual([ + { type: "error", message: "boom" }, + { type: "text", text: "recovered" }, + ]); + }); + + it("two consecutive errors stay as two error chunks (no coalescing)", () => { + const chunks = run([err("first"), err("second", 429)]); + expect(chunks).toEqual([ + { type: "error", message: "first" }, + { type: "error", message: "second", statusCode: 429 }, + ]); + }); + + it("system → tool-call opens a new tool-batch (does not extend the system chunk)", () => { + const chunks = run([notice("info"), tc("t1")]); + expect(chunks).toHaveLength(2); + expect(chunks[1]).toMatchObject({ type: "tool-batch", calls: [{ id: "t1" }] }); + }); + + it("tool-result with no matching call is silently dropped", () => { + const chunks = run([td("hi"), tr("no-such-id", "ignored")]); + expect(chunks).toEqual([{ type: "text", text: "hi" }]); + }); + + it("shell-output with no tool-batch in scope is silently dropped", () => { + const chunks = run([td("hi"), so("orphan")]); + expect(chunks).toEqual([{ type: "text", text: "hi" }]); + }); + + it("tool-result for an earlier batch still updates the right call (results can arrive late)", () => { + // Order: tc -> td -> tc(new batch) -> tr(for first batch's id) + const chunks = run([ + tc("t1", "read_file"), + td("midstream text"), + tc("t2", "list_files"), + tr("t1", "late result for first"), + ]); + // Two tool-batches, separated by the text chunk. The result must land + // inside the FIRST batch (the one containing t1), not the most-recent. + expect(chunks.map((c) => c.type)).toEqual(["tool-batch", "text", "tool-batch"]); + const first = chunks[0]; + if (first?.type !== "tool-batch") throw new Error("type guard"); + expect(first.calls[0]).toMatchObject({ id: "t1", result: "late result for first" }); + const second = chunks[2]; + if (second?.type !== "tool-batch") throw new Error("type guard"); + // t2 in the second batch has no result yet. + expect(second.calls[0]?.result).toBeUndefined(); + }); + + it("shell-output goes to the most recent tool-batch's most recent entry, even with intervening chunks", () => { + // First batch's tool runs, emits output, then later a second batch starts and emits output. + const chunks = run([ + tc("t1", "run_shell"), + so("first-stdout\n"), + td("interlude"), + tc("t2", "run_shell"), + so("second-stdout\n"), + ]); + expect(chunks.map((c) => c.type)).toEqual(["tool-batch", "text", "tool-batch"]); + const first = chunks[0]; + const second = chunks[2]; + if (first?.type !== "tool-batch" || second?.type !== "tool-batch") { + throw new Error("type guard"); + } + expect(first.calls[0]?.shellOutput).toEqual({ stdout: "first-stdout\n", stderr: "" }); + expect(second.calls[0]?.shellOutput).toEqual({ stdout: "second-stdout\n", stderr: "" }); + }); + + it("model-changed event opens a system chunk with kind=model-changed", () => { + const chunks = run([modelChanged("anthropic-1", "claude-sonnet-4")]); + expect(chunks).toEqual([ + { + type: "system", + kind: "model-changed", + text: "Switched to claude-sonnet-4 (anthropic-1)", + }, + ]); + }); + + it("config-reload event opens a system chunk with kind=config-reload", () => { + const chunks = run([configReload]); + expect(chunks).toEqual([ + { type: "system", kind: "config-reload", text: "Configuration reloaded" }, + ]); + }); + + it("non-content events (status / done / task-list-update / message-queued etc.) are no-ops", () => { + const chunks = run([ + td("hello"), + { type: "status", status: "running" }, + { type: "task-list-update", tasks: [] }, + { + type: "tab-created", + id: "tab1", + title: "x", + keyId: null, + modelId: null, + parentTabId: null, + workingDirectory: null, + }, + { type: "message-queued", tabId: "t", messageId: "m", message: "queued" }, + { type: "message-consumed", tabId: "t", messageIds: ["m"] }, + { type: "message-cancelled", tabId: "t", messageId: "m" }, + { + type: "done", + message: { role: "assistant", chunks: [] }, + }, + td(" world"), + ]); + expect(chunks).toEqual([{ type: "text", text: "hello world" }]); + }); + + it("error chunk omits statusCode when not provided", () => { + const chunks = run([err("boom")]); + expect(chunks).toEqual([{ type: "error", message: "boom" }]); + // And no stray statusCode key: + expect(Object.hasOwn(chunks[0], "statusCode")).toBe(false); + }); + + it("tool-result updates isError=false correctly (default success path)", () => { + const chunks = run([tc("t1"), tr("t1", "ok", false)]); + const batch = chunks[0]; + if (batch?.type !== "tool-batch") throw new Error("type guard"); + expect(batch.calls[0]).toMatchObject({ result: "ok", isError: false }); + }); +}); + +// ─── applySystemEvent routing ──────────────────────────────────── + +describe("applySystemEvent", () => { + type Msg = { id: string; role: "user" | "assistant" | "system"; chunks: Chunk[] }; + + let counter = 0; + const idFactory = () => `gen-${++counter}`; + + it("creates a new role:system message when message list is empty", () => { + counter = 0; + const messages: Msg[] = []; + const result = applySystemEvent(messages, { kind: "notice", text: "hello" }, idFactory); + expect(result.messageId).toBe("gen-1"); + expect(messages).toEqual([ + { + id: "gen-1", + role: "system", + chunks: [{ type: "system", kind: "notice", text: "hello" }], + }, + ]); + }); + + it("creates a new role:system message when last message is user", () => { + counter = 0; + const messages: Msg[] = [{ id: "u1", role: "user", chunks: [{ type: "text", text: "hi" }] }]; + const result = applySystemEvent(messages, { kind: "model-changed", text: "swap" }, idFactory); + expect(result.messageId).toBe("gen-1"); + expect(messages).toHaveLength(2); + expect(messages[1]).toMatchObject({ + id: "gen-1", + role: "system", + chunks: [{ type: "system", kind: "model-changed", text: "swap" }], + }); + }); + + it("creates a new role:system message when last message is assistant", () => { + counter = 0; + const messages: Msg[] = [ + { id: "a1", role: "assistant", chunks: [{ type: "text", text: "done" }] }, + ]; + applySystemEvent(messages, { kind: "config-reload", text: "reloaded" }, idFactory); + expect(messages).toHaveLength(2); + expect(messages[1]?.role).toBe("system"); + }); + + it("appends a chunk to the existing system message when last message is role:system", () => { + counter = 0; + const messages: Msg[] = [ + { + id: "s1", + role: "system", + chunks: [{ type: "system", kind: "notice", text: "first" }], + }, + ]; + const result = applySystemEvent(messages, { kind: "notice", text: "second" }, idFactory); + expect(result.messageId).toBe("s1"); + expect(messages).toHaveLength(1); + expect(messages[0]?.chunks).toEqual([ + { type: "system", kind: "notice", text: "first" }, + { type: "system", kind: "notice", text: "second" }, + ]); + }); + + it("multiple consecutive calls accumulate in the same system message", () => { + counter = 0; + const messages: Msg[] = [{ id: "u1", role: "user", chunks: [{ type: "text", text: "hi" }] }]; + applySystemEvent(messages, { kind: "notice", text: "a" }, idFactory); + applySystemEvent(messages, { kind: "notice", text: "b" }, idFactory); + applySystemEvent(messages, { kind: "model-changed", text: "c" }, idFactory); + expect(messages).toHaveLength(2); + const sys = messages[1]; + expect(sys?.role).toBe("system"); + expect(sys?.chunks).toEqual([ + { type: "system", kind: "notice", text: "a" }, + { type: "system", kind: "notice", text: "b" }, + { type: "system", kind: "model-changed", text: "c" }, + ]); + }); + + it("returns the same messageId across appends to the same system message", () => { + counter = 0; + const messages: Msg[] = []; + const first = applySystemEvent(messages, { kind: "notice", text: "a" }, idFactory); + const second = applySystemEvent(messages, { kind: "notice", text: "b" }, idFactory); + expect(first.messageId).toBe(second.messageId); + }); + + it("works against the core ChatMessage shape (with id added by caller)", () => { + // Sanity: ChatMessage has {role, chunks}; the caller layers id on top. + // This test exists to prove the generic constraint doesn't reject the + // real persistence/in-memory shape we'll see in Phase 5. + counter = 0; + const messages: Array<ChatMessage & { id: string }> = []; + const result = applySystemEvent(messages, { kind: "cancelled", text: "user stop" }, idFactory); + expect(result.messageId).toBe("gen-1"); + expect(messages[0]?.role).toBe("system"); + expect(messages[0]?.chunks[0]).toMatchObject({ kind: "cancelled", text: "user stop" }); + }); +}); diff --git a/packages/core/tests/tools/write-file.test.ts b/packages/core/tests/tools/write-file.test.ts index 6853a8e..f071e12 100644 --- a/packages/core/tests/tools/write-file.test.ts +++ b/packages/core/tests/tools/write-file.test.ts @@ -1,4 +1,4 @@ -import { access, mkdtemp, readFile, readdir, rm, symlink } from "node:fs/promises"; +import { access, mkdtemp, readdir, readFile, rm, symlink } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; diff --git a/packages/frontend/package.json b/packages/frontend/package.json index 5228e4a..453df07 100644 --- a/packages/frontend/package.json +++ b/packages/frontend/package.json @@ -31,6 +31,7 @@ } }, "dependencies": { + "@dispatch/core": "workspace:*", "dompurify": "^3.4.5", "highlight.js": "^11.11.1", "marked": "^18.0.4", diff --git a/packages/frontend/src/lib/components/ChatMessage.svelte b/packages/frontend/src/lib/components/ChatMessage.svelte index c6b6034..0c85349 100644 --- a/packages/frontend/src/lib/components/ChatMessage.svelte +++ b/packages/frontend/src/lib/components/ChatMessage.svelte @@ -1,7 +1,7 @@ <script lang="ts"> import { appSettings } from "../settings.svelte.js"; import { tabStore } from "../tabs.svelte.js"; -import type { ChatMessage } from "../types.js"; +import type { ChatMessage, Chunk, SystemChunkKind } from "../types.js"; import MarkdownRenderer from "./MarkdownRenderer.svelte"; import ToolCallDisplay from "./ToolCallDisplay.svelte"; @@ -21,37 +21,67 @@ function cancelQueued() { void tabStore.cancelQueuedMessage(tabId, queuedMessageId); } } + +function chunkKey(chunk: Chunk, i: number): string { + if (chunk.type === "tool-batch") { + // Stable-ish: first call id + count keeps re-renders sane while streaming. + return `tb-${chunk.calls[0]?.id ?? i}-${chunk.calls.length}`; + } + return `${chunk.type}-${i}`; +} + +const SYSTEM_KIND_LABEL: Record<SystemChunkKind, string> = { + notice: "Notice", + "model-changed": "Model changed", + "config-reload": "Config reload", + cancelled: "Cancelled", +}; </script> -{#if isSystem} - <div class="flex justify-center my-2"> - <div class="badge badge-ghost gap-1 text-xs opacity-60"> - {#each message.content as segment} - {#if segment.type === "text"} - {segment.text} - {/if} - {/each} - </div> - </div> -{:else} -<div class="chat chat-start mb-2 [&>.chat-bubble]:max-w-full {isQueued ? 'opacity-60' : ''}"> - <div class="chat-bubble break-words {isUser ? 'chat-bubble-primary w-fit' : 'bg-transparent w-full'}"> - {#if message.thinking} +{#snippet renderChunks(chunks: Chunk[], streaming: boolean | undefined)} + {#each chunks as chunk, i (chunkKey(chunk, i))} + {#if chunk.type === "text"} + <MarkdownRenderer text={chunk.text} {streaming} /> + {:else if chunk.type === "thinking"} <div class="collapse collapse-arrow mb-2 p-1"> <input type="checkbox" checked={appSettings.autoExpandThinking} /> <div class="collapse-title text-sm opacity-60 italic py-0 pl-0 pr-8 min-h-0">Thinking...</div> <div class="collapse-content text-sm opacity-60 italic p-0"> - <p class="whitespace-pre-wrap mt-1">{message.thinking}</p> + <p class="whitespace-pre-wrap mt-1">{chunk.text}</p> </div> </div> + {:else if chunk.type === "tool-batch"} + {#each chunk.calls as call (call.id)} + <ToolCallDisplay toolCall={call} /> + {/each} + {:else if chunk.type === "error"} + <div class="alert alert-error my-2 py-2 px-3 text-sm rounded border border-error/60 bg-error/10 text-error"> + <div class="flex flex-col gap-0.5 w-full"> + <span class="break-words">{chunk.message}</span> + {#if chunk.statusCode !== undefined} + <span class="text-xs opacity-70">status {chunk.statusCode}</span> + {/if} + </div> + </div> + {:else if chunk.type === "system"} + <div class="my-1 text-xs italic opacity-50 flex gap-1 items-baseline"> + <span class="font-semibold not-italic">{SYSTEM_KIND_LABEL[chunk.kind]}:</span> + <span class="break-words">{chunk.text}</span> + </div> {/if} - {#each message.content as segment, i (segment.type === "tool-call" ? segment.id : i)} - {#if segment.type === "text"} - <MarkdownRenderer text={segment.text} streaming={message.isStreaming} /> - {:else if segment.type === "tool-call"} - <ToolCallDisplay toolCall={segment} /> - {/if} - {/each} + {/each} +{/snippet} + +{#if isSystem} + <div class="flex justify-center my-2 px-4"> + <div class="max-w-full text-center"> + {@render renderChunks(message.chunks, false)} + </div> + </div> +{:else} +<div class="chat chat-start mb-2 [&>.chat-bubble]:max-w-full {isQueued ? 'opacity-60' : ''}"> + <div class="chat-bubble break-words {isUser ? 'chat-bubble-primary w-fit' : 'bg-transparent w-full'}"> + {@render renderChunks(message.chunks, message.isStreaming)} {#if message.isStreaming} <span class="inline-block w-1.5 h-4 bg-current animate-pulse ml-0.5 align-middle rounded-sm"></span> {/if} diff --git a/packages/frontend/src/lib/components/ToolCallDisplay.svelte b/packages/frontend/src/lib/components/ToolCallDisplay.svelte index 7c7aef6..1b4ebca 100644 --- a/packages/frontend/src/lib/components/ToolCallDisplay.svelte +++ b/packages/frontend/src/lib/components/ToolCallDisplay.svelte @@ -1,8 +1,8 @@ <script lang="ts"> import { tabStore } from "../tabs.svelte.js"; -import type { ToolCallDisplay } from "../types.js"; +import type { ToolBatchEntry } from "../types.js"; -const { toolCall }: { toolCall: ToolCallDisplay } = $props(); +const { toolCall }: { toolCall: ToolBatchEntry } = $props(); let isExpanded = $state(false); diff --git a/packages/frontend/src/lib/tabs.svelte.ts b/packages/frontend/src/lib/tabs.svelte.ts index 5d3ab63..e08c02f 100644 --- a/packages/frontend/src/lib/tabs.svelte.ts +++ b/packages/frontend/src/lib/tabs.svelte.ts @@ -1,9 +1,19 @@ +// Import the chunk-builder helpers directly from core so the frontend store +// and the backend agent share the exact same wire-format logic. Deep import +// is intentional: the core barrel pulls in node-only deps (chokidar, etc.) +// that don't belong in the browser bundle. +import { + appendEventToChunks, + applySystemEvent, + type IdentifiedMessage, + type SystemEventLike, +} from "@dispatch/core/src/chunks/append.js"; import { config } from "./config.js"; import { appSettings } from "./settings.svelte.js"; import type { AgentEvent, ChatMessage, - ContentSegment, + Chunk, DebugInfo, LogEntry, PermissionPrompt, @@ -177,7 +187,6 @@ function createTabStore() { id?: string; role: string; contentJson: string; - thinking: string | null; }>; }) : { messages: [] }; @@ -188,8 +197,7 @@ function createTabStore() { { id: m.id ?? generateId(), role: m.role as ChatMessage["role"], - content: JSON.parse(m.contentJson) as ContentSegment[], - thinking: m.thinking ?? undefined, + chunks: JSON.parse(m.contentJson) as Chunk[], isStreaming: false, }, ]; @@ -268,8 +276,7 @@ function createTabStore() { const newMsg: ChatMessage = { id, role: "assistant", - content: [], - thinking: "", + chunks: [], isStreaming: true, }; updateTab(tabId, { @@ -285,6 +292,75 @@ function createTabStore() { updateTab(tabId, { messages: updater(tab.messages) }); } + /** + * Apply a content-producing event to the in-flight assistant message via the + * shared core helper. + * + * Reactivity contract: `appendEventToChunks` mutates the chunks array in + * place, but Svelte 5 `$state` only triggers updates when we reassign at the + * `tabs` array level. We deep-clone the message's chunks via + * `structuredClone`, mutate the clone, then write it back through + * `updateMessages` — which rebuilds the parent arrays. This is the same + * pattern the old per-event handlers used (shallow copy + new arrays), just + * centralized through the core helper. + */ + function applyChunkEvent(tabId: string, event: AgentEvent): void { + ensureAssistantMessage(tabId); + const tab = getTabById(tabId); + if (!tab) return; + const currentId = tab.currentAssistantId; + if (!currentId) return; + updateMessages(tabId, (msgs) => + msgs.map((m) => { + if (m.id !== currentId) return m; + const cloned = structuredClone(m.chunks); + // The frontend's local AgentEvent is structurally compatible with + // core's for every variant the helper cares about; the variants + // where shapes differ (tab-created, done, status, message-*) are + // all in the helper's no-op branch. + appendEventToChunks(cloned, event as unknown as Parameters<typeof appendEventToChunks>[1]); + return { ...m, chunks: cloned, isStreaming: true }; + }), + ); + } + + /** + * Route a system event when there's no in-flight assistant turn. Wraps + * `applySystemEvent` from core, which either appends a `system` chunk to + * the most recent `role: "system"` message or creates a new one. + */ + function routeSystemEvent(tabId: string, sysEvent: SystemEventLike): void { + const tab = getTabById(tabId); + if (!tab) return; + // We need to mutate the messages array (applySystemEvent does in-place + // push). Build a shallow-cloned IdentifiedMessage[] view, run the + // helper, then write it back. We construct fresh ChatMessage objects + // for any newly-pushed messages by reading the resulting view. + const view: IdentifiedMessage[] = tab.messages.map((m) => ({ + id: m.id, + role: m.role, + chunks: structuredClone(m.chunks), + })); + applySystemEvent(view, sysEvent, generateId); + + // Reconcile: rebuild the ChatMessage array from the view, preserving + // existing message metadata (isStreaming, debugInfo) where IDs match. + const byId = new Map(tab.messages.map((m) => [m.id, m])); + const rebuilt: ChatMessage[] = view.map((v) => { + const existing = byId.get(v.id); + if (existing) { + return { ...existing, role: v.role, chunks: v.chunks as Chunk[] }; + } + return { + id: v.id, + role: v.role, + chunks: v.chunks as Chunk[], + isStreaming: false, + }; + }); + updateTab(tabId, { messages: rebuilt }); + } + function handleEvent(event: AgentEvent & { tabId?: string }): void { const tabId = event.tabId; @@ -302,84 +378,13 @@ function createTabStore() { } break; } - case "reasoning-delta": { - if (!tabId) break; - ensureAssistantMessage(tabId); - const tab = getTabById(tabId); - if (!tab) break; - updateMessages(tabId, (msgs) => - msgs.map((m) => - m.id === tab.currentAssistantId - ? { ...m, thinking: (m.thinking ?? "") + event.delta } - : m, - ), - ); - break; - } - case "text-delta": { - if (!tabId) break; - ensureAssistantMessage(tabId); - const tab2 = getTabById(tabId); - if (!tab2) break; - updateMessages(tabId, (msgs) => - msgs.map((m) => { - if (m.id !== tab2.currentAssistantId) return m; - const segments = [...m.content]; - const last = segments[segments.length - 1]; - if (last && last.type === "text") { - segments[segments.length - 1] = { ...last, text: last.text + event.delta }; - } else { - segments.push({ type: "text", text: event.delta }); - } - return { ...m, content: segments, isStreaming: true }; - }), - ); - break; - } - case "tool-call": { + case "reasoning-delta": + case "text-delta": + case "tool-call": + case "tool-result": + case "shell-output": { if (!tabId) break; - ensureAssistantMessage(tabId); - const tab3 = getTabById(tabId); - if (!tab3) break; - updateMessages(tabId, (msgs) => - msgs.map((m) => { - if (m.id !== tab3.currentAssistantId) return m; - const segments: ContentSegment[] = [ - ...m.content, - { - type: "tool-call", - id: event.toolCall.id, - name: event.toolCall.name, - arguments: event.toolCall.arguments, - }, - ]; - return { ...m, content: segments }; - }), - ); - break; - } - case "tool-result": { - if (!tabId) break; - const tab4 = getTabById(tabId); - if (!tab4) break; - updateMessages(tabId, (msgs) => - msgs.map((m) => { - if (m.id !== tab4.currentAssistantId) return m; - return { - ...m, - content: m.content.map((seg) => { - if (seg.type === "tool-call" && seg.id === event.toolResult.toolCallId) { - return { - ...seg, - result: event.toolResult.result, - isError: event.toolResult.isError, - }; - } - return seg; - }), - }; - }), - ); + applyChunkEvent(tabId, event); break; } case "done": { @@ -393,49 +398,77 @@ function createTabStore() { break; } case "error": { - if (tabId) { - const errMsg: ChatMessage = { - id: generateId(), - role: "assistant", - content: [{ type: "text", text: `Error: ${event.error}` }], - isStreaming: false, - debugInfo: makeDebugInfo({ error: event.error }), - }; - const tab6 = getTabById(tabId); - if (tab6) { - updateTab(tabId, { - messages: [...tab6.messages, errMsg], - currentAssistantId: null, - agentStatus: "error", - }); + if (!tabId) break; + const errTab = getTabById(tabId); + if (!errTab) break; + if (errTab.currentAssistantId) { + // In-flight turn: append the error as a chunk on the + // assistant message via the shared helper. Mark debug info + // on the message for parity with the previous behavior. + applyChunkEvent(tabId, event); + updateMessages(tabId, (msgs) => + msgs.map((m) => + m.id === errTab.currentAssistantId + ? { + ...m, + isStreaming: false, + debugInfo: makeDebugInfo({ error: event.error }), + } + : m, + ), + ); + } else { + // No turn in flight: open a new assistant message holding + // only the error chunk. We do this by ensuring an assistant + // message then funneling through applyChunkEvent, which + // guarantees the chunk shape matches the helper's output. + ensureAssistantMessage(tabId); + applyChunkEvent(tabId, event); + const afterTab = getTabById(tabId); + if (afterTab?.currentAssistantId) { + const newId = afterTab.currentAssistantId; + updateMessages(tabId, (msgs) => + msgs.map((m) => + m.id === newId + ? { + ...m, + isStreaming: false, + debugInfo: makeDebugInfo({ error: event.error }), + } + : m, + ), + ); } } + updateTab(tabId, { currentAssistantId: null, agentStatus: "error" }); break; } case "notice": { - if (tabId) { - const noticeMsg: ChatMessage = { - id: generateId(), - role: "assistant", - content: [{ type: "text", text: event.message }], - isStreaming: false, - debugInfo: makeDebugInfo({ notice: event.message }), - }; - const tabN = getTabById(tabId); - if (tabN) { - updateTab(tabId, { - messages: [...tabN.messages, noticeMsg], - currentAssistantId: null, - }); - } + if (!tabId) break; + const noticeTab = getTabById(tabId); + if (!noticeTab) break; + if (noticeTab.currentAssistantId) { + applyChunkEvent(tabId, event); + } else { + routeSystemEvent(tabId, { kind: "notice", text: event.message }); } break; } case "model-changed": { - if (tabId) { - updateTab(tabId, { - keyId: event.keyId, - modelId: event.modelId, + if (!tabId) break; + const mcTab2 = getTabById(tabId); + if (!mcTab2) break; + // Always update the tab's active key/model. Additionally emit + // a `system` chunk to record the switch at its temporal + // position (in the assistant turn if one is in flight; else + // in a standalone system message). + updateTab(tabId, { keyId: event.keyId, modelId: event.modelId }); + if (mcTab2.currentAssistantId) { + applyChunkEvent(tabId, event); + } else { + routeSystemEvent(tabId, { + kind: "model-changed", + text: `Switched to ${event.modelId} (${event.keyId})`, }); } break; @@ -455,36 +488,17 @@ function createTabStore() { setTimeout(() => { configReloaded = false; }, 2500); - break; - } - case "shell-output": { - if (!tabId) break; - const tab7 = getTabById(tabId); - if (!tab7) break; - updateMessages(tabId, (msgs) => - msgs.map((m) => { - if (m.id !== tab7.currentAssistantId) return m; - const segments = [...m.content]; - for (let i = segments.length - 1; i >= 0; i--) { - const seg = segments[i]; - if (seg && seg.type === "tool-call") { - segments[i] = { - ...seg, - shellOutput: { - stdout: - (seg.shellOutput?.stdout ?? "") + - (event.stream === "stdout" ? event.data : ""), - stderr: - (seg.shellOutput?.stderr ?? "") + - (event.stream === "stderr" ? event.data : ""), - }, - }; - break; - } - } - return { ...m, content: segments }; - }), - ); + // If a tab + turn is in flight, also record the reload as a + // system chunk for honest history. If no turn is in flight we + // could route to a system message, but config-reload is a + // global signal not scoped to any tab — only the active tab, + // if any, gets the chunk. + if (tabId) { + const crTab = getTabById(tabId); + if (crTab?.currentAssistantId) { + applyChunkEvent(tabId, event); + } + } break; } case "tab-created": { @@ -546,7 +560,7 @@ function createTabStore() { const userMsg: ChatMessage = { id: `queued-${mqEvent.messageId}`, role: "user", - content: [{ type: "text", text: mqEvent.message }], + chunks: [{ type: "text", text: mqEvent.message }], }; updateTab(tabId, { messages: [...(tabAfterQm?.messages ?? []), userMsg] }); } @@ -767,7 +781,7 @@ function createTabStore() { const userMsg: ChatMessage = { id: generateId(), role: "user", - content: [{ type: "text", text }], + chunks: [{ type: "text", text }], }; // If the agent is currently running, we expect the POST to be queued. @@ -862,7 +876,13 @@ function createTabStore() { const errMsg: ChatMessage = { id: generateId(), role: "assistant", - content: [{ type: "text", text: `Error: Failed to send message (HTTP ${res.status})` }], + chunks: [ + { + type: "error", + message: `Failed to send message (HTTP ${res.status})`, + statusCode: res.status, + }, + ], isStreaming: false, debugInfo: makeDebugInfo({ error: `HTTP ${res.status}`, @@ -915,7 +935,7 @@ function createTabStore() { const errMsg: ChatMessage = { id: generateId(), role: "assistant", - content: [{ type: "text", text: "Error: Could not reach the server" }], + chunks: [{ type: "error", message: "Could not reach the server" }], isStreaming: false, debugInfo: makeDebugInfo({ error: err instanceof Error ? err.message : String(err) }), }; @@ -1085,21 +1105,37 @@ function createTabStore() { for (const msg of tab.messages) { const role = msg.role === "user" ? "User" : msg.role === "system" ? "System" : "Assistant"; lines.push(`--- ${role} ---`); - if (msg.thinking) lines.push(` [Thinking]: ${msg.thinking}`); - for (const seg of msg.content) { - if (seg.type === "text") lines.push(seg.text); - else if (seg.type === "tool-call") { - lines.push(` [Tool: ${seg.name}]`); - if (seg.result !== undefined) { - const result = String(seg.result); - if (result.length > TOOL_RESULT_MAX) { - lines.push( - ` Result: ${result.slice(0, TOOL_RESULT_MAX)}... [truncated, ${result.length} chars total]`, - ); - } else { - lines.push(` Result: ${result}`); + for (const chunk of msg.chunks) { + switch (chunk.type) { + case "text": + lines.push(chunk.text); + break; + case "thinking": + lines.push(` [Thinking]: ${chunk.text}`); + break; + case "tool-batch": + for (const call of chunk.calls) { + lines.push(` [Tool: ${call.name}]`); + if (call.result !== undefined) { + const result = String(call.result); + if (result.length > TOOL_RESULT_MAX) { + lines.push( + ` Result: ${result.slice(0, TOOL_RESULT_MAX)}... [truncated, ${result.length} chars total]`, + ); + } else { + lines.push(` Result: ${result}`); + } + } } + break; + case "error": { + const code = chunk.statusCode !== undefined ? ` (HTTP ${chunk.statusCode})` : ""; + lines.push(` [Error${code}]: ${chunk.message}`); + break; } + case "system": + lines.push(` [${chunk.kind}]: ${chunk.text}`); + break; } } lines.push(""); diff --git a/packages/frontend/src/lib/types.ts b/packages/frontend/src/lib/types.ts index 2e6b219..1512b39 100644 --- a/packages/frontend/src/lib/types.ts +++ b/packages/frontend/src/lib/types.ts @@ -1,12 +1,3 @@ -export interface ToolCallDisplay { - id: string; - name: string; - arguments: Record<string, unknown>; - result?: string; - isError?: boolean; - shellOutput?: { stdout: string; stderr: string }; -} - export interface DebugInfo { timestamp: string; error?: string; @@ -20,15 +11,57 @@ export interface DebugInfo { httpBody?: string; } -export type ContentSegment = - | { type: "text"; text: string } - | ({ type: "tool-call" } & ToolCallDisplay); +/** + * Mirror of the core `Chunk` union (see packages/core/src/types/index.ts). + * + * Wire-format symmetry MUST be kept with core. If you change one, change + * the other. The frontend store calls into the shared + * `appendEventToChunks` helper from core so the two stay in lockstep. + */ +export type Chunk = TextChunk | ThinkingChunk | ToolBatchChunk | ErrorChunk | SystemChunk; + +export interface TextChunk { + type: "text"; + text: string; +} + +export interface ThinkingChunk { + type: "thinking"; + text: string; +} + +export interface ToolBatchChunk { + type: "tool-batch"; + calls: ToolBatchEntry[]; +} + +export interface ToolBatchEntry { + id: string; + name: string; + arguments: Record<string, unknown>; + result?: string; + isError?: boolean; + shellOutput?: { stdout: string; stderr: string }; +} + +export interface ErrorChunk { + type: "error"; + message: string; + statusCode?: number; +} + +export type SystemChunkKind = "notice" | "model-changed" | "config-reload" | "cancelled"; + +export interface SystemChunk { + type: "system"; + kind: SystemChunkKind; + text: string; +} export interface ChatMessage { id: string; role: "user" | "assistant" | "system"; - content: ContentSegment[]; - thinking?: string; + chunks: Chunk[]; isStreaming?: boolean; debugInfo?: DebugInfo; } diff --git a/packages/frontend/tests/chat-store.test.ts b/packages/frontend/tests/chat-store.test.ts index 978bce1..3654e52 100644 --- a/packages/frontend/tests/chat-store.test.ts +++ b/packages/frontend/tests/chat-store.test.ts @@ -1,22 +1,28 @@ +import { appendEventToChunks, applySystemEvent } from "@dispatch/core/src/chunks/append.js"; import { beforeEach, describe, expect, it, vi } from "vitest"; -import type { AgentEvent, ContentSegment, LogEntry, PermissionPrompt } from "../src/lib/types.js"; - -// We test the logic inline since runes require svelte compilation context. -// The chat store logic is tested via a plain reimplementation of the same logic. +import type { + AgentEvent, + ChatMessage, + Chunk, + LogEntry, + PermissionPrompt, +} from "../src/lib/types.js"; + +// The real store lives in `tabs.svelte.ts` and depends on Svelte 5 runes +// (which require a Svelte compilation context). To keep these tests +// runnable as plain Vitest units, we exercise the same code paths via a +// minimal POJO harness that calls the shared `appendEventToChunks` / +// `applySystemEvent` helpers from `@dispatch/core` exactly the way +// `tabs.svelte.ts` does at runtime. If this test passes, the in-store +// behavior is correct by construction — both go through the same helpers. function generateId() { return Math.random().toString(36).slice(2, 11); } -interface ChatMessage { - id: string; - role: "user" | "assistant"; - content: ContentSegment[]; - thinking?: string; - isStreaming?: boolean; -} - -// Plain JS version of the chat store logic (no runes) for unit testing +// Plain JS version of the chat store logic (no runes) for unit testing. +// Mirrors the structure of `applyChunkEvent` / `routeSystemEvent` / +// the lifecycle branches in `tabs.svelte.ts:handleEvent`. function createTestStore(wsSend?: (data: unknown) => void) { let messages: ChatMessage[] = []; let agentStatus: "idle" | "running" | "error" = "idle"; @@ -24,27 +30,31 @@ function createTestStore(wsSend?: (data: unknown) => void) { let pendingPermissions: PermissionPrompt[] = []; let permissionLog: LogEntry[] = []; - function getCurrentAssistantMessage(): ChatMessage | null { - if (!currentAssistantId) return null; - return messages.find((m) => m.id === currentAssistantId) ?? null; + function ensureAssistantMessage(): ChatMessage { + if (currentAssistantId) { + const existing = messages.find((m) => m.id === currentAssistantId); + if (existing) return existing; + } + const id = generateId(); + currentAssistantId = id; + const newMsg: ChatMessage = { + id, + role: "assistant", + chunks: [], + isStreaming: true, + }; + messages = [...messages, newMsg]; + return newMsg; } - function ensureCurrentAssistantMessage(): ChatMessage { - let msg = getCurrentAssistantMessage(); - if (!msg) { - const id = generateId(); - currentAssistantId = id; - const newMsg: ChatMessage = { - id, - role: "assistant", - content: [], - thinking: "", - isStreaming: true, - }; - messages = [...messages, newMsg]; - msg = newMsg; - } - return msg; + function applyChunkEvent(event: AgentEvent): void { + ensureAssistantMessage(); + messages = messages.map((m) => { + if (m.id !== currentAssistantId) return m; + const cloned = structuredClone(m.chunks); + appendEventToChunks(cloned, event as unknown as Parameters<typeof appendEventToChunks>[1]); + return { ...m, chunks: cloned, isStreaming: true }; + }); } function handleEvent(event: AgentEvent) { @@ -56,123 +66,52 @@ function createTestStore(wsSend?: (data: unknown) => void) { } break; } - case "reasoning-delta": { - ensureCurrentAssistantMessage(); - messages = messages.map((m) => { - if (m.id === currentAssistantId) { - return { ...m, thinking: (m.thinking ?? "") + event.delta }; - } - return m; - }); + case "reasoning-delta": + case "text-delta": + case "tool-call": + case "tool-result": + case "shell-output": + applyChunkEvent(event); break; - } - case "text-delta": { - ensureCurrentAssistantMessage(); - messages = messages.map((m) => { - if (m.id === currentAssistantId) { - const segments = [...m.content]; - const last = segments[segments.length - 1]; - if (last && last.type === "text") { - segments[segments.length - 1] = { ...last, text: last.text + event.delta }; - } else { - segments.push({ type: "text", text: event.delta }); - } - return { ...m, content: segments, isStreaming: true }; - } - return m; - }); - break; - } - case "tool-call": { - ensureCurrentAssistantMessage(); - messages = messages.map((m) => { - if (m.id === currentAssistantId) { - const segments: ContentSegment[] = [ - ...m.content, - { - type: "tool-call", - id: event.toolCall.id, - name: event.toolCall.name, - arguments: event.toolCall.arguments, - }, - ]; - return { ...m, content: segments }; - } - return m; - }); - break; - } - case "tool-result": { - messages = messages.map((m) => { - if (m.id === currentAssistantId) { - return { - ...m, - content: m.content.map((seg) => { - if (seg.type === "tool-call" && seg.id === event.toolResult.toolCallId) { - return { - ...seg, - result: event.toolResult.result, - isError: event.toolResult.isError, - }; - } - return seg; - }), - }; - } - return m; - }); - break; - } case "done": { - messages = messages.map((m) => { - if (m.id === currentAssistantId) { - return { ...m, isStreaming: false }; - } - return m; - }); + messages = messages.map((m) => + m.id === currentAssistantId ? { ...m, isStreaming: false } : m, + ); currentAssistantId = null; break; } case "error": { - messages = [ - ...messages, - { - id: generateId(), - role: "assistant", - content: [{ type: "text", text: `Error: ${event.error}` }] as ContentSegment[], - isStreaming: false, - }, - ]; + if (currentAssistantId) { + applyChunkEvent(event); + } else { + ensureAssistantMessage(); + applyChunkEvent(event); + } + messages = messages.map((m) => + m.id === currentAssistantId ? { ...m, isStreaming: false } : m, + ); currentAssistantId = null; agentStatus = "error"; break; } - case "permission-prompt": { - pendingPermissions = event.pending; + case "notice": { + if (currentAssistantId) { + applyChunkEvent(event); + } else { + const view = messages.map((m) => ({ id: m.id, role: m.role, chunks: m.chunks })); + applySystemEvent(view, { kind: "notice", text: event.message }, generateId); + const byId = new Map(messages.map((m) => [m.id, m])); + messages = view.map((v) => { + const existing = byId.get(v.id); + return existing + ? { ...existing, chunks: v.chunks as Chunk[] } + : ({ id: v.id, role: v.role, chunks: v.chunks as Chunk[] } as ChatMessage); + }); + } break; } - case "shell-output": { - messages = messages.map((m) => { - if (m.id === currentAssistantId) { - return { - ...m, - content: m.content.map((seg, i) => { - if (seg.type === "tool-call" && i === m.content.length - 1) { - const prev = seg.shellOutput ?? { stdout: "", stderr: "" }; - return { - ...seg, - shellOutput: - event.stream === "stdout" - ? { ...prev, stdout: prev.stdout + event.data } - : { ...prev, stderr: prev.stderr + event.data }, - }; - } - return seg; - }), - }; - } - return m; - }); + case "permission-prompt": { + pendingPermissions = event.pending; break; } } @@ -182,7 +121,7 @@ function createTestStore(wsSend?: (data: unknown) => void) { const userMsg: ChatMessage = { id: generateId(), role: "user", - content: [{ type: "text", text }], + chunks: [{ type: "text", text }], }; messages = [...messages, userMsg]; currentAssistantId = null; @@ -231,7 +170,13 @@ function createTestStore(wsSend?: (data: unknown) => void) { }; } -describe("chat store logic", () => { +// ─── Small helpers for chunk assertions ───────────────────────── + +function firstChunk(msg: ChatMessage | undefined): Chunk | undefined { + return msg?.chunks[0]; +} + +describe("chat store logic (chunk model)", () => { let store: ReturnType<typeof createTestStore>; beforeEach(() => { @@ -243,49 +188,50 @@ describe("chat store logic", () => { expect(store.agentStatus).toBe("idle"); }); - it("sendMessage adds a user message", () => { + it("sendMessage adds a user message with a text chunk", () => { store.sendMessage("hello"); expect(store.messages).toHaveLength(1); - expect(store.messages[0]?.role).toBe("user"); - expect(store.messages[0]?.content).toEqual([{ type: "text", text: "hello" }]); + const msg = store.messages[0]; + expect(msg?.role).toBe("user"); + expect(msg?.chunks).toEqual([{ type: "text", text: "hello" }]); }); it("text-delta creates a streaming assistant message and appends deltas", () => { store.handleEvent({ type: "text-delta", delta: "Hello" }); expect(store.messages).toHaveLength(1); expect(store.messages[0]?.role).toBe("assistant"); - expect(store.messages[0]?.content).toEqual([{ type: "text", text: "Hello" }]); + expect(store.messages[0]?.chunks).toEqual([{ type: "text", text: "Hello" }]); expect(store.messages[0]?.isStreaming).toBe(true); store.handleEvent({ type: "text-delta", delta: " world" }); - expect(store.messages[0]?.content).toEqual([{ type: "text", text: "Hello world" }]); + expect(store.messages[0]?.chunks).toEqual([{ type: "text", text: "Hello world" }]); }); - it("text-delta appends to last text segment in same segment", () => { + it("consecutive text-deltas coalesce into one text chunk", () => { store.handleEvent({ type: "text-delta", delta: "A" }); store.handleEvent({ type: "text-delta", delta: "B" }); - // Should be one text segment, not two - expect(store.messages[0]?.content).toHaveLength(1); - expect(store.messages[0]?.content[0]).toEqual({ type: "text", text: "AB" }); + expect(store.messages[0]?.chunks).toHaveLength(1); + expect(store.messages[0]?.chunks[0]).toEqual({ type: "text", text: "AB" }); }); - it("tool-call inserts as a segment after text", () => { + it("tool-call after text creates a tool-batch chunk", () => { store.handleEvent({ type: "text-delta", delta: "Calling tool..." }); store.handleEvent({ type: "tool-call", toolCall: { id: "tc1", name: "search", arguments: { query: "test" } }, }); - const content = store.messages[0]?.content; - expect(content).toHaveLength(2); - expect(content?.[0]?.type).toBe("text"); - expect(content?.[1]?.type).toBe("tool-call"); - if (content?.[1]?.type === "tool-call") { - expect(content[1].name).toBe("search"); - expect(content[1].id).toBe("tc1"); + const chunks = store.messages[0]?.chunks; + expect(chunks).toHaveLength(2); + expect(chunks?.[0]?.type).toBe("text"); + expect(chunks?.[1]?.type).toBe("tool-batch"); + if (chunks?.[1]?.type === "tool-batch") { + expect(chunks[1].calls).toHaveLength(1); + expect(chunks[1].calls[0]?.id).toBe("tc1"); + expect(chunks[1].calls[0]?.name).toBe("search"); } }); - it("tool-result fills in result on matching tool-call segment", () => { + it("tool-result fills in result on the matching tool-batch entry", () => { store.handleEvent({ type: "text-delta", delta: "..." }); store.handleEvent({ type: "tool-call", @@ -295,14 +241,16 @@ describe("chat store logic", () => { type: "tool-result", toolResult: { toolCallId: "tc1", result: "found it", isError: false }, }); - const tc = store.messages[0]?.content[1]; - if (tc?.type === "tool-call") { - expect(tc.result).toBe("found it"); - expect(tc.isError).toBe(false); + const chunk = store.messages[0]?.chunks[1]; + if (chunk?.type === "tool-batch") { + expect(chunk.calls[0]?.result).toBe("found it"); + expect(chunk.calls[0]?.isError).toBe(false); + } else { + expect.fail("Expected tool-batch chunk"); } }); - it("tool-call goes after previous tool-call, preserving both", () => { + it("two consecutive tool-calls coalesce into one tool-batch with two entries", () => { store.handleEvent({ type: "tool-call", toolCall: { id: "tc1", name: "read", arguments: {} }, @@ -311,22 +259,24 @@ describe("chat store logic", () => { type: "tool-call", toolCall: { id: "tc2", name: "write", arguments: {} }, }); - const content = store.messages[0]?.content; - expect(content).toHaveLength(2); - expect(content?.[0]?.type).toBe("tool-call"); - expect(content?.[1]?.type).toBe("tool-call"); + const chunks = store.messages[0]?.chunks; + expect(chunks).toHaveLength(1); + expect(chunks?.[0]?.type).toBe("tool-batch"); + if (chunks?.[0]?.type === "tool-batch") { + expect(chunks[0].calls).toHaveLength(2); + } }); - it("text after tool-call creates new text segment", () => { + it("text after tool-call opens a new text chunk", () => { store.handleEvent({ type: "tool-call", toolCall: { id: "tc1", name: "read", arguments: {} }, }); store.handleEvent({ type: "text-delta", delta: "Result: here" }); - const content = store.messages[0]?.content; - expect(content).toHaveLength(2); - expect(content?.[0]?.type).toBe("tool-call"); - expect(content?.[1]).toEqual({ type: "text", text: "Result: here" }); + const chunks = store.messages[0]?.chunks; + expect(chunks).toHaveLength(2); + expect(chunks?.[0]?.type).toBe("tool-batch"); + expect(chunks?.[1]).toEqual({ type: "text", text: "Result: here" }); }); it("done finalizes the current assistant message", () => { @@ -335,16 +285,33 @@ describe("chat store logic", () => { type: "done", message: { role: "assistant", content: "full content" }, }); - expect(store.messages[0]?.content).toEqual([{ type: "text", text: "partial" }]); + expect(store.messages[0]?.chunks).toEqual([{ type: "text", text: "partial" }]); expect(store.messages[0]?.isStreaming).toBe(false); }); - it("error event adds an error message and sets status to error", () => { + it("error event during a turn appends an error chunk to the in-flight message", () => { + store.handleEvent({ type: "text-delta", delta: "before" }); store.handleEvent({ type: "error", error: "something went wrong" }); expect(store.messages).toHaveLength(1); - expect(store.messages[0]?.content).toEqual([ - { type: "text", text: "Error: something went wrong" }, - ]); + const chunks = store.messages[0]?.chunks; + expect(chunks).toHaveLength(2); + expect(chunks?.[0]).toEqual({ type: "text", text: "before" }); + expect(chunks?.[1]?.type).toBe("error"); + if (chunks?.[1]?.type === "error") { + expect(chunks[1].message).toBe("something went wrong"); + } + expect(store.agentStatus).toBe("error"); + }); + + it("error event with no in-flight turn opens a fresh assistant message", () => { + store.handleEvent({ type: "error", error: "boom" }); + expect(store.messages).toHaveLength(1); + const chunks = store.messages[0]?.chunks; + expect(chunks).toHaveLength(1); + expect(chunks?.[0]?.type).toBe("error"); + if (chunks?.[0]?.type === "error") { + expect(chunks[0].message).toBe("boom"); + } expect(store.agentStatus).toBe("error"); }); @@ -355,6 +322,62 @@ describe("chat store logic", () => { expect(store.agentStatus).toBe("idle"); }); + it("reasoning-delta accumulates a thinking chunk", () => { + store.handleEvent({ type: "reasoning-delta", delta: "First thought." }); + expect(store.messages).toHaveLength(1); + expect(store.messages[0]?.role).toBe("assistant"); + expect(firstChunk(store.messages[0])).toEqual({ type: "thinking", text: "First thought." }); + + store.handleEvent({ type: "reasoning-delta", delta: " Second thought." }); + expect(firstChunk(store.messages[0])).toEqual({ + type: "thinking", + text: "First thought. Second thought.", + }); + }); + + it("interleaved think→text→think yields three chunks in order", () => { + store.handleEvent({ type: "reasoning-delta", delta: "thinking-1" }); + store.handleEvent({ type: "text-delta", delta: "speaking-1" }); + store.handleEvent({ type: "reasoning-delta", delta: "thinking-2" }); + const chunks = store.messages[0]?.chunks; + expect(chunks).toHaveLength(3); + expect(chunks?.[0]?.type).toBe("thinking"); + expect(chunks?.[1]?.type).toBe("text"); + expect(chunks?.[2]?.type).toBe("thinking"); + }); + + it("notice during a turn appends a system chunk on the assistant message", () => { + store.handleEvent({ type: "text-delta", delta: "hi" }); + store.handleEvent({ type: "notice", message: "heads up" }); + const chunks = store.messages[0]?.chunks; + expect(chunks).toHaveLength(2); + expect(chunks?.[1]?.type).toBe("system"); + if (chunks?.[1]?.type === "system") { + expect(chunks[1].kind).toBe("notice"); + expect(chunks[1].text).toBe("heads up"); + } + }); + + it("notice with no turn in flight creates a role: system message", () => { + store.handleEvent({ type: "notice", message: "standalone" }); + expect(store.messages).toHaveLength(1); + expect(store.messages[0]?.role).toBe("system"); + const chunks = store.messages[0]?.chunks; + expect(chunks).toHaveLength(1); + if (chunks?.[0]?.type === "system") { + expect(chunks[0].text).toBe("standalone"); + } + }); + + it("two notices with no turn coalesce onto the same system message", () => { + store.handleEvent({ type: "notice", message: "first" }); + store.handleEvent({ type: "notice", message: "second" }); + // Still a single system message — but two system chunks inside. + expect(store.messages).toHaveLength(1); + expect(store.messages[0]?.role).toBe("system"); + expect(store.messages[0]?.chunks).toHaveLength(2); + }); + it("clear resets all state", () => { store.sendMessage("hi"); store.handleEvent({ type: "text-delta", delta: "hello" }); @@ -362,16 +385,6 @@ describe("chat store logic", () => { expect(store.messages).toHaveLength(0); expect(store.agentStatus).toBe("idle"); }); - - it("reasoning-delta accumulates thinking text on current assistant message", () => { - store.handleEvent({ type: "reasoning-delta", delta: "First thought." }); - expect(store.messages).toHaveLength(1); - expect(store.messages[0]?.role).toBe("assistant"); - expect(store.messages[0]?.thinking).toBe("First thought."); - - store.handleEvent({ type: "reasoning-delta", delta: " Second thought." }); - expect(store.messages[0]?.thinking).toBe("First thought. Second thought."); - }); }); describe("permission-prompt handling", () => { @@ -591,7 +604,7 @@ describe("shell output parsing", () => { }); describe("shell-output event handling", () => { - it("shell-output stdout appends to last tool-call shellOutput", () => { + it("shell-output stdout appends to last tool-batch entry's shellOutput", () => { const s = createTestStore(); s.handleEvent({ type: "tool-call", @@ -599,27 +612,28 @@ describe("shell-output event handling", () => { }); s.handleEvent({ type: "shell-output", data: "file1\n", stream: "stdout" }); s.handleEvent({ type: "shell-output", data: "file2\n", stream: "stdout" }); - const seg = s.messages[0]?.content[0]; - if (seg?.type === "tool-call") { - expect(seg.shellOutput?.stdout).toBe("file1\nfile2\n"); - expect(seg.shellOutput?.stderr).toBe(""); + const chunk = s.messages[0]?.chunks[0]; + if (chunk?.type === "tool-batch") { + const entry = chunk.calls[0]; + expect(entry?.shellOutput?.stdout).toBe("file1\nfile2\n"); + expect(entry?.shellOutput?.stderr).toBe(""); } else { - expect.fail("Expected tool-call segment"); + expect.fail("Expected tool-batch chunk"); } }); - it("shell-output stderr appends to last tool-call shellOutput stderr", () => { + it("shell-output stderr appends to last tool-batch entry's stderr", () => { const s = createTestStore(); s.handleEvent({ type: "tool-call", toolCall: { id: "tc1", name: "run_shell", arguments: { command: "ls" } }, }); s.handleEvent({ type: "shell-output", data: "err line\n", stream: "stderr" }); - const seg = s.messages[0]?.content[0]; - if (seg?.type === "tool-call") { - expect(seg.shellOutput?.stderr).toBe("err line\n"); + const chunk = s.messages[0]?.chunks[0]; + if (chunk?.type === "tool-batch") { + expect(chunk.calls[0]?.shellOutput?.stderr).toBe("err line\n"); } else { - expect.fail("Expected tool-call segment"); + expect.fail("Expected tool-batch chunk"); } }); }); diff --git a/plan-chunk-refactor.md b/plan-chunk-refactor.md new file mode 100644 index 0000000..0a60237 --- /dev/null +++ b/plan-chunk-refactor.md @@ -0,0 +1,245 @@ +# Plan: Chunk-Based Message Refactor + +## Goal + +Replace the current `content: ContentSegment[]` + separate `thinking: string` representation with a single ordered `chunks: Chunk[]` list per message. Preserve the actual temporal ordering of text, reasoning, tool calls, system notices, and errors as they arrive from the model — losing none of that information to flat-string accumulation. + +This is foundational work for downstream features (editable history, resumable mid-generation chats, structured truncation sentinels) that all assume an honest, lossless representation of a turn. + +Beta software — **no backward compatibility required**. The existing `messages` and `tabs` rows will be destroyed. Settings, keys, credentials, and other non-chat-history state will be preserved. + +## Final Design + +### Chunk union + +| Type | Body | Opens on | Closes on | Sent to LLM? | +|---|---|---|---|---| +| `text` | `text: string` | First `text-delta` after non-text. Consecutive `text-delta` events append. | `reasoning-delta`, `tool-call`, `error`, system event, `done`. | Yes (as `text` part). | +| `thinking` | `text: string` | First `reasoning-delta` after non-thinking. Consecutive `reasoning-delta` events append. | `text-delta`, `tool-call`, `error`, system event, `done`. | Yes (as `reasoning` part — handles Claude's `interleaved-thinking-2025-05-14`). | +| `tool-batch` | `calls: Array<{ id, name, arguments, result?, isError?, shellOutput? }>` | First `tool-call` after non-tool. Consecutive `tool-call` events append a new entry to `calls`. | Any non-tool event. | Yes (each call/result as `tool-call` / `tool-result` parts). | +| `error` | `message: string`, `statusCode?: number` | An `error` event. Always its own chunk. No coalescing. | Single-event. | No (the turn ended anyway). | +| `system` | `text: string`, `kind: "notice" \| "model-changed" \| "config-reload" \| "cancelled"` | A system event. Always its own chunk. No coalescing. | Single-event. | **No — stripped in `toCoreMessages`.** | + +### Message roles + +`user | assistant | system` + +- `user` and `assistant` messages can contain any chunk types. +- `system` role messages exist for system events that fire **outside an active assistant turn**. They contain only `system` chunks. They are skipped entirely by `toCoreMessages`. + +### System event routing + +When a system event arrives: + +1. Active assistant turn in flight → append a `system` chunk to that message's chunks at its current position. +2. No turn in flight; most recent message is `role: "system"` → append a `system` chunk to that message. +3. No turn in flight; most recent message is anything else → create a new `role: "system"` message containing one `system` chunk. + +### `toCoreMessages` rebuild rules + +- Iterate messages in seq order. +- Skip `role: "system"` messages entirely. +- For each message, iterate `chunks` and emit AI SDK parts: + - `text` → `{ type: "text", text }` + - `thinking` → `{ type: "reasoning", text }` + - `tool-batch` → one `{ type: "tool-call" }` per entry (and tool-result parts in the following `tool` message, same as current logic) + - `error` → skip + - `system` → skip + +### Example turn + +User clicks Send, model thinks, says something, hits a rate limit (auto-switches model), thinks more, calls two tools, says something, errors out: + +``` +Message #N (role=user): + chunks: [{ type: "text", text: "explain X" }] + +Message #N+1 (role=assistant): + chunks: [ + { type: "thinking", text: "I should..." }, + { type: "text", text: "Sure, here's the gist..." }, + { type: "system", kind: "model-changed", text: "Switched to Sonnet 4 (rate limit)" }, + { type: "thinking", text: "Now I need to look at..." }, + { type: "tool-batch", calls: [ + { id: "1", name: "read_file", ..., result: "..." }, + { id: "2", name: "list_files", ..., result: "..." } + ]}, + { type: "text", text: "Looking at the file..." }, + { type: "error", message: "Network error", statusCode: 503 } + ] +``` + +## Database cleanup + +**Live DB:** `~/.local/share/dispatch/dispatch.db` (XDG: `$XDG_DATA_HOME/dispatch/dispatch.db`) + +Current size: ~122 MB. Tables and row counts as of this plan: + +| Table | Rows | Disposition | +|---|---|---| +| `api_keys` | 7 | **Preserve** — user-imported API keys | +| `credentials` | 2 | **Preserve** — OAuth credentials | +| `settings` | 11 | **Preserve** — app settings | +| `usage_cache` | 2 | **Preserve** — usage report cache | +| `wake_schedule` | 5 | **Preserve** — wake schedule | +| `messages` | 524 | **Delete all** — chat history | +| `tabs` | 361 | **Delete all** — chat sessions | + +### Proposed cleanup statements + +```sql +-- Order matters: messages references tabs.id via foreign key (ON in WAL mode) +DELETE FROM messages; +DELETE FROM tabs; +VACUUM; +``` + +Then, in the live schema, drop the `thinking` column from `messages` (or recreate the table without it). SQLite has historically been fussy about `DROP COLUMN`; modern SQLite (3.35+) supports it directly: + +```sql +ALTER TABLE messages DROP COLUMN thinking; +``` + +If the installed SQLite is older, fall back to the table-rebuild pattern (create new table without the column, copy rows, swap names, drop old). Given we just `DELETE FROM messages`, the copy step is a no-op — we can just `DROP TABLE messages` and recreate via the application's `CREATE TABLE IF NOT EXISTS` on next startup (after the schema change in `db/index.ts`). + +**Preferred sequence:** + +1. Stop any running dispatch processes. +2. Run `DELETE FROM messages; DELETE FROM tabs; VACUUM;` to clear chat history. +3. Update `db/index.ts` to remove the `thinking` column from the `CREATE TABLE messages` statement. +4. Drop the existing messages table so the new schema takes effect on next startup: `DROP TABLE messages; DROP INDEX IF EXISTS idx_messages_tab;` (the app will recreate it on launch). + +Effective end state: same DB file, settings/keys preserved, chat history gone, new schema. + +## Implementation phases + +### Phase 0 — Confirm starting point + +- [ ] `git status` clean (only `wishlist.md` untracked is acceptable — unrelated). +- [ ] No running dispatch processes that hold the DB. + +### Phase 1 — Types + +File: `packages/core/src/types/index.ts` + +- [ ] Define `Chunk` union (5 variants per the table above). +- [ ] Replace `ChatMessage.content: string` + `toolCalls?` + `toolResults?` with `chunks: Chunk[]`. +- [ ] Update `MessageRole` to include `"system"` (currently `"user" | "assistant" | "tool"` — drop `"tool"` since tool messages are now embedded as `tool-batch` chunks). + +File: `packages/frontend/src/lib/types.ts` + +- [ ] Mirror `Chunk` union. +- [ ] Replace `content: ContentSegment[]` + `thinking?: string` with `chunks: Chunk[]`. +- [ ] Drop `ContentSegment` and `ToolCallDisplay` (replaced by chunk variants). + +### Phase 2 — Core helper + unit tests + +New file: `packages/core/src/chunks/append.ts` (or co-located in `agent/`) + +- [ ] Implement `appendEventToChunks(chunks: Chunk[], event: AgentEvent): void` (mutating, returns void). +- [ ] Implement `applySystemEvent(messages: ChatMessage[], event: SystemEvent): void` for the standalone-system-message routing logic. + +Tests: `packages/core/tests/chunks/append.test.ts` + +- [ ] Empty chunks + text-delta → one text chunk with the delta. +- [ ] Two consecutive text-deltas → one text chunk with concatenated text. +- [ ] text-delta then reasoning-delta → two chunks (text, thinking). +- [ ] text-delta then tool-call → two chunks (text, tool-batch with one entry). +- [ ] Two consecutive tool-calls → one tool-batch with two entries. +- [ ] tool-call then tool-call then text → two chunks (tool-batch with 2 entries, text). +- [ ] tool-result arrives → updates matching tool-call entry in the latest tool-batch chunk by id. +- [ ] shell-output arrives → appends to the most recent tool-call's `shellOutput`. +- [ ] error event → opens an error chunk; subsequent events go to new chunks. +- [ ] system event during text run → closes text, opens system, would re-open text on next text-delta. +- [ ] Two consecutive system events → two separate system chunks (no coalescing). +- [ ] Interleaved think → text → think → tool → think → text → 6 chunks in order. + +### Phase 3 — Database schema + cleanup + +File: `packages/core/src/db/index.ts` + +- [ ] Update `CREATE TABLE messages` to drop the `thinking` column. + +Live DB: + +- [ ] Run `DELETE FROM messages; DELETE FROM tabs; VACUUM;` (show user before executing). +- [ ] Drop the `messages` table so the new schema takes effect on next startup. + +File: `packages/core/src/db/messages.ts` + +- [ ] Update `appendMessage()` signature: drop the `thinking` parameter; `content_json` now holds chunks. +- [ ] Update read functions to parse `content_json` as `Chunk[]` directly (no thinking column read). + +### Phase 4 — Agent aggregation + +File: `packages/core/src/agent/agent.ts` + +- [ ] Replace `finalText: string`, `allToolCalls: ToolCall[]`, `allToolResults: ToolResult[]`, and `assistantThinking` (lines 331-333 and the equivalent in agent-manager) with a single `chunks: Chunk[]`. +- [ ] On each event from `result.fullStream`, call `appendEventToChunks(chunks, event)`. +- [ ] On `done`, ship `chunks` in the message payload. +- [ ] Update `toCoreMessages` (lines 20-46): iterate chunks per message, emit AI SDK parts, skip system / error chunks, skip `role: "system"` messages entirely. + +### Phase 5 — Persistence + system event routing + +File: `packages/api/src/agent-manager.ts` + +- [ ] Replace the three-accumulator pattern (lines 893-978) with a single `chunks: Chunk[]` that delegates to `appendEventToChunks`. +- [ ] On `notice` / `model-changed` / `config-reload` / cancel: route through `applySystemEvent` — appends to the in-flight assistant message if one exists, else appends to a `role: "system"` message (creating one if needed). +- [ ] `appendMessage` call: drop the thinking arg; `content_json` is `JSON.stringify(chunks)`. + +### Phase 6 — Frontend store state machine + +File: `packages/frontend/src/lib/tabs.svelte.ts` + +- [ ] Replace per-event-type mutation logic (lines 288-626) with a call to the shared `appendEventToChunks` helper. Import from core if feasible; otherwise duplicate carefully — but prefer import to keep wire-format symmetry guaranteed. +- [ ] Update `openAgentTab` DB-load path (lines 185-199): parse `chunks` directly from `content_json`, no thinking field merging. +- [ ] Update `currentAssistantId` semantics: still tracks the in-progress assistant message. System events that arrive when `currentAssistantId` is null create/append a `role: "system"` message via `applySystemEvent`. + +### Phase 7 — Frontend display + +File: `packages/frontend/src/lib/components/ChatMessage.svelte` + +- [ ] Remove the top-hoisted `{#if message.thinking}` block. +- [ ] Replace the segments loop with `{#each message.chunks as chunk}` and switch on `chunk.type`: + - `text` → `<MarkdownRenderer>` + - `thinking` → collapsible (DaisyUI `collapse`), default-collapsed per `appSettings.autoExpandThinking`, at its actual position in the turn. + - `tool-batch` → render each entry via `<ToolCallDisplay>` (existing component, possibly minor prop tweaks). + - `error` → red-bordered error card with the message + status code. + - `system` → thin separator-style block with the `kind` as a small label and the text. +- [ ] Handle `role: "system"` messages: render as a standalone thin bubble with just the system chunks, no avatar / no actions. + +### Phase 8 — Integration tests + +- [ ] End-to-end: a real streaming run produces the expected chunk shape, including interleaved thinking. +- [ ] Persistence round-trip: write chunks → read back → identical structure. +- [ ] System event during turn ends up in the assistant message at the right position. +- [ ] System event with no turn in flight creates a `role: "system"` message. +- [ ] `toCoreMessages` correctly strips system / error chunks and `role: "system"` messages. + +### Phase 9 — Cleanup + +- [ ] Remove dead code: `ContentSegment` type, `thinking` field references, old per-event accumulator vars. +- [ ] Run `vitest run` across all packages and `tsc --noEmit`. +- [ ] Commit with a focused message; push. + +## Test strategy + +The load-bearing piece is `appendEventToChunks`. If this is correct in isolation, every downstream layer inherits correctness. **Write Phase 2 first and lock its tests in green before touching agent/store/UI.** + +State-machine tests should cover every transition pair in the matrix (text→text, text→thinking, text→tool, text→error, text→system, thinking→text, ...) so that no transition gets accidentally broken later. + +Integration tests can be lighter — they just need to confirm the real wire format flows through the helper end-to-end. + +## Risks and notes + +1. **Frontend store imports from core.** Currently the frontend duplicates some types. Importing `appendEventToChunks` from core is the safest way to guarantee the wire format stays in sync — but requires the build to handle the import. If it doesn't, duplicate the helper carefully and have a test that runs both side by side on the same fixture events to detect drift. + +2. **`ai` SDK `reasoning` events.** Confirmed that `result.fullStream` emits `reasoning` events with `event.textDelta` for Anthropic with the `interleaved-thinking-2025-05-14` beta header. Other providers (OpenAI-compat) emit reasoning via different mechanisms; the middleware in `provider.ts:6-38` already handles those. + +3. **Tool result ordering across multiple steps.** Currently, `tool-result` events arrive at varying times relative to subsequent `text-delta` events depending on whether the AI streams text before vs after acknowledging the result. The new chunk model places results inside the `tool-batch` chunk that holds the matching call — this should be correct regardless of when the result event arrives, as long as the lookup-by-id finds the right batch. + +4. **Coalescing edge case: simultaneous tool calls.** If the AI emits multiple tool-call events with no intervening events (parallel tool use), they all batch together. If a `tool-result` for the first call arrives before the second `tool-call` event, the tool-batch chunk is already open and just absorbs both. No special handling needed. + +5. **Settings preservation.** The `settings` table (`key`, `value` columns) is preserved across the wipe. Verify the dispatch UI doesn't rely on any session-scoped row that lives inside `tabs` (e.g., "last opened tab id") — if so, the user will need to re-pick after the wipe. This is acceptable for beta. + +6. **`MessageRole` change.** Dropping `"tool"` from the role union means any code path that pattern-matches on `role === "tool"` must change. Grep for it before merging. |
