diff options
| author | Adam Malczewski <[email protected]> | 2026-05-22 20:54:19 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-05-22 20:54:19 +0900 |
| commit | c47346cc6237044ecb60ff22c4011d89744af581 (patch) | |
| tree | 2359a25e687e1290ba5180fd60eae83b03b53a23 | |
| parent | 288b21cec98421fda57028a0c8c9d835cfbb14b0 (diff) | |
| download | dispatch-c47346cc6237044ecb60ff22c4011d89744af581.tar.gz dispatch-c47346cc6237044ecb60ff22c4011d89744af581.zip | |
feat: message queue/interrupt system, CORS fix, mobile fixes, chat splitting
- Add message queue allowing users to send messages while agent is running
- Queue messages are injected into tool results as [USER INTERRUPT]
- Retrieve tool interrupted via Promise.race when user message arrives
- Queued messages show with 'queued' badge and cancel button
- Consumed messages repositioned and chat splits at interrupt point
- New assistant message block created after interrupt for clean flow
- Add POST /chat/cancel endpoint for cancelling queued messages
- Fix CORS to allow any origin (Tailscale/LAN access)
- Fix crypto.randomUUID fallback for non-secure contexts (HTTP)
- Fix frontend API URL derivation from page hostname
- Auto-create DB tab if missing on processMessage (foreign key fix)
- Add error logging to processMessage catch block
- Fix working directory input sync on agent switch
- Fix agent mode button to re-apply agent settings
| -rw-r--r-- | packages/api/src/agent-manager.ts | 91 | ||||
| -rw-r--r-- | packages/api/src/app.ts | 19 | ||||
| -rw-r--r-- | packages/core/src/agent/agent.ts | 76 | ||||
| -rw-r--r-- | packages/core/src/tools/retrieve.ts | 28 | ||||
| -rw-r--r-- | packages/core/src/types/index.ts | 19 | ||||
| -rw-r--r-- | packages/frontend/src/lib/components/ChatInput.svelte | 5 | ||||
| -rw-r--r-- | packages/frontend/src/lib/components/ChatMessage.svelte | 29 | ||||
| -rw-r--r-- | packages/frontend/src/lib/components/ChatPanel.svelte | 3 | ||||
| -rw-r--r-- | packages/frontend/src/lib/components/ModelSelector.svelte | 22 | ||||
| -rw-r--r-- | packages/frontend/src/lib/config.ts | 12 | ||||
| -rw-r--r-- | packages/frontend/src/lib/tabs.svelte.ts | 260 | ||||
| -rw-r--r-- | packages/frontend/src/lib/types.ts | 11 |
12 files changed, 511 insertions, 64 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts index 82b8456..70cada4 100644 --- a/packages/api/src/agent-manager.ts +++ b/packages/api/src/agent-manager.ts @@ -21,6 +21,7 @@ import { loadConfig, loadSkills, ModelRegistry, + type QueuedMessage, refreshAccountCredentials, refreshAccountCredentialsAsync, resolveApiKey, @@ -129,6 +130,10 @@ interface TabAgent { toolsOverride?: string[]; /** Working directory override for child agents. */ workingDirectoryOverride?: string; + /** Queue of messages sent while the agent is running. */ + messageQueue: QueuedMessage[]; + /** Callbacks to wake up blocking tools waiting for queued messages. */ + queueListeners: Array<() => void>; } export class AgentManager { @@ -262,6 +267,8 @@ export class AgentManager { keyId: null, modelId: null, taskList, + messageQueue: [], + queueListeners: [], }; this.tabAgents.set(tabId, tabAgent); } @@ -503,8 +510,9 @@ export class AgentManager { tabAgent.modelId = null; } - const customSystemPrompt = getSetting("system_prompt") || undefined; - tabAgent.agent = new Agent({ + const customSystemPrompt = getSetting("system_prompt") || undefined; + tabAgent.agent = new Agent( + { model, apiKey, baseURL, @@ -515,7 +523,12 @@ export class AgentManager { ruleset, provider, ...(claudeCredentials ? { claudeCredentials } : {}), - }); + }, + { + dequeueMessages: () => this.dequeueMessages(tabId), + waitForQueuedMessage: () => this.waitForQueuedMessage(tabId), + }, + ); } return tabAgent.agent; } @@ -744,6 +757,19 @@ export class AgentManager { try { const agent = await this.getOrCreateAgentForTab(tabId, keyId, modelId); + // Ensure tab exists in DB (frontend may have failed to create it) + try { + const { getDatabase } = await import("@dispatch/core"); + const db = getDatabase(); + const exists = db.query("SELECT 1 FROM tabs WHERE id = $id").get({ $id: tabId }); + if (!exists) { + const { createTab } = await import("@dispatch/core"); + createTab(tabId, "New Tab", { keyId: keyId ?? null, modelId: modelId ?? null }); + } + } catch { + // Best-effort — if this fails, appendMessage will throw and we'll catch it below + } + // Persist user message to DB appendMessage( tabId, @@ -805,6 +831,7 @@ export class AgentManager { } } } catch (err) { + console.error(`[dispatch] processMessage error for tab ${tabId}:`, err); const errorMsg = err instanceof Error ? err.message : String(err); processError = errorMsg; tabAgent.status = "error"; @@ -839,6 +866,64 @@ export class AgentManager { } } + queueMessage(tabId: string, message: string, clientId?: string): { messageId: string } { + const tabAgent = this.tabAgents.get(tabId); + if (!tabAgent) throw new Error("Tab not found"); + const id = clientId || crypto.randomUUID(); + const queued: QueuedMessage = { id, message, timestamp: Date.now() }; + tabAgent.messageQueue.push(queued); + // Wake up any blocking tools waiting for queue + for (const listener of tabAgent.queueListeners) { + listener(); + } + tabAgent.queueListeners = []; + this.emit({ type: "message-queued", tabId, messageId: id, message }, tabId); + return { messageId: id }; + } + + cancelQueuedMessage(tabId: string, messageId: string): boolean { + const tabAgent = this.tabAgents.get(tabId); + if (!tabAgent) return false; + const idx = tabAgent.messageQueue.findIndex((m) => m.id === messageId); + if (idx === -1) return false; + tabAgent.messageQueue.splice(idx, 1); + this.emit({ type: "message-cancelled", tabId, messageId }, tabId); + return true; + } + + dequeueMessages(tabId: string): QueuedMessage[] { + const tabAgent = this.tabAgents.get(tabId); + if (!tabAgent) return []; + const messages = [...tabAgent.messageQueue]; + tabAgent.messageQueue = []; + if (messages.length > 0) { + this.emit( + { type: "message-consumed", tabId, messageIds: messages.map((m) => m.id) }, + tabId, + ); + } + return messages; + } + + waitForQueuedMessage(tabId: string): { promise: Promise<void>; cancel: () => void } { + const tabAgent = this.tabAgents.get(tabId); + if (!tabAgent) return { promise: Promise.resolve(), cancel: () => {} }; + if (tabAgent.messageQueue.length > 0) return { promise: Promise.resolve(), cancel: () => {} }; + + let listener: (() => void) | null = null; + const promise = new Promise<void>((resolve) => { + listener = resolve; + tabAgent.queueListeners.push(resolve); + }); + const cancel = () => { + if (listener) { + tabAgent.queueListeners = tabAgent.queueListeners.filter(l => l !== listener); + listener = null; + } + }; + return { promise, cancel }; + } + destroy(): void { this.configWatcher?.close(); this.skillsWatcher?.close(); diff --git a/packages/api/src/app.ts b/packages/api/src/app.ts index 37514c3..53cfb6c 100644 --- a/packages/api/src/app.ts +++ b/packages/api/src/app.ts @@ -16,7 +16,7 @@ export const app = new Hono(); app.use( "*", cors({ - origin: "http://localhost:5173", + origin: (origin) => origin || "*", credentials: true, allowHeaders: ["Content-Type", "Authorization"], allowMethods: ["GET", "POST", "PATCH", "PUT", "DELETE", "OPTIONS"], @@ -43,6 +43,7 @@ app.post("/chat", async (c) => { modelId?: unknown; reasoningEffort?: unknown; workingDirectory?: unknown; + queueId?: unknown; }>(); const { tabId, message } = body; @@ -55,7 +56,9 @@ app.post("/chat", async (c) => { } if (agentManager.getTabStatus(tabId) === "running") { - return c.json({ error: "agent is already running for this tab" }, 409); + const queueId = typeof body.queueId === "string" ? body.queueId : undefined; + const { messageId } = agentManager.queueMessage(tabId, message, queueId); + return c.json({ status: "queued", messageId }); } const keyId = typeof body.keyId === "string" ? body.keyId : undefined; @@ -74,6 +77,18 @@ app.post("/chat", async (c) => { }); app.route("/config", configRoutes); + +app.post("/chat/cancel", async (c) => { + const body = await c.req.json(); + if (typeof body.tabId !== "string" || typeof body.messageId !== "string") { + return c.json({ error: "tabId and messageId are required strings" }, 400); + } + const tabId = body.tabId; + const messageId = body.messageId; + const cancelled = agentManager.cancelQueuedMessage(tabId, messageId); + return c.json({ success: cancelled }); +}); + app.route("/skills", skillsRoutes); app.route("/models", modelsRoutes); app.route("/tabs", tabsRoutes); diff --git a/packages/core/src/agent/agent.ts b/packages/core/src/agent/agent.ts index c2c5880..b4c0eec 100644 --- a/packages/core/src/agent/agent.ts +++ b/packages/core/src/agent/agent.ts @@ -11,6 +11,7 @@ import type { AgentEvent, AgentStatus, ChatMessage, + QueueCallbacks, ToolCall, ToolResult, } from "../types/index.js"; @@ -72,9 +73,11 @@ export class Agent { messages: ChatMessage[] = []; private config: AgentConfig; + private queueCallbacks?: QueueCallbacks; - constructor(config: AgentConfig) { + constructor(config: AgentConfig, queueCallbacks?: QueueCallbacks) { this.config = config; + this.queueCallbacks = queueCallbacks; } private async executeToolWithStreaming( @@ -185,11 +188,12 @@ export class Agent { } try { - const execPromise = tool.execute(tc.arguments, { - onOutput: (data: string, stream: "stdout" | "stderr") => { - shellOutputQueue.push({ data, stream }); - }, - }); + const execPromise = tool.execute(tc.arguments, { + onOutput: (data: string, stream: "stdout" | "stderr") => { + shellOutputQueue.push({ data, stream }); + }, + queueCallbacks: this.queueCallbacks, + }); const rawResult = await execPromise; return { @@ -401,15 +405,30 @@ export class Agent { } } - // Drain any remaining shell output emitted before we read the result - while (shellOutputQueue.length > 0) { - const item = shellOutputQueue.shift()!; - yield { type: "shell-output", data: item.data, stream: item.stream }; + // Drain any remaining shell output emitted before we read the result + while (shellOutputQueue.length > 0) { + const item = shellOutputQueue.shift()!; + yield { type: "shell-output", data: item.data, stream: item.stream }; + } + + // Check for queued user messages and append them to the tool result + let finalToolResult = toolResult; + if (this.queueCallbacks) { + const queuedMsgs = this.queueCallbacks.dequeueMessages(); + if (queuedMsgs.length > 0) { + const userMessages = queuedMsgs + .map((m) => m.message) + .join("\n---\n"); + finalToolResult = { + ...toolResult, + result: `${toolResult.result}\n\n[USER INTERRUPT]\nThe user has sent you message(s) while you were working. You MUST address these before continuing with your current task:\n\n${userMessages}`, + }; } + } - stepToolResults.push(toolResult); - allToolResults.push(toolResult); - yield { type: "tool-result", toolResult }; + stepToolResults.push(finalToolResult); + allToolResults.push(finalToolResult); + yield { type: "tool-result", toolResult: finalToolResult }; } // Add tool results back to step messages so LLM can see them @@ -431,14 +450,29 @@ export class Agent { } } - const assistantMessage: ChatMessage = { - role: "assistant", - content: finalText, - toolCalls: allToolCalls.length > 0 ? allToolCalls : undefined, - toolResults: allToolResults.length > 0 ? allToolResults : undefined, - }; - this.messages.push(assistantMessage); - yield { type: "done", message: assistantMessage }; + const assistantMessage: ChatMessage = { + role: "assistant", + content: finalText, + toolCalls: allToolCalls.length > 0 ? allToolCalls : undefined, + toolResults: allToolResults.length > 0 ? allToolResults : undefined, + }; + this.messages.push(assistantMessage); + + // Drain any remaining queued messages that arrived after the last tool call + if (this.queueCallbacks) { + const remaining = this.queueCallbacks.dequeueMessages(); + if (remaining.length > 0) { + // These messages arrived too late to be injected into a tool result. + // Append them as a user message to the conversation so they're not lost. + const userMessages = remaining.map(m => m.message).join("\n---\n"); + this.messages.push({ + role: "user", + content: userMessages, + }); + } + } + + yield { type: "done", message: assistantMessage }; } catch (err) { const errorMsg = formatError(err, this.config); yield { type: "error", error: errorMsg }; diff --git a/packages/core/src/tools/retrieve.ts b/packages/core/src/tools/retrieve.ts index 93f4c89..8e14a96 100644 --- a/packages/core/src/tools/retrieve.ts +++ b/packages/core/src/tools/retrieve.ts @@ -1,5 +1,5 @@ import { z } from "zod"; -import type { ToolDefinition } from "../types/index.js"; +import type { ToolDefinition, ToolExecuteContext } from "../types/index.js"; export interface RetrieveCallbacks { getResult( @@ -24,11 +24,33 @@ export function createRetrieveTool(callbacks: RetrieveCallbacks): ToolDefinition parameters: z.object({ agent_id: z.string().describe("The agent_id returned by a previous summon call."), }), - execute: async (args: Record<string, unknown>): Promise<string> => { + execute: async (args: Record<string, unknown>, context?: ToolExecuteContext): Promise<string> => { const agentId = args.agent_id as string; + const queueCallbacks = context?.queueCallbacks; try { - const outcome = await callbacks.getResult(agentId); + let outcome: { status: "done"; result: string } | { status: "error"; error: string }; + + if (queueCallbacks) { + const childPromise = callbacks.getResult(agentId); + const { promise: queuePromise, cancel: cancelQueueWait } = queueCallbacks.waitForQueuedMessage(); + const queueSignal = queuePromise.then(() => "QUEUE_INTERRUPT" as const); + + const raceResult = await Promise.race([childPromise, queueSignal]); + + if (raceResult === "QUEUE_INTERRUPT") { + const queuedMsgs = queueCallbacks.dequeueMessages(); + const userMessages = queuedMsgs.map((m) => m.message).join("\n---\n"); + return `The subagent (agent_id: ${agentId}) has not completed its task yet. You will need to call retrieve with this agent_id again later to get the result.\n\n[USER INTERRUPT]\nThe user has sent you message(s) while you were working. You MUST address these before continuing with your current task:\n\n${userMessages}`; + } + + // Child finished first — clean up the queue listener + cancelQueueWait(); + outcome = raceResult; + } else { + outcome = await callbacks.getResult(agentId); + } + if (outcome.status === "done") { return ["<agent_result>", outcome.result, "</agent_result>"].join("\n"); } diff --git a/packages/core/src/types/index.ts b/packages/core/src/types/index.ts index 8f80b08..be37674 100644 --- a/packages/core/src/types/index.ts +++ b/packages/core/src/types/index.ts @@ -47,12 +47,16 @@ export type AgentEvent = keyId: string | null; modelId: string | null; parentTabId: string | null; - }; + } + | { type: "message-queued"; tabId: string; messageId: string; message: string } + | { type: "message-consumed"; tabId: string; messageIds: string[] } + | { type: "message-cancelled"; tabId: string; messageId: string }; // ─── Tool Types ────────────────────────────────────────────────── export interface ToolExecuteContext { onOutput?: (data: string, stream: "stdout" | "stderr") => void; + queueCallbacks?: QueueCallbacks; } export interface ToolDefinition { @@ -147,6 +151,19 @@ export interface ConfigError { message: string; } +// ─── Message Queue Types ───────────────────────────────────────── + +export interface QueuedMessage { + id: string; + message: string; + timestamp: number; +} + +export interface QueueCallbacks { + dequeueMessages: () => QueuedMessage[]; + waitForQueuedMessage: () => { promise: Promise<void>; cancel: () => void }; +} + // ─── Agent Definition Types ────────────────────────────────────── export interface AgentModelEntry { diff --git a/packages/frontend/src/lib/components/ChatInput.svelte b/packages/frontend/src/lib/components/ChatInput.svelte index 21c85fe..dac7a3a 100644 --- a/packages/frontend/src/lib/components/ChatInput.svelte +++ b/packages/frontend/src/lib/components/ChatInput.svelte @@ -3,7 +3,6 @@ import { tabStore } from "../tabs.svelte.js"; let inputEl: HTMLInputElement | undefined; let inputValue = $state(""); -const isDisabled = $derived((tabStore.activeTab?.agentStatus ?? "idle") === "running"); $effect(() => { inputEl?.focus(); @@ -18,7 +17,7 @@ function handleKeydown(e: KeyboardEvent) { function submit() { const text = inputValue.trim(); - if (!text || isDisabled) return; + if (!text) return; inputValue = ""; tabStore.sendMessage(text); } @@ -36,7 +35,7 @@ function submit() { <button type="button" class="btn btn-primary" - disabled={isDisabled || !inputValue.trim()} + disabled={!inputValue.trim()} onclick={submit} > Send diff --git a/packages/frontend/src/lib/components/ChatMessage.svelte b/packages/frontend/src/lib/components/ChatMessage.svelte index 308e6ba..c6b6034 100644 --- a/packages/frontend/src/lib/components/ChatMessage.svelte +++ b/packages/frontend/src/lib/components/ChatMessage.svelte @@ -1,13 +1,26 @@ <script lang="ts"> import { appSettings } from "../settings.svelte.js"; +import { tabStore } from "../tabs.svelte.js"; import type { ChatMessage } from "../types.js"; import MarkdownRenderer from "./MarkdownRenderer.svelte"; import ToolCallDisplay from "./ToolCallDisplay.svelte"; -const { message }: { message: ChatMessage } = $props(); +const { message, tabId }: { message: ChatMessage; tabId?: string } = $props(); const isUser = $derived(message.role === "user"); const isSystem = $derived(message.role === "system"); + +// Check if this message is queued: its id starts with "queued-" +const queuedMessageId = $derived( + isUser && message.id.startsWith("queued-") ? message.id.slice("queued-".length) : null, +); +const isQueued = $derived(queuedMessageId !== null); + +function cancelQueued() { + if (tabId && queuedMessageId) { + void tabStore.cancelQueuedMessage(tabId, queuedMessageId); + } +} </script> {#if isSystem} @@ -21,7 +34,7 @@ const isSystem = $derived(message.role === "system"); </div> </div> {:else} -<div class="chat chat-start mb-2 [&>.chat-bubble]:max-w-full"> +<div class="chat chat-start mb-2 [&>.chat-bubble]:max-w-full {isQueued ? 'opacity-60' : ''}"> <div class="chat-bubble break-words {isUser ? 'chat-bubble-primary w-fit' : 'bg-transparent w-full'}"> {#if message.thinking} <div class="collapse collapse-arrow mb-2 p-1"> @@ -43,5 +56,17 @@ const isSystem = $derived(message.role === "system"); <span class="inline-block w-1.5 h-4 bg-current animate-pulse ml-0.5 align-middle rounded-sm"></span> {/if} </div> + {#if isQueued} + <div class="flex items-center gap-1 mt-0.5 ml-1"> + <span class="badge badge-ghost badge-xs text-base-content/40">queued</span> + <button + class="btn btn-xs btn-ghost text-base-content/40 hover:text-error px-1 min-h-0 h-auto" + onclick={cancelQueued} + title="Cancel queued message" + > + ✕ + </button> + </div> + {/if} </div> {/if} diff --git a/packages/frontend/src/lib/components/ChatPanel.svelte b/packages/frontend/src/lib/components/ChatPanel.svelte index 8c92e4f..82bd4ee 100644 --- a/packages/frontend/src/lib/components/ChatPanel.svelte +++ b/packages/frontend/src/lib/components/ChatPanel.svelte @@ -8,6 +8,7 @@ let userScrolledUp = $state(false); let isAutoScrolling = false; const messages = $derived(tabStore.activeTab?.messages ?? []); +const activeTabId = $derived(tabStore.activeTab?.id); function isNearBottom(el: HTMLElement): boolean { return el.scrollHeight - el.scrollTop - el.clientHeight < 64; @@ -60,7 +61,7 @@ $effect(() => { </div> {/if} {#each messages as message (message.id)} - <ChatMessageComponent {message} /> + <ChatMessageComponent {message} tabId={activeTabId} /> {/each} </div> diff --git a/packages/frontend/src/lib/components/ModelSelector.svelte b/packages/frontend/src/lib/components/ModelSelector.svelte index 21364f9..5949e71 100644 --- a/packages/frontend/src/lib/components/ModelSelector.svelte +++ b/packages/frontend/src/lib/components/ModelSelector.svelte @@ -175,7 +175,7 @@ const modelCache = new Map<string, string[]>(); class="input input-bordered input-sm font-mono text-xs flex-1" placeholder="default (project root)" value={workingDirectory ?? ""} - oninput={(e) => { + onchange={(e) => { const val = e.currentTarget.value.trim(); onWorkingDirectoryChange(val || null); }} @@ -202,7 +202,19 @@ const modelCache = new Map<string, string[]>(); </button> <button class="btn btn-xs {mode === 'agent' ? 'btn-primary' : 'btn-ghost'}" - onclick={() => { modeOverride = "agent"; fetchAgents(); }} + onclick={async () => { + modeOverride = "agent"; + await fetchAgents(); + // Re-apply the active agent's settings (including cwd) + const current = agents.find(a => a.slug === activeAgentSlug); + const agentToApply = current ?? agents[0] ?? null; + if (agentToApply) { + onAgentChange(agentToApply); + // Force-update the input since the prop may not change (already set) + const cwdEl = document.getElementById("cwd-input") as HTMLInputElement | null; + if (cwdEl) cwdEl.value = agentToApply.cwd ?? ""; + } + }} > Agent </button> @@ -253,7 +265,11 @@ const modelCache = new Map<string, string[]>(); {#each agents as agent (agent.slug + ":" + agent.scope)} <button class="w-full text-left rounded-lg px-3 py-2 transition-colors {activeAgentSlug === agent.slug ? 'bg-primary text-primary-content' : 'bg-base-300 hover:bg-base-200'}" - onclick={() => onAgentChange(agent)} + onclick={() => { + onAgentChange(agent); + const cwdEl = document.getElementById("cwd-input") as HTMLInputElement | null; + if (cwdEl) cwdEl.value = agent.cwd ?? ""; + }} > <div class="flex items-center justify-between gap-2"> <span class="font-medium text-sm">{agent.name}</span> diff --git a/packages/frontend/src/lib/config.ts b/packages/frontend/src/lib/config.ts index c8f4d9f..0565367 100644 --- a/packages/frontend/src/lib/config.ts +++ b/packages/frontend/src/lib/config.ts @@ -1,5 +1,15 @@ const STORAGE_KEY = "dispatch-api-url"; -const DEFAULT_API_BASE = import.meta.env.VITE_API_URL ?? "http://localhost:3000"; + +function getDefaultApiBase(): string { + if (import.meta.env.VITE_API_URL) return import.meta.env.VITE_API_URL; + // Derive from current page hostname so it works over Tailscale/LAN + if (typeof window !== "undefined" && window.location.hostname !== "localhost") { + return `http://${window.location.hostname}:3000`; + } + return "http://localhost:3000"; +} + +const DEFAULT_API_BASE = getDefaultApiBase(); function loadApiBase(): string { if (typeof localStorage !== "undefined") { diff --git a/packages/frontend/src/lib/tabs.svelte.ts b/packages/frontend/src/lib/tabs.svelte.ts index 7722a7e..d186ccb 100644 --- a/packages/frontend/src/lib/tabs.svelte.ts +++ b/packages/frontend/src/lib/tabs.svelte.ts @@ -7,12 +7,20 @@ import type { DebugInfo, LogEntry, PermissionPrompt, + QueuedMessage, TaskItem, } from "./types.js"; import { wsClient } from "./ws.svelte.js"; -function generateId() { - return crypto.randomUUID(); +function generateId(): string { + if (typeof crypto !== "undefined" && typeof crypto.randomUUID === "function") { + return crypto.randomUUID(); + } + // Fallback for non-secure contexts (HTTP over Tailscale/LAN) + return "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx".replace(/[xy]/g, (c) => { + const r = (Math.random() * 16) | 0; + return (c === "x" ? r : (r & 0x3) | 0x8).toString(16); + }); } function makeDebugInfo(overrides: Partial<DebugInfo> = {}): DebugInfo { @@ -44,6 +52,8 @@ export interface Tab { agentScope: string | null; /** Custom working directory override for this tab */ workingDirectory: string | null; + /** Messages queued to be sent once the agent finishes its current run */ + queuedMessages: QueuedMessage[]; } function createTabStore() { @@ -54,6 +64,10 @@ function createTabStore() { let configReloaded = $state(false); let isConnected = $state(false); + // Track message IDs that were consumed before the POST /chat response arrived. + // Keyed by queueId — if consumed before we process the response, we skip the queued state. + const recentlyConsumedIds = new Set<string>(); + // Clear any stale listeners from HMR reloads, then register wsClient.clearCallbacks(); wsClient.onEvent((event) => { @@ -105,6 +119,7 @@ function createTabStore() { agentSlug: null, agentScope: null, workingDirectory: null, + queuedMessages: [], }; tabs = [...tabs, tab]; activeTabId = id; @@ -196,6 +211,7 @@ function createTabStore() { agentSlug: null, agentScope: null, workingDirectory: null, + queuedMessages: [], }; tabs = [...tabs, newTab]; activeTabId = agentId; @@ -449,27 +465,132 @@ function createTabStore() { }; // Only add if we don't already have this tab if (!getTabById(newTabEvent.id)) { - const tab: Tab = { - id: newTabEvent.id, - title: newTabEvent.title, - messages: [], - agentStatus: "running", - keyId: newTabEvent.keyId ?? null, - modelId: newTabEvent.modelId ?? null, - reasoningEffort: "max", - currentAssistantId: null, - tasks: [], - injectedSkills: [], - parentTabId: newTabEvent.parentTabId ?? null, - persistent: newTabEvent.parentTabId == null, - agentSlug: null, - agentScope: null, - workingDirectory: null, - }; + const tab: Tab = { + id: newTabEvent.id, + title: newTabEvent.title, + messages: [], + agentStatus: "running", + keyId: newTabEvent.keyId ?? null, + modelId: newTabEvent.modelId ?? null, + reasoningEffort: "max", + currentAssistantId: null, + tasks: [], + injectedSkills: [], + parentTabId: newTabEvent.parentTabId ?? null, + persistent: newTabEvent.parentTabId == null, + agentSlug: null, + agentScope: null, + workingDirectory: null, + queuedMessages: [], + }; tabs = [...tabs, tab]; } break; } + case "message-queued": { + if (!tabId) break; + const mqEvent = event as AgentEvent & { tabId: string; messageId: string; message: string }; + const mqTab = getTabById(tabId); + if (!mqTab) break; + // Only add to queuedMessages if not already tracked (might have been added + // optimistically by sendMessage using the same queueId) + const alreadyQueued = mqTab.queuedMessages.some((m) => m.id === mqEvent.messageId); + if (!alreadyQueued) { + // Message came from another client/session — add it fresh + const qm: QueuedMessage = { + id: mqEvent.messageId, + message: mqEvent.message, + timestamp: Date.now(), + }; + updateTab(tabId, { queuedMessages: [...mqTab.queuedMessages, qm] }); + // Also add as a user chat message if not already present + const tabAfterQm = getTabById(tabId); + const existingMsg = tabAfterQm?.messages.find( + (m) => + m.id === `queued-${mqEvent.messageId}` || m.id === mqEvent.messageId, + ); + if (!existingMsg) { + const userMsg: ChatMessage = { + id: `queued-${mqEvent.messageId}`, + role: "user", + content: [{ type: "text", text: mqEvent.message }], + }; + updateTab(tabId, { messages: [...(tabAfterQm?.messages ?? []), userMsg] }); + } + } + // If alreadyQueued, the optimistic update already put everything in place with the + // correct `queued-${messageId}` id — nothing more to do. + break; + } + case "message-consumed": { + if (!tabId) break; + const mcEvent = event as AgentEvent & { tabId: string; messageIds: string[] }; + const mcTab = getTabById(tabId); + if (!mcTab) break; + // Track recently consumed IDs so sendMessage can detect early consumption + for (const id of mcEvent.messageIds) { + recentlyConsumedIds.add(id); + setTimeout(() => recentlyConsumedIds.delete(id), 10000); + } + updateTab(tabId, { + queuedMessages: mcTab.queuedMessages.filter((m) => !mcEvent.messageIds.includes(m.id)), + }); + // Split the current assistant message: finalize it, then insert + // the consumed user messages after it. Subsequent streaming events + // will create a NEW assistant message block below. + const currentAssistantId = mcTab.currentAssistantId; + updateMessages(tabId, (msgs) => { + // Extract consumed messages + const consumed: ChatMessage[] = []; + const rest: ChatMessage[] = []; + for (const m of msgs) { + if (m.id.startsWith("queued-")) { + const queuedId = m.id.slice(7); + if (mcEvent.messageIds.includes(queuedId)) { + consumed.push({ ...m, id: queuedId }); + continue; + } + } + rest.push(m); + } + if (consumed.length === 0) return msgs; + + // Mark the current assistant message as done streaming + const result = rest.map((m) => + m.id === currentAssistantId + ? { ...m, isStreaming: false } + : m, + ); + + // Insert consumed messages right after the current assistant message + let insertIdx = result.length; + for (let i = result.length - 1; i >= 0; i--) { + if (result[i].id === currentAssistantId) { + insertIdx = i + 1; + break; + } + } + result.splice(insertIdx, 0, ...consumed); + return result; + }); + // Clear currentAssistantId so the next streaming event creates + // a new assistant message block after the user's message + updateTab(tabId, { currentAssistantId: null }); + break; + } + case "message-cancelled": { + if (!tabId) break; + const cancelEvent = event as AgentEvent & { tabId: string; messageId: string }; + const cancelTab = getTabById(tabId); + if (!cancelTab) break; + updateTab(tabId, { + queuedMessages: cancelTab.queuedMessages.filter((m) => m.id !== cancelEvent.messageId), + messages: cancelTab.messages.filter( + (m) => !(m.role === "user" && m.id === `queued-${cancelEvent.messageId}`), + ), + }); + break; + } } } @@ -614,9 +735,22 @@ function createTabStore() { role: "user", content: [{ type: "text", text }], }; - updateTab(tab.id, { messages: [...tab.messages, userMsg] }); - // Generate title from first user message + // If the agent is currently running, we expect the POST to be queued. + // Optimistically assign the queued- prefix and add to queuedMessages BEFORE + // the POST so that the WS "message-queued" event (which may arrive before + // the HTTP response) can match the existing chat message instead of creating + // a duplicate. + const isRunning = tab.agentStatus === "running"; + let queueId: string | null = null; + if (isRunning) { + queueId = generateId(); + userMsg.id = `queued-${queueId}`; + // Pre-populate queuedMessages so WS event finds it immediately + tab.queuedMessages = [...tab.queuedMessages, { id: queueId, message: text, timestamp: Date.now() }]; + } + + updateTab(tab.id, { messages: [...tab.messages, userMsg] }); // Generate title from first user message if (tab.messages.length === 0 || (tab.messages.length === 1 && tab.title === "New Tab")) { const titleText = text.length > 50 ? `${text.slice(0, 47)}...` : text; updateTab(tab.id, { title: titleText }); @@ -670,10 +804,25 @@ function createTabStore() { ...(tab.modelId ? { modelId: tab.modelId } : {}), reasoningEffort: tab.reasoningEffort, ...(tab.workingDirectory ? { workingDirectory: tab.workingDirectory } : {}), + ...(queueId ? { queueId } : {}), }), }); if (!res.ok) { const body = await res.text(); + // Rollback optimistic queued state on error + if (queueId) { + const currentTab = getTabById(tab.id); + if (currentTab) { + updateTab(tab.id, { + queuedMessages: currentTab.queuedMessages.filter((m) => m.id !== queueId), + }); + } + updateMessages(tab.id, (msgs) => + msgs.map((m) => + m.id === `queued-${queueId}` ? { ...m, id: generateId() } : m, + ), + ); + } const errMsg: ChatMessage = { id: generateId(), role: "assistant", @@ -686,8 +835,51 @@ function createTabStore() { }), }; updateTab(tab.id, { messages: [...(getTabById(tab.id)?.messages ?? []), errMsg] }); + } else { + const responseData = (await res.json()) as { status: string; messageId?: string }; + if (responseData.status === "queued" && responseData.messageId) { + // The backend confirmed the message was queued with the ID we sent (queueId). + // If the message was already consumed before we got here (super-fast agent), + // clean up the optimistic queued state. + if (queueId && recentlyConsumedIds.has(queueId)) { + recentlyConsumedIds.delete(queueId); + // queuedMessages and the queued- prefix have already been cleaned up by + // the message-consumed handler. Nothing more to do. + } + // Otherwise everything is already in place from the optimistic update above. + } else if (responseData.status === "ok") { + // Agent wasn't running after all — undo the optimistic queued state if we set it. + if (queueId) { + const currentTab = getTabById(tab.id); + if (currentTab) { + updateTab(tab.id, { + queuedMessages: currentTab.queuedMessages.filter((m) => m.id !== queueId), + }); + } + // Restore the message to a normal (non-queued) ID + updateMessages(tab.id, (msgs) => + msgs.map((m) => + m.id === `queued-${queueId}` ? { ...m, id: generateId() } : m, + ), + ); + } + } } } catch (err) { + // Rollback optimistic queued state on network error + if (queueId) { + const currentTab = getTabById(tab.id); + if (currentTab) { + updateTab(tab.id, { + queuedMessages: currentTab.queuedMessages.filter((m) => m.id !== queueId), + }); + } + updateMessages(tab.id, (msgs) => + msgs.map((m) => + m.id === `queued-${queueId}` ? { ...m, id: generateId() } : m, + ), + ); + } const errMsg: ChatMessage = { id: generateId(), role: "assistant", @@ -745,8 +937,8 @@ function createTabStore() { if (!tab) return; if (!agent) { - // Switch back to manual mode — clear agent - updateTab(tab.id, { agentSlug: null, agentScope: null }); + // Switch back to manual mode — clear agent and reset working directory + updateTab(tab.id, { agentSlug: null, agentScope: null, workingDirectory: null }); return; } @@ -810,6 +1002,27 @@ function createTabStore() { } } + async function cancelQueuedMessage(tabId: string, messageId: string): Promise<void> { + const tab = getTabById(tabId); + if (tab) { + updateTab(tabId, { + queuedMessages: tab.queuedMessages.filter((m) => m.id !== messageId), + messages: tab.messages.filter( + (m) => !(m.role === "user" && m.id === `queued-${messageId}`), + ), + }); + } + try { + await fetch(`${config.apiBase}/chat/cancel`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ tabId, messageId }), + }); + } catch { + // ignore + } + } + function copyConversation(): string { const tab = getActiveTab(); if (!tab) return ""; @@ -871,6 +1084,7 @@ function createTabStore() { switchTab, closeTab, sendMessage, + cancelQueuedMessage, changeModel, setKey, setAgent, diff --git a/packages/frontend/src/lib/types.ts b/packages/frontend/src/lib/types.ts index 4523828..79b2867 100644 --- a/packages/frontend/src/lib/types.ts +++ b/packages/frontend/src/lib/types.ts @@ -71,7 +71,10 @@ export type AgentEvent = keyId: string | null; modelId: string | null; parentTabId: string | null; - }; + } + | { type: "message-queued"; tabId: string; messageId: string; message: string } + | { type: "message-consumed"; tabId: string; messageIds: string[] } + | { type: "message-cancelled"; tabId: string; messageId: string }; export interface TaskItem { id: string; @@ -102,6 +105,12 @@ export interface KeyInfo { exhaustedAt: number | null; } +export interface QueuedMessage { + id: string; + message: string; + timestamp: number; +} + export interface LogEntry { id: string; permission: string; |
