import { Agent, type AgentEvent, type AgentModelEntry, type AgentSkillMapping, type AgentStatus, appendChunks, appendEventToChunks, BackgroundShellStore, BackgroundTranscriptStore, buildCompactionRequest, buildSummaryTurnText, type ChatMessage, type Chunk, type ClaudeAccount, clearSpillForTab, configToRuleset, createConfigWatcher, createKeyUsageTool, createListFilesTool, createLspTool, createReadFileSliceTool, createReadFileTool, createReadTabTool, createRetrieveTool, createRunShellTool, createSearchCodeTool, createSendToTabTool, createSkillsWatcher, createSummonTool, createTab, createTaskListTool, createWebSearchTool, createWriteFileTool, createYoutubeTranscribeTool, type DispatchConfig, expandAgentToolNames, explodeTurn, explodeUserText, GLOBAL_AGENTS_DIR, getAgentDirPaths, getChunksForTab, getClaudeAccountsFromDB, getMessagesForTab, getSetting, getTab, getUsageStatsForTab, groupRowsToMessages, LspManager, listOpenTabs, loadAgent, loadAgents, loadConfig, loadSkills, ModelRegistry, type QueuedMessage, type ReasoningEffort, type ResolvedLspServer, refreshAccountCredentials, refreshAccountCredentialsAsync, rekeyChunks, reportDiagnostics, resolveApiKey, resolveServersFromConfig, resolveTabPrefix, type SkillDefinition, type SystemChunkKind, shortestUniquePrefix, type TabResolution, type TabStatusSnapshot, TaskList, toAvailableSubagents, toAvailableUserAgents, type UsageData, type UsageStats, type UserContentPart, validateConfig, watchDirConfig, } from "@dispatch/core"; import type { PermissionManager } from "./permission-manager.js"; import { setConfigGetter } from "./routes/config.js"; import { setAccountsGetter, setModelsGetter } from "./routes/models.js"; import { setSkillsGetter } from "./routes/skills.js"; import { setTabsAgentManager } from "./routes/tabs.js"; const TOOL_DESCRIPTIONS: Record = { read_file: "Read the contents of a file", read_file_slice: "Read a character-range slice of a single line in a file (for inspecting long lines that read_file 
truncated)", list_files: "List files and directories", write_file: "Write content to a file (creates parent directories if needed)", run_shell: "Execute shell commands in the working directory (bash). Returns stdout, stderr, and exit code. Set background=true to run in the background and get a job_id for later retrieval. Do NOT run destructive or irreversible commands unless the user explicitly requests them.", search_code: "Search the codebase by query using the 'cs' code search engine (relevance-ranked, structure-aware). Returns the most relevant files first with matching snippets and line numbers. Better than grep/find for exploratory 'where is X / how does Y work' searches; use run_shell with rg for exhaustive exact-match lists.", todo: "Create/maintain a todo list to plan and track work. Declarative whole-list write: send the entire list in `todos` each call (it replaces the previous list). Statuses: pending, in_progress, completed, cancelled.", key_usage: "Report current usage levels for configured API keys: provider, active/exhausted status, remaining rate-limit headroom and reset times per window (5-hour, weekly, monthly where available), and whether the figures are live or cached. Pass key_id for one key; omit to report all. Supported for anthropic and opencode-go keys.", summon: "Spawn a child agent to work on a task independently. By default blocks until the child finishes. Set background=true to return immediately with an agent_id for later retrieval.", retrieve: "Wait for a background task to finish and get its result (blocking). Pass the job_id or agent_id.", web_search: "Search the web and optionally scrape full page content from results.", youtube_transcribe: "Fetch the transcript/subtitles for a YouTube video. Set background=true to start in the background and get a job_id for later retrieval.", send_to_tab: "Send a message to another tab (agent) by its short ID, as shown in the tab bar. 
Fire-and-forget: it queues/wakes the target and returns immediately without waiting for a reply. Do NOT sleep, poll, or run commands to wait — if the target replies it will wake you with a new message in a later turn; if you are only waiting, end your turn.", read_tab: "Read another tab (agent)'s most recent completed response by its short ID. Returns a non-blocking snapshot; if the target is still running you get its previous completed turn. Use after send_to_tab to collect a reply.", lsp: "Query the configured Language Server (e.g. luau-lsp for Roblox Luau) about a file: diagnostics, hover, definition, references, or documentSymbol. Line/character are 1-based.", }; /** * Maximum number of CONSECUTIVE agent-to-agent auto-wakes a tab will accept * before it stops auto-responding and waits for a human. Each `send_to_tab` * that would wake an idle tab consumes one unit; any human-originated message * (e.g. via `POST /chat`) refills the budget to full. This bounds runaway * agent ping-pong loops (A wakes B wakes A ...) that would otherwise spend * tokens unbounded with no human in the loop. See notes/plan-tab-comm.md. */ const MAX_AGENT_AUTO_WAKES = 6; /** * Cap on how many OTHER files' LSP error blocks are appended to a write_file * result, after the written file's own errors. Bounds context spend when a * single edit surfaces project-wide diagnostics. Mirrors opencode's * MAX_PROJECT_DIAGNOSTICS_FILES. */ const MAX_LSP_OTHER_FILE_DIAGNOSTICS = 5; const DEFAULT_SYSTEM_PROMPT = "You are Dispatch, an agent designed to help with any task that the user asks for. Be helpful and concise."; const TASK_MANAGEMENT_GUIDANCE = ` ## Task Management You have access to the \`todo\` tool to plan and track tasks. Use it VERY frequently so the user can see your plan and progress in real time. It is also a powerful planning aid: breaking larger work into smaller steps keeps you from forgetting important tasks — that is unacceptable. 
The \`todo\` tool is DECLARATIVE: every call sends the ENTIRE list in the \`todos\` parameter and replaces the previous list. There are no ids and no per-item actions — to change one item, resend the whole list with that item updated. To clear the list, send an empty array. ### When to use - A task needs 3+ distinct steps, or benefits from planning - The user gives multiple tasks (numbered or comma-separated) or asks for a todo list - New instructions arrive — capture them as todos - You start a task — mark it in_progress (only one at a time) before working - You finish a task — mark it completed and add any follow-ups discovered ### When NOT to use - A single, straightforward task (or fewer than 3 trivial steps) - Purely informational or conversational requests - When tracking adds no organizational value ### States - pending — not started - in_progress — actively working (exactly ONE at a time) - completed — finished successfully - cancelled — no longer needed ### Rules - Send the full desired list every time; the tool replaces the stored list - Update status in real time; do NOT batch completions - Mark completed only after the work is actually done (including any required verification), never on intent - Keep exactly one in_progress while work remains; if blocked, keep it in_progress and add a follow-up todo describing the blocker ### Examples User: "Run the build and fix any type errors" Write the list, then work it: send [{content:"Run the build", status:"in_progress"}, {content:"Fix any type errors", status:"pending"}]. Run the build. If it surfaces 10 errors, resend the whole list — the build item completed, plus one item per error — then drive each to completed one at a time. User: "How do I print Hello World in Python?" No todo needed — this is a single informational question. User: "Rename getUser to fetchUser across the project" Send [{content:"Search for all occurrences of getUser", status:"in_progress"}, ...]. 
After the grep reveals the files, resend the whole list with one item per file, then work through them, resending the list as each flips to completed. `.trim(); /** * Returns true for OpenCode Go models served via the Anthropic-format * `/messages` endpoint (MiniMax M2.x, Qwen3.x Plus). See * https://opencode.ai/docs/go/#endpoints for the per-model endpoint table. */ function isOpencodeGoAnthropicModel(modelId: string): boolean { return modelId.startsWith("minimax-") || modelId.startsWith("qwen"); } function buildSystemPrompt(toolNames: string[], basePrompt?: string): string { const base = basePrompt || DEFAULT_SYSTEM_PROMPT; const toolList = toolNames .filter((name) => TOOL_DESCRIPTIONS[name]) .map((name) => `- ${name}: ${TOOL_DESCRIPTIONS[name]}`) .join("\n"); if (!toolList) return base; const hasTodo = toolNames.includes("todo"); const hasSummon = toolNames.includes("summon"); let prompt = `${base}\n\nYou have access to the following tools:\n\n${toolList}\n\nWhen asked to work with files, use these tools. Always confirm what you did after completing an action.`; if (hasTodo) { prompt += `\n\n${TASK_MANAGEMENT_GUIDANCE}`; } if (hasSummon) { prompt += '\n\nYou have pre-configured subagent types. Use summon(agent="slug", task="...") to delegate specialized work to a subagent. Use list_files and read_file to inspect available agent definitions.'; } return prompt; } interface TabAgent { agent: Agent | null; status: AgentStatus; keyId: string | null; modelId: string | null; taskList: TaskList; _lastPermKey?: string; /** Ordered key+model fallback hierarchy from the agent definition. */ agentModels?: AgentModelEntry[]; /** Abort controller for cancelling a running agent. */ abortController?: AbortController; /** For child agents: resolves when the agent finishes its task. 
*/ completionResolve?: ( result: { status: "done"; result: string } | { status: "error"; error: string }, ) => void; completionPromise?: Promise< { status: "done"; result: string } | { status: "error"; error: string } >; /** Accumulated final text output from the child agent. */ finalOutput?: string; /** Tools whitelist for child agents (set by summon). */ toolsOverride?: string[]; /** Working directory override for child agents. */ workingDirectoryOverride?: string; /** Queue of messages sent while the agent is running. */ messageQueue: QueuedMessage[]; /** Callbacks to wake up blocking tools waiting for queued messages. */ queueListeners: Array<() => void>; /** Store for shell commands backgrounded due to user interrupt. */ shellStore: BackgroundShellStore; /** Store for transcript requests backgrounded due to user interrupt. */ transcriptStore: BackgroundTranscriptStore; /** * In-flight assistant chunks for the active turn. `null` when no turn is * running. Out-of-band system events (config-reload, cancel, etc.) push * onto this list when present; it is exploded into chunk rows when the * turn flushes. */ currentChunks: Chunk[] | null; /** * Opaque id of the in-flight assistant turn, used as the `currentAssistantId` * in the WS status snapshot so a reconnecting frontend can align its local * streaming message. (No longer a DB row id — the turn is many chunk rows.) */ currentAssistantId: string | null; /** * `turn_id` shared by the current turn's user message and assistant chunk * rows. Set at the start of `processMessage`, cleared when the turn ends. */ currentTurnId: string | null; /** * Remaining consecutive agent-to-agent auto-wakes this tab will accept * before requiring human intervention (see `MAX_AGENT_AUTO_WAKES`). * Refilled to the max by any human-originated `deliverMessage`; decremented * each time an agent-originated `send_to_tab` wakes this tab from idle. When * it hits 0, further agent messages are queued but do NOT start a turn. 
*/ autoWakeBudget: number; /** * True while this tab is the SOURCE of an in-flight compaction. New * messages are queued (not started) until compaction settles so the * conversation can't mutate mid-summary. */ compacting?: boolean; } export class AgentManager { private tabAgents: Map = new Map(); private messageCount = 0; private eventListeners: Set<(event: AgentEvent & { tabId: string }) => void> = new Set(); private permissionManager: PermissionManager | undefined; private config: DispatchConfig; private skillsData: { skills: SkillDefinition[]; mappings: AgentSkillMapping[] }; private modelRegistry: ModelRegistry | null = null; private configWatcher: { close(): void } | null = null; private skillsWatcher: { close(): void } | null = null; private claudeAccounts: ClaudeAccount[] = []; /** * Process-wide owner of LSP client lifecycles. Servers are declared in the * `dispatch.toml` of a tab's effective working directory; clients are * spawned lazily per (root + server) and reused across tabs/turns. Shut * down in `destroy()`. */ private lspManager: LspManager = new LspManager(); /** * Cache of resolved LSP servers per working directory, so we parse each * directory's `dispatch.toml` `[lsp]` block once. Cleared wholesale on any * config hot-reload (the watcher fires for the root config; directory-level * configs are re-read on demand after a clear). */ private lspServersByDir: Map = new Map(); /** * One file watcher per distinct SUBDIRECTORY config we've cached in * `lspServersByDir`. The main `configWatcher` only watches the root + * global `dispatch.toml`; a tab whose effective working directory is a * subdirectory with its own `dispatch.toml` needs its cache entry cleared * when THAT file changes. Keyed by directory; closed on full reload (the * cache is dropped wholesale then) and in `destroy()`. */ private lspDirWatchers: Map = new Map(); /** Root working directory watched by `configWatcher` (constructor). 
*/ private rootWorkingDirectory = ""; constructor(permissionManager?: PermissionManager) { this.permissionManager = permissionManager; const workingDirectory = process.env.DISPATCH_WORKING_DIR ?? process.cwd(); this.rootWorkingDirectory = workingDirectory; // Load initial config this.config = loadConfig(workingDirectory); const { errors } = validateConfig(this.config); if (errors.length > 0) { for (const err of errors) { console.warn(`dispatch: config validation warning [${err.path}]: ${err.message}`); } } // Initialize model registry + resolver if config has models and keys this._initModelRegistry(this.config); // Load initial skills this.skillsData = loadSkills(workingDirectory); // Discover Claude accounts this._refreshClaudeAccounts(); // Wire route getters setConfigGetter(() => this.config); setSkillsGetter(() => this.skillsData); setModelsGetter(() => this.modelRegistry); setAccountsGetter(() => this.claudeAccounts); setTabsAgentManager(() => this); // Set up hot-reload watchers this.configWatcher = createConfigWatcher(workingDirectory, (newConfig) => { this.config = newConfig; const { errors: newErrors } = validateConfig(newConfig); if (newErrors.length > 0) { for (const err of newErrors) { console.warn(`dispatch: config validation warning [${err.path}]: ${err.message}`); } } // Update model registry with new config this._initModelRegistry(newConfig); // LSP server config may have changed — drop the per-directory cache // so the next tool build re-reads each working directory's // `dispatch.toml` `[lsp]` block. this.lspServersByDir.clear(); // Tear down the per-subdirectory LSP watchers too; they are lazily // re-registered by `getLspServersForDir` as directories are re-cached. 
for (const watcher of this.lspDirWatchers.values()) watcher.close(); this.lspDirWatchers.clear(); // Re-discover Claude accounts: a config reload may accompany freshly // imported credentials, and (critically) lets a process that failed // account discovery at boot recover without a full restart. this._refreshClaudeAccounts(); // Invalidate cached agents so next message uses updated config for (const tabAgent of this.tabAgents.values()) { tabAgent.agent = null; } // Emit config-reload to all tabs (and persist as a system chunk) for (const tabId of this.tabAgents.keys()) { this.emit({ type: "config-reload" }, tabId); this.routeSystemEventToTab(tabId, "config-reload", "Configuration reloaded"); } }); this.skillsWatcher = createSkillsWatcher(workingDirectory, (result) => { this.skillsData = result; // Invalidate cached agents so next message uses updated skills for (const tabAgent of this.tabAgents.values()) { tabAgent.agent = null; } // Emit config-reload to all tabs (and persist as a system chunk) for (const tabId of this.tabAgents.keys()) { this.emit({ type: "config-reload" }, tabId); this.routeSystemEventToTab(tabId, "config-reload", "Skills reloaded"); } }); } private _refreshClaudeAccounts(): void { try { this.claudeAccounts = getClaudeAccountsFromDB(); if (this.claudeAccounts.length > 0) { console.log(`dispatch: discovered ${this.claudeAccounts.length} Claude account(s)`); } } catch (err) { console.warn( `dispatch: failed to discover Claude accounts: ${err instanceof Error ? err.message : String(err)}`, ); } } /** * Resolve (and cache) the LSP servers configured for a working directory. * * LSP config is resolved by `loadConfig`, which merges the HOME-directory * global `dispatch.toml` (`~/.config/dispatch/dispatch.toml`) underneath the * tab's effective working-directory `dispatch.toml` — local `[lsp.]` * entries override global ones sharing the same id, while global-only * servers stay active in every repository. 
We read+merge that config once * per directory and cache the resolved servers; the cache is cleared on * config hot-reload. Returns `[]` when neither config declares an `[lsp]` * block (the common case). */ private getLspServersForDir(dir: string): ResolvedLspServer[] { const cached = this.lspServersByDir.get(dir); if (cached) return cached; let servers: ResolvedLspServer[] = []; try { const dirConfig = loadConfig(dir); servers = resolveServersFromConfig(dirConfig.lsp); } catch (err) { console.warn( `dispatch: failed to load LSP config for ${dir}: ${err instanceof Error ? err.message : String(err)}`, ); servers = []; } this.lspServersByDir.set(dir, servers); // Hot-reload for SUBDIRECTORY configs: the root/global watcher in the // constructor does not cover a nested `dispatch.toml`. Register a // one-per-dir watcher the first time we cache a directory so editing // its config invalidates just this entry (and cached agents) without a // restart. The root working directory is already covered by // `configWatcher`, so skip it to avoid a redundant watch. this.ensureLspDirWatcher(dir); return servers; } /** * Register (once) a file watcher on `/dispatch.toml` so a change to a * subdirectory config invalidates that directory's LSP cache entry and * any cached agents. No-op for the root working directory (already watched * by `configWatcher`) and for directories already being watched. */ private ensureLspDirWatcher(dir: string): void { if (dir === this.rootWorkingDirectory) return; if (this.lspDirWatchers.has(dir)) return; const watcher = watchDirConfig(dir, () => { // Drop just this directory's resolved servers; the next tool build // re-reads (and re-merges global) for it. this.lspServersByDir.delete(dir); // Invalidate cached agents so the next message rebuilds tools with // the updated server set. 
for (const tabAgent of this.tabAgents.values()) { tabAgent.agent = null; } for (const tabId of this.tabAgents.keys()) { this.emit({ type: "config-reload" }, tabId); this.routeSystemEventToTab(tabId, "config-reload", "Configuration reloaded"); } }); this.lspDirWatchers.set(dir, watcher); } /** * Build the `onAfterWrite` hook for `createWriteFileTool` when the tab's * working directory has LSP servers configured. The hook touches the * just-written file through the LSP and returns a formatted diagnostics * block (the written file's errors first, then a small cap of other-file * errors) — opencode's diagnostics-on-write pattern. Returns `undefined` * when no server matches, so writes stay zero-overhead for non-LSP files. */ private buildAfterWriteHook( workingDirectory: string, servers: ResolvedLspServer[], ): ((absolutePath: string) => Promise) | undefined { if (servers.length === 0) return undefined; const manager = this.lspManager; return async (absolutePath: string): Promise => { if (!manager.hasServerForFile(absolutePath, servers)) return ""; await manager.touchFile({ file: absolutePath, root: workingDirectory, servers, mode: "document", }); const diagnostics = manager.getDiagnostics({ root: workingDirectory, servers, file: absolutePath, }); let output = ""; let otherFileCount = 0; for (const [file, issues] of Object.entries(diagnostics)) { const current = file === absolutePath; if (!current && otherFileCount >= MAX_LSP_OTHER_FILE_DIAGNOSTICS) continue; const block = reportDiagnostics(file, issues); if (!block) continue; if (current) { output += `${output ? "\n\n" : ""}LSP errors detected in this file, please fix:\n${block}`; } else { otherFileCount++; output += `${output ? 
"\n\n" : ""}LSP errors detected in other files:\n${block}`; } } return output; }; } private _initModelRegistry(config: DispatchConfig): void { if (config.keys) { if (this.modelRegistry) { this.modelRegistry.updateConfig(config.keys); } else { this.modelRegistry = new ModelRegistry(config.keys); } } else { this.modelRegistry = null; } } getPermissionManager(): PermissionManager | undefined { return this.permissionManager; } /** Get the TaskList for a specific tab (creates the tab entry if missing). */ getTaskList(tabId: string): TaskList { return this._getOrCreateTabAgent(tabId).taskList; } getClaudeAccounts(): ClaudeAccount[] { return this.claudeAccounts; } /** Get or create the TabAgent entry for a tab (without creating an Agent). */ private _getOrCreateTabAgent(tabId: string): TabAgent { let tabAgent = this.tabAgents.get(tabId); if (!tabAgent) { const taskList = new TaskList(); taskList.onChange((tasks) => { this.emit({ type: "task-list-update", tasks }, tabId); }); tabAgent = { agent: null, status: "idle", keyId: null, modelId: null, taskList, messageQueue: [], queueListeners: [], shellStore: new BackgroundShellStore(), transcriptStore: new BackgroundTranscriptStore(), currentChunks: null, currentAssistantId: null, currentTurnId: null, autoWakeBudget: MAX_AGENT_AUTO_WAKES, }; this.tabAgents.set(tabId, tabAgent); } return tabAgent; } private async getOrCreateAgentForTab( tabId: string, keyId?: string, modelId?: string, ): Promise { const tabAgent = this._getOrCreateTabAgent(tabId); // Determine effective override: use provided values, or fall back to stored per-tab values const effectiveKeyId = keyId ?? tabAgent.keyId; const effectiveModelId = modelId ?? 
tabAgent.modelId; // Read tool permission settings from DB (default: read=allow, edit=ask, bash=ask, summon=ask, web=ask, youtube=ask) const permRead = getSetting("perm_read") !== "ask"; const permEdit = getSetting("perm_edit") === "allow"; const permBash = getSetting("perm_bash") === "allow"; const permSummon = getSetting("perm_summon") === "allow"; const permUserAgent = getSetting("perm_user_agent") === "allow"; const permSendToTab = getSetting("perm_send_to_tab") === "allow"; const permReadTab = getSetting("perm_read_tab") === "allow"; const permWebSearch = getSetting("perm_web_search") === "allow"; const permSearchCode = getSetting("perm_search_code") === "allow"; const permKeyUsage = getSetting("perm_key_usage") === "allow"; const permYoutubeTranscribe = getSetting("perm_youtube_transcribe") === "allow"; const permLsp = getSetting("perm_lsp") === "allow"; const sysPrompt = getSetting("system_prompt") ?? ""; const permKey = `${permRead}:${permEdit}:${permBash}:${permSummon}:${permUserAgent}:${permSendToTab}:${permReadTab}:${permWebSearch}:${permYoutubeTranscribe}:${permSearchCode}:${permKeyUsage}:${permLsp}:${sysPrompt}`; // If the override differs or permissions changed, invalidate the cached agent if ( tabAgent.agent && (effectiveKeyId !== tabAgent.keyId || effectiveModelId !== tabAgent.modelId || permKey !== tabAgent._lastPermKey) ) { tabAgent.agent = null; } if (!tabAgent.agent) { const defaultWorkDir = process.env.DISPATCH_WORKING_DIR ?? process.cwd(); let workingDirectory = tabAgent.workingDirectoryOverride ?? defaultWorkDir; // Expand ~ to home directory if (workingDirectory === "~" || workingDirectory.startsWith("~/")) { const { homedir } = await import("node:os"); const { join } = await import("node:path"); workingDirectory = join(homedir(), workingDirectory.slice(1)); } // Resolve relative paths against the default working directory // (e.g. 
subagent cwd "./subtask" resolves relative to the parent's effective dir) { const { isAbsolute, resolve } = await import("node:path"); if (!isAbsolute(workingDirectory)) { workingDirectory = resolve(defaultWorkDir, workingDirectory); } } // Auto-create the working directory if it doesn't exist try { const { mkdirSync, existsSync } = await import("node:fs"); if (!existsSync(workingDirectory)) { mkdirSync(workingDirectory, { recursive: true }); } } catch { // Ignore — tool execution will surface the error naturally } // Resolve LSP servers for this working directory once (cached). // Drives both diagnostics-on-write (the write_file hook) and the // optional `lsp` tool. Empty for directories with no `[lsp]` block. const lspServers = this.getLspServersForDir(workingDirectory); const afterWriteHook = this.buildAfterWriteHook(workingDirectory, lspServers); // Build tools list — child agents use their toolsOverride whitelist, // parent agents use permission settings from DB const toolEntries: Array<{ name: string; tool: ReturnType }> = []; if (tabAgent.toolsOverride) { // Child agent: use explicit tool whitelist const allowed = new Set(tabAgent.toolsOverride); if (allowed.has("read_file")) { toolEntries.push({ name: "read_file", tool: createReadFileTool(workingDirectory) }); // read_file_slice is a companion to read_file — only useful for // inspecting long lines that read_file truncated. Ship them together. 
toolEntries.push({ name: "read_file_slice", tool: createReadFileSliceTool(workingDirectory), }); // list_files is bundled with read access if (allowed.has("list_files")) { toolEntries.push({ name: "list_files", tool: createListFilesTool(workingDirectory) }); } } if (allowed.has("list_files") && !allowed.has("read_file")) { toolEntries.push({ name: "list_files", tool: createListFilesTool(workingDirectory) }); } if (allowed.has("write_file")) { toolEntries.push({ name: "write_file", tool: createWriteFileTool(workingDirectory, afterWriteHook), }); } if (allowed.has("run_shell")) { toolEntries.push({ name: "run_shell", tool: createRunShellTool(workingDirectory, tabAgent.shellStore), }); } if (allowed.has("search_code")) { toolEntries.push({ name: "search_code", tool: createSearchCodeTool(workingDirectory), }); } if (allowed.has("web_search")) { toolEntries.push({ name: "web_search", tool: createWebSearchTool() }); } if (allowed.has("key_usage")) { toolEntries.push({ name: "key_usage", tool: this.buildKeyUsageTool() }); } if (allowed.has("lsp") && lspServers.length > 0) { toolEntries.push({ name: "lsp", tool: createLspTool(() => ({ manager: this.lspManager, workingDirectory, servers: lspServers, })), }); } if (allowed.has("youtube_transcribe")) { toolEntries.push({ name: "youtube_transcribe", tool: createYoutubeTranscribeTool(tabAgent.transcriptStore), }); } if (allowed.has("todo")) { toolEntries.push({ name: "todo", tool: createTaskListTool(tabAgent.taskList) }); } if (allowed.has("summon")) { const childParentAllowedTools = new Set(toolEntries.map((e) => e.name)); const allAgentDefs = loadAgents(workingDirectory); const availableSubagents = toAvailableSubagents( allAgentDefs, GLOBAL_AGENTS_DIR, workingDirectory, ); const availableUserAgents = toAvailableUserAgents( allAgentDefs, GLOBAL_AGENTS_DIR, workingDirectory, ); const agentDirPaths = getAgentDirPaths(workingDirectory); toolEntries.push({ name: "summon", tool: createSummonTool( workingDirectory, { spawn: (opts) 
=> this.spawnChildAgent({ ...opts, parentKeyId: tabAgent.keyId, parentModelId: tabAgent.modelId, parentAllowedTools: childParentAllowedTools, parentTabId: tabId, }), getResult: (id) => this.getChildResult(id), }, availableSubagents, availableUserAgents, agentDirPaths, permUserAgent, ), }); } if (allowed.has("retrieve")) { toolEntries.push({ name: "retrieve", tool: createRetrieveTool({ getResult: (id) => tabAgent.shellStore.has(id) ? tabAgent.shellStore.getResult(id) : tabAgent.transcriptStore.has(id) ? tabAgent.transcriptStore.getResult(id) : this.getChildResult(id), }), }); } // Tab-to-tab communication — gated on the child whitelist. if (allowed.has("send_to_tab") || allowed.has("read_tab")) { for (const entry of this.buildTabCommToolEntries(tabId, allowed.has("read_tab"))) { if (allowed.has(entry.name)) toolEntries.push(entry); } } } else { // Parent agent: use permission settings from DB if (permRead) { toolEntries.push({ name: "read_file", tool: createReadFileTool(workingDirectory) }); toolEntries.push({ name: "read_file_slice", tool: createReadFileSliceTool(workingDirectory), }); toolEntries.push({ name: "list_files", tool: createListFilesTool(workingDirectory) }); } if (permEdit) { toolEntries.push({ name: "write_file", tool: createWriteFileTool(workingDirectory, afterWriteHook), }); } if (permBash) { toolEntries.push({ name: "run_shell", tool: createRunShellTool(workingDirectory, tabAgent.shellStore), }); } if (permSearchCode) { toolEntries.push({ name: "search_code", tool: createSearchCodeTool(workingDirectory), }); } if (permWebSearch) { toolEntries.push({ name: "web_search", tool: createWebSearchTool() }); } if (permKeyUsage) { toolEntries.push({ name: "key_usage", tool: this.buildKeyUsageTool() }); } // The `lsp` tool exposes diagnostics + navigation on demand. It is // gated by `perm_lsp` AND requires at least one server configured // in the working directory's `dispatch.toml`. 
if (permLsp && lspServers.length > 0) { toolEntries.push({ name: "lsp", tool: createLspTool(() => ({ manager: this.lspManager, workingDirectory, servers: lspServers, })), }); } if (permYoutubeTranscribe) { toolEntries.push({ name: "youtube_transcribe", tool: createYoutubeTranscribeTool(tabAgent.transcriptStore), }); } toolEntries.push({ name: "todo", tool: createTaskListTool(tabAgent.taskList) }); // The `summon` tool is registered when EITHER the subagent // permission (`perm_summon`) OR the user-agent permission // (`perm_user_agent`) is granted — the two are independent. // `perm_summon` enables ordinary subagent spawning; granting // only `perm_user_agent` exposes summon in user-agent-only mode // (spawns top-level user agents exclusively). if (permSummon || permUserAgent) { // Capture parent's allowed tool names for child permission enforcement const parentAllowedTools = new Set(toolEntries.map((e) => e.name)); const allAgentDefs = loadAgents(workingDirectory); const availableSubagents = toAvailableSubagents( allAgentDefs, GLOBAL_AGENTS_DIR, workingDirectory, ); const availableUserAgents = toAvailableUserAgents( allAgentDefs, GLOBAL_AGENTS_DIR, workingDirectory, ); const agentDirPaths = getAgentDirPaths(workingDirectory); toolEntries.push({ name: "summon", tool: createSummonTool( workingDirectory, { spawn: (opts) => this.spawnChildAgent({ ...opts, parentKeyId: tabAgent.keyId, parentModelId: tabAgent.modelId, parentAllowedTools, parentTabId: tabId, }), getResult: (id) => this.getChildResult(id), }, availableSubagents, availableUserAgents, agentDirPaths, permUserAgent, permSummon, ), }); // `retrieve` collects subagent results. User agents are // fire-and-forget, so it is bundled with the subagent // permission only — a user-agent-only grant doesn't get it. if (permSummon) { toolEntries.push({ name: "retrieve", tool: createRetrieveTool({ getResult: (id) => tabAgent.shellStore.has(id) ? tabAgent.shellStore.getResult(id) : tabAgent.transcriptStore.has(id) ? 
tabAgent.transcriptStore.getResult(id) : this.getChildResult(id), }), }); } } if (permSendToTab || permReadTab) { const tabCommAllowed = new Set(); if (permSendToTab) tabCommAllowed.add("send_to_tab"); if (permReadTab) tabCommAllowed.add("read_tab"); for (const entry of this.buildTabCommToolEntries(tabId, permReadTab)) { if (tabCommAllowed.has(entry.name)) toolEntries.push(entry); } } } const tools = toolEntries.map((e) => e.tool); const toolNames = toolEntries.map((e) => e.name); tabAgent._lastPermKey = permKey; const ruleset = configToRuleset(this.config); // Try to resolve model from registry, fall back to env vars let apiKey = ""; let model = "deepseek-v4-flash"; let baseURL = "https://opencode.ai/zen/go/v1"; let provider: string | undefined; let claudeCredentials: { accessToken: string } | undefined; let useOverride = false; if (effectiveKeyId && effectiveModelId && this.modelRegistry) { // Direct override: look up the key by id in the registry const keyState = this.modelRegistry .getKeys() .find((k) => k.definition.id === effectiveKeyId); if (keyState) { const key = keyState.definition; if (key.provider === "anthropic") { // Anthropic provider: resolve credentials from Claude accounts const credFile = key.credentials_file; const findAccount = () => this.claudeAccounts.find((a) => a.id === effectiveKeyId) ?? (credFile ? this.claudeAccounts.find((a) => a.source === credFile) : this.claudeAccounts[0]); let account = findAccount(); // Self-heal: account discovery runs once at construction and can // fail at boot (e.g. the data dir isn't mounted yet and // getDatabase() throws EACCES), leaving claudeAccounts empty for // the process lifetime. If the lookup fails, re-run discovery now // that the DB is reachable and retry before giving up. 
if (!account) { this._refreshClaudeAccounts(); account = findAccount(); } if (account) { const creds = refreshAccountCredentials(account); if (creds && creds.expiresAt > Date.now() + 60_000) { claudeCredentials = { accessToken: creds.accessToken }; apiKey = creds.accessToken; baseURL = key.base_url; model = effectiveModelId; provider = "anthropic"; tabAgent.keyId = effectiveKeyId; tabAgent.modelId = effectiveModelId; useOverride = true; } else { // Token expired — await the async refresh const fresh = await refreshAccountCredentialsAsync(account); if (fresh && fresh.expiresAt > Date.now() + 60_000) { account.credentials = fresh; claudeCredentials = { accessToken: fresh.accessToken }; apiKey = fresh.accessToken; baseURL = key.base_url; model = effectiveModelId; provider = "anthropic"; tabAgent.keyId = effectiveKeyId; tabAgent.modelId = effectiveModelId; useOverride = true; } else { console.warn( `dispatch: unable to refresh Claude credentials for "${account.label}" — using stale token`, ); claudeCredentials = { accessToken: account.credentials.accessToken }; apiKey = account.credentials.accessToken; baseURL = key.base_url; model = effectiveModelId; provider = "anthropic"; tabAgent.keyId = effectiveKeyId; tabAgent.modelId = effectiveModelId; useOverride = true; } } } else { console.warn(`dispatch: no Claude credentials found for key "${key.id}"`); } } else { // Standard key: resolve from env var const envKey = resolveApiKey(key.id, key.env); if (envKey) { apiKey = envKey; baseURL = key.base_url; model = effectiveModelId; // OpenCode Go splits its catalog across two endpoints: // `/chat/completions` — GLM, Kimi, DeepSeek, MiMo (OpenAI-compatible) // `/messages` — MiniMax, Qwen (Anthropic-format) // The configured key has provider="opencode-go" which defaults to // the OpenAI-compatible path. When the selected model lives on the // `/messages` route, route through the API-key Anthropic provider // instead so the SDK targets the correct endpoint and protocol. 
if (key.provider === "opencode-go" && isOpencodeGoAnthropicModel(model)) { provider = "opencode-anthropic"; } tabAgent.keyId = effectiveKeyId; tabAgent.modelId = effectiveModelId; useOverride = true; } else { console.warn( `dispatch: env var "${key.env}" not set for key "${key.id}", falling back to env vars`, ); // Apply the correct model + baseURL even when the key // is unavailable so the request at least targets the // right endpoint and produces a diagnosable auth error // instead of silently routing to the default OpenCode Go // endpoint (which may serve a different model). baseURL = key.base_url; model = effectiveModelId; tabAgent.keyId = effectiveKeyId; tabAgent.modelId = effectiveModelId; useOverride = true; } } } else { console.warn(`dispatch: key "${effectiveKeyId}" not found in model registry`); } } if (!useOverride) { // Clear any previous override when falling back to default resolution tabAgent.keyId = null; tabAgent.modelId = null; } const customSystemPrompt = getSetting("system_prompt") || undefined; tabAgent.agent = new Agent( { model, apiKey, baseURL, systemPrompt: buildSystemPrompt(toolNames, customSystemPrompt), tools, workingDirectory, permissionChecker: this.permissionManager ?? undefined, ruleset, provider, tabId, ...(claudeCredentials ? { claudeCredentials } : {}), }, { dequeueMessages: () => this.dequeueMessages(tabId), waitForQueuedMessage: () => this.waitForQueuedMessage(tabId), }, ); // Pre-populate the Agent's in-memory message history from the DB // so prior turns survive Agent recreation. The Agent is // constructed fresh here in three scenarios that ALL discard // the previous in-memory `messages` array: // 1. First call for this tab (no prior Agent existed) // 2. Model/key/permission/working-directory change — the // invalidation gate above set `tabAgent.agent = null`. // This is the model-switcher-slider case: without this // pre-population, DeepSeek would see zero context after // switching from Opus mid-conversation. // 3. 
Config or skills reload (configWatcher / skillsWatcher // also null out `tabAgent.agent`). // // Boundary semantics: `processMessage` appends the current turn's // user message (as a chunk row) BEFORE calling this function, so the // grouped history ends in `[..., u_current]`. In the fallback retry // path the previous attempt may also have flushed a partial assistant // turn, so it can end `[..., u_current, partial_a]`. Either way, we // walk backwards to the most recent user-role message and load only // strictly-prior messages: `agent.run()` pushes the current user // message itself, so including it here would duplicate it. // // `toModelMessages` already filters out `role === "system"` // rows and strips `error` / `system` chunks, so it's safe to // load system messages verbatim. try { const rows = getMessagesForTab(tabId); let cutIdx = rows.length; for (let i = rows.length - 1; i >= 0; i--) { const row = rows[i]; if (row && row.role === "user") { cutIdx = i; break; } } if (cutIdx > 0) { tabAgent.agent.messages = rows .slice(0, cutIdx) .map((r) => ({ role: r.role, chunks: r.chunks })); } } catch { // DB read failed — leave `messages: []`. The agent still // works, just without prior history (matches pre-fix // behaviour, so this is no worse than what we had before). } } return tabAgent.agent; } /** * Resolve connection parameters (apiKey / baseURL / model / provider / * Claude OAuth credentials) for a key+model pair WITHOUT mutating any tab * state. Mirrors the resolution in `getOrCreateAgentForTab` (Anthropic * account refresh, env-var keys, OpenCode-Go anthropic-route detection) but * is side-effect-free so it can be reused by compaction. Returns `null` when * the key/model can't be resolved to a usable connection. 
*
   * "Side-effect-free" refers to TAB state only: this may still re-run Claude
   * account discovery and store freshly refreshed credentials on the shared
   * account object, exactly as `getOrCreateAgentForTab` does.
   */
  private async resolveConnection(
    keyId: string,
    modelId: string,
  ): Promise<{
    apiKey: string;
    baseURL: string;
    model: string;
    provider?: string;
    claudeCredentials?: { accessToken: string };
  } | null> {
    if (!keyId || !modelId || !this.modelRegistry) return null;
    const keyState = this.modelRegistry.getKeys().find((k) => k.definition.id === keyId);
    if (!keyState) return null;
    const key = keyState.definition;

    if (key.provider === "anthropic") {
      // Account lookup order: exact id match, then the configured credentials
      // file, else the first discovered account.
      const credFile = key.credentials_file;
      const findAccount = () =>
        this.claudeAccounts.find((a) => a.id === keyId) ??
        (credFile ? this.claudeAccounts.find((a) => a.source === credFile) : this.claudeAccounts[0]);
      let account = findAccount();
      if (!account) {
        // Discovery can fail at boot; retry once now that the DB may be up.
        this._refreshClaudeAccounts();
        account = findAccount();
      }
      if (!account) return null;

      // Tokens expiring within the next minute are treated as expired.
      const isFresh = (expiresAt: number): boolean => expiresAt > Date.now() + 60_000;

      let accessToken: string;
      const creds = refreshAccountCredentials(account);
      if (creds && isFresh(creds.expiresAt)) {
        accessToken = creds.accessToken;
      } else {
        const fresh = await refreshAccountCredentialsAsync(account);
        if (fresh && isFresh(fresh.expiresAt)) {
          // Only persist refreshed credentials that are actually usable,
          // matching the agent-construction path.
          account.credentials = fresh;
          accessToken = fresh.accessToken;
        } else {
          console.warn(
            `dispatch: unable to refresh Claude credentials for "${account.label}" — using stale token`,
          );
          accessToken = account.credentials.accessToken;
        }
      }

      return {
        apiKey: accessToken,
        baseURL: key.base_url,
        model: modelId,
        provider: "anthropic",
        claudeCredentials: { accessToken },
      };
    }

    // Standard key resolved from env var. Unlike agent construction, a
    // missing env var makes the connection unresolvable (null).
    const envKey = resolveApiKey(key.id, key.env);
    if (!envKey) return null;
    // OpenCode Go serves some models only on its Anthropic-format route.
    const provider: string | undefined =
      key.provider === "opencode-go" && isOpencodeGoAnthropicModel(modelId)
        ? "opencode-anthropic"
        : undefined;
    return { apiKey: envKey, baseURL: key.base_url, model: modelId, provider };
  }

  /**
   * Resolve the compactor model: the configured `compaction_model_*` setting
   * when present, otherwise fall back to the source tab's own key+model. Used
   * to run the summary generation request.
*
   * Returns `null` when neither the settings nor the tab (live agent or DB
   * row) provide both a key id and a model id.
   */
  private resolveCompactorKeyModel(sourceTabId: string): { keyId: string; modelId: string } | null {
    const cfgKey = getSetting("compaction_model_key_id");
    const cfgModel = getSetting("compaction_model_id");
    if (cfgKey && cfgModel) return { keyId: cfgKey, modelId: cfgModel };
    // Prefer the live agent's override, then the persisted tab row.
    const tabAgent = this.tabAgents.get(sourceTabId);
    const row = getTab(sourceTabId);
    const keyId = tabAgent?.keyId ?? row?.keyId ?? null;
    const modelId = tabAgent?.modelId ?? row?.modelId ?? null;
    if (keyId && modelId) return { keyId, modelId };
    return null;
  }

  /**
   * Run a one-shot, tool-less summary generation using a transient Agent. The
   * Agent loop handles Claude-OAuth billing/identity/caching correctly. The
   * prompt is the entire summary request (transcript + template); no tools are
   * registered so the model can only produce text.
   *
   * @returns the trimmed, concatenated assistant text.
   * @throws on abort ("Compaction cancelled"), on an `error` event from the
   *   run, or when the model produced only whitespace.
   */
  private async generateSummary(
    conn: {
      apiKey: string;
      baseURL: string;
      model: string;
      provider?: string;
      claudeCredentials?: { accessToken: string };
    },
    prompt: string,
    abortSignal: AbortSignal,
  ): Promise<string> {
    const agent = new Agent({
      model: conn.model,
      apiKey: conn.apiKey,
      baseURL: conn.baseURL,
      systemPrompt:
        "You are a conversation-summarization assistant. Follow the user's instructions and output ONLY the requested Markdown summary.",
      tools: [],
      workingDirectory: process.env.DISPATCH_WORKING_DIR ?? process.cwd(),
      provider: conn.provider,
      ...(conn.claudeCredentials ? { claudeCredentials: conn.claudeCredentials } : {}),
    });
    let out = "";
    let errored: string | null = null;
    for await (const event of agent.run(prompt, { abortSignal })) {
      if (abortSignal.aborted) break;
      if (event.type === "text-delta") out += event.delta;
      else if (event.type === "error") errored = event.error;
    }
    if (abortSignal.aborted) throw new Error("Compaction cancelled");
    if (errored) throw new Error(errored);
    const trimmed = out.trim();
    if (!trimmed) throw new Error("Compaction produced an empty summary");
    return trimmed;
  }

  /**
   * Compact a conversation (UI-driven). Summarizes the older "head" of
   * `sourceTabId` into an anchored Markdown summary while preserving the last
   * N turns verbatim, then performs the id-relocation the product requires:
   *
   *   - The FULL pre-compaction history is moved to a fresh `backupTabId`
   *     (so nothing is destroyed — fully reversible).
   *   - `sourceTabId` (the canonical id, with its key/model/working-dir/agent
   *     and the global tool permissions intact) is re-seeded with the summary
   *     turn + the preserved tail.
   *
   * `tempTabId` is the frontend placeholder tab hosting the "compacting…"
   * message; it is discarded on completion. Cancellation = the caller aborts
   * via `tempTabId`'s abort controller (e.g. closing the placeholder tab).
   *
   * A request is refused up front (emitting `compaction-error` without
   * touching the source lock) when the source is running a turn or is
   * already being compacted.
   *
   * Returns when the compaction settles; emits `compaction-started`,
   * `compaction-complete`, or `compaction-error`.
   */
  async compactTab(tempTabId: string, sourceTabId: string): Promise<void> {
    const tempAgent = this._getOrCreateTabAgent(tempTabId);
    const abortController = new AbortController();
    tempAgent.abortController = abortController;

    // Refusal BEFORE this run took the source lock: there is no lock of ours
    // to release and nothing was queued because of this run.
    const reject = (error: string): void => {
      this.emit({ type: "compaction-error", tempTabId, sourceTabId, error }, tempTabId);
    };
    // Failure AFTER the lock was taken: release it and drain anything that
    // queued on the source while it was locked.
    const fail = (error: string): void => {
      const src = this.tabAgents.get(sourceTabId);
      if (src) src.compacting = false;
      this.emit({ type: "compaction-error", tempTabId, sourceTabId, error }, tempTabId);
      this.continueFromQueue(sourceTabId);
    };

    try {
      // A concurrent compaction must neither run alongside the first nor
      // (via `fail`) release the lock the first one holds.
      if (this.tabAgents.get(sourceTabId)?.compacting) {
        reject("Compaction already in progress for this conversation.");
        return;
      }
      // Refuse to compact a running tab (turn must have ended).
      if (this.getTabStatus(sourceTabId) === "running") {
        reject("Cannot compact while a turn is in progress.");
        return;
      }

      // Lock the source so new messages queue instead of starting turns.
      const sourceAgent = this._getOrCreateTabAgent(sourceTabId);
      sourceAgent.compacting = true;
      this.emit({ type: "compaction-started", tempTabId, sourceTabId }, tempTabId);

      // Read the full history as grouped messages (preserves turnId/seq).
      const rows = groupRowsToMessages(getChunksForTab(sourceTabId));
      const { tail, prompt } = buildCompactionRequest({ messages: rows });
      if (!prompt) {
        fail("Not enough conversation history to compact.");
        return;
      }

      // Resolve the compactor model (configured, else source tab's own).
      const compactor = this.resolveCompactorKeyModel(sourceTabId);
      if (!compactor) {
        fail("No model available to run compaction. Configure a compaction model in Settings.");
        return;
      }
      const conn = await this.resolveConnection(compactor.keyId, compactor.modelId);
      if (!conn) {
        fail("Could not resolve credentials for the compaction model.");
        return;
      }

      // Generate the summary (abortable).
      const summary = await this.generateSummary(conn, prompt, abortController.signal);
      if (abortController.signal.aborted) {
        fail("Compaction cancelled");
        return;
      }

      // Relocate the FULL history to a backup tab, then re-seed the source.
      const sourceRow = getTab(sourceTabId);
      const backupTabId = crypto.randomUUID();
      const baseTitle = sourceRow?.title ?? "Conversation";
      const backupTitle = `${baseTitle} (pre-compaction)`;
      createTab(backupTabId, backupTitle, {
        keyId: sourceRow?.keyId ?? null,
        modelId: sourceRow?.modelId ?? null,
      });
      rekeyChunks(sourceTabId, backupTabId);

      // Re-seed the canonical (source) id: a summary user turn followed by
      // the preserved tail rows (turnId/step/role/type/data preserved).
      const summaryTurnId = crypto.randomUUID();
      appendChunks(sourceTabId, explodeUserText(summaryTurnId, buildSummaryTurnText(summary)));
      for (const msg of tail) {
        if (msg.role === "user") {
          // groupRowsToMessages collapses a user message to a single text
          // chunk; rebuild the user row explicitly.
          const text = msg.chunks.find((c) => c.type === "text");
          appendChunks(
            sourceTabId,
            explodeUserText(msg.turnId, text && text.type === "text" ? text.text : ""),
          );
          continue;
        }
        // explodeTurn only handles assistant/system shapes, so it is called
        // only after user messages have been dealt with above.
        const drafts = explodeTurn(msg.turnId, msg.chunks);
        if (drafts.length > 0) appendChunks(sourceTabId, drafts);
      }

      // Reset the source Agent so its in-memory history reloads from the
      // freshly re-seeded chunk log on the next turn.
      sourceAgent.agent = null;
      sourceAgent.compacting = false;
      this.emit(
        { type: "compaction-complete", tempTabId, sourceTabId, backupTabId, backupTitle },
        sourceTabId,
      );
      // Drain any messages queued while the source was locked.
      this.continueFromQueue(sourceTabId);
    } catch (err) {
      if (abortController.signal.aborted) {
        fail("Compaction cancelled");
        return;
      }
      fail(err instanceof Error ? err.message : String(err));
    } finally {
      // The placeholder tab is transient; drop its in-memory agent state.
      this.tabAgents.delete(tempTabId);
    }
  }

  /** Current status of `tabId`; tabs with no in-memory agent report "idle". */
  getTabStatus(tabId: string): AgentStatus {
    return this.tabAgents.get(tabId)?.status ?? "idle";
  }

  /**
   * Prompt-cache WARMING for an idle tab (see `Agent.warmCache`).
   *
   * Reconstructs the tab's genuine conversation from the persisted chunk log,
   * resolves the SAME agent (model/key/tools/system prompt) the next real turn
   * would use, and replays the exact cached prefix plus one trivial throwaway
   * turn so the provider's ~5-min prompt-cache TTL is refreshed.
* The warming request and its response are NOT persisted, NOT emitted, and
   * NOT folded into the real usage aggregate. Its `usage` is handed back to
   * the caller instead, so a warming-only "last request" cache rate can be
   * displayed without polluting the real Cache Rate metric.
   *
   * A tab that is currently generating (`running`) is refused: its prefix is
   * mid-mutation and the request would contend with the live turn. Callers
   * already gate on idle; this is defence in depth.
   *
   * Never throws. Resolves to `{ ok: true, usage }` on success, or to
   * `{ ok: false, error }` carrying a message the route can show in the
   * debug strip.
   */
  async warmCacheForTab(
    tabId: string,
    opts: {
      keyId?: string;
      modelId?: string;
      agentModels?: AgentModelEntry[];
      reasoningEffort?: ReasoningEffort;
    } = {},
  ): Promise<{ ok: true; usage: UsageData } | { ok: false; error: string }> {
    const busy = this.getTabStatus(tabId) === "running";
    if (busy) {
      return { ok: false, error: "tab is generating" };
    }

    try {
      const tabAgent = this._getOrCreateTabAgent(tabId);
      if (opts.agentModels) {
        tabAgent.agentModels = opts.agentModels;
      }

      // Only the head of the fallback chain is warmed: it is what the next
      // real turn tries first, and warming another model would write a
      // different prefix.
      const [primary] = this.buildFallbackSequence(tabAgent, opts.keyId, opts.modelId);
      const warmKeyId = primary?.key_id || opts.keyId;
      const warmModelId = primary?.model_id || opts.modelId;
      const agent = await this.getOrCreateAgentForTab(tabId, warmKeyId, warmModelId);

      // Effort precedence: agent-definition model → per-tab selector → Agent
      // default. It feeds the thinking providerOptions (an Anthropic cache
      // key), so it must match the real turn or a different bucket is warmed.
      const effort = primary?.effort ?? opts.reasoningEffort;

      // Unlike agent pre-population, keep the FULL history (no trailing-user
      // trim): the complete cached prefix is replayed as-is.
      let history: ChatMessage[] = [];
      try {
        history = getMessagesForTab(tabId).map(({ role, chunks }) => ({ role, chunks }));
      } catch {
        // DB read failed — fall back to the agent's in-memory history.
        history = [...agent.messages];
      }

      const warmOptions = effort ? { reasoningEffort: effort } : {};
      const usage = await agent.warmCache(history, warmOptions);
      return { ok: true, usage };
    } catch (err) {
      const error = err instanceof Error ? err.message : String(err);
      return { ok: false, error };
    }
  }

  /**
   * Snapshot of every tab the manager is currently tracking. Sent on WS
   * connect and via GET /status so a freshly-loaded frontend can
   * reconstruct any in-flight assistant turn without missing the chunks
   * that arrived before its WS handshake completed.
   *
   * For each running tab, the snapshot includes:
   *   - status: "running"
   *   - currentChunks: a defensive shallow copy of `tabAgent.currentChunks`
   *     (the live chunk array the streaming loop appends to). The
   *     consumer owns this copy and may mutate it freely.
   *   - currentAssistantId: the DB id of the in-flight assistant message
   *     row. The frontend aligns its local assistant message id with
   *     this so the next `done` event lands on the right message.
   *
   * Every tab additionally carries its `tasks` (the current todo list) when
   * non-empty, so a reloaded frontend rehydrates the Tasks panel from the
   * backend rather than blanking it.
   *
   * For idle/error tabs, only `status` (plus any `tasks`) is present. Tabs not in
   * `this.tabAgents` (e.g. tabs in the DB that have never been touched
   * since server start) are absent from the returned record — the
   * caller infers their status from the DB row (always "idle" at rest).
*/
  getAllStatuses(): Record<string, TabStatusSnapshot> {
    const statuses: Record<string, TabStatusSnapshot> = {};

    for (const [tabId, tabAgent] of this.tabAgents) {
      const snapshot: TabStatusSnapshot = { status: tabAgent.status };

      // The todo list rides along for EVERY tab (not only running ones) so a
      // reloaded frontend can rehydrate its Tasks panel; omitted when empty.
      const tasks = tabAgent.taskList.getTasks();
      if (tasks.length) snapshot.tasks = tasks;

      // In-flight turn state is only meaningful while the tab is streaming.
      if (tabAgent.status === "running") {
        const { currentChunks, currentAssistantId, currentTurnId } = tabAgent;
        // Shallow copy: the consumer may serialize or mutate it freely.
        if (currentChunks) snapshot.currentChunks = [...currentChunks];
        if (currentAssistantId) snapshot.currentAssistantId = currentAssistantId;
        if (currentTurnId) snapshot.currentTurnId = currentTurnId;
      }

      statuses[tabId] = snapshot;
    }

    return statuses;
  }

  /** @deprecated Use getTabStatus(tabId) instead */
  getStatus(): AgentStatus {
    // Aggregate view: "running" as soon as any tab is mid-turn.
    const anyRunning = Array.from(this.tabAgents.values()).some(
      (tabAgent) => tabAgent.status === "running",
    );
    return anyRunning ? "running" : "idle";
  }

  /** Value of the manager's `messageCount` counter. */
  getMessageCount(): number {
    return this.messageCount;
  }

  /**
   * Subscribe to every emitted agent event (tagged with its tab id).
   * @returns a function that removes this listener again.
   */
  onEvent(listener: (event: AgentEvent & { tabId: string }) => void): () => void {
    this.eventListeners.add(listener);
    const unsubscribe = (): void => {
      this.eventListeners.delete(listener);
    };
    return unsubscribe;
  }

  /** Fan `event` out to all listeners, each receiving its own copy tagged with `tabId`. */
  private emit(event: AgentEvent, tabId: string): void {
    for (const listener of this.eventListeners) {
      const tagged = { ...event, tabId } as AgentEvent & { tabId: string };
      listener(tagged);
    }
  }

  /**
   * Persist a system chunk (notice / model-changed / config-reload /
   * cancelled) to a tab's history.
   *
   * If an assistant turn is in flight (`currentChunks` is non-null), the
   * chunk is folded into the in-flight chunk list; it is exploded into a
   * `system` chunk row when the turn flushes.
   *
   * Otherwise we append a standalone `system` chunk row immediately. Adjacent
   * system rows are coalesced back into one system message at group time
   * (`groupRowsToMessages`).
*/
  private routeSystemEventToTab(tabId: string, kind: SystemChunkKind, text: string): void {
    const tabAgent = this.tabAgents.get(tabId);

    if (!tabAgent?.currentChunks) {
      // Nothing is streaming: write a standalone system chunk row right away,
      // reusing the tab's current turn id when one is set.
      try {
        const turnId = tabAgent?.currentTurnId ?? crypto.randomUUID();
        appendChunks(tabId, explodeTurn(turnId, [{ type: "system", kind, text }]));
      } catch {
        // Best-effort: the DB may be unavailable (e.g. the tab row does not
        // exist yet), in which case the notice is dropped.
      }
      return;
    }

    // A turn is streaming: append to its in-flight chunk list, which is
    // exploded into chunk rows (this system chunk included) on flush.
    tabAgent.currentChunks.push({ type: "system", kind, text });
  }

  /**
   * Abort whatever `tabId` is doing and force it to idle. Emits an idle
   * status event, drops the cached Agent, and settles any pending
   * completion promise with an error. No-op for unknown tabs.
   */
  stopTab(tabId: string): void {
    const tabAgent = this.tabAgents.get(tabId);
    if (!tabAgent) return;

    // Mark the cut point of an in-flight turn with an explicit
    // "Generation cancelled by user" chunk; processMessage persists it when
    // it flushes the aborted turn.
    tabAgent.currentChunks?.push({
      type: "system",
      kind: "cancelled",
      text: "Generation cancelled by user",
    });

    tabAgent.abortController?.abort();
    tabAgent.status = "idle";
    this.emit({ type: "status", status: "idle" }, tabId);
    tabAgent.agent = null;

    // Settle any waiter so a pending retrieve does not hang forever.
    tabAgent.completionResolve?.({ status: "error", error: "Agent was stopped." });
  }

  /**
   * Stop `tabId`, forget its in-memory state, and remove any spilled
   * tool-output files it produced.
   */
  deleteTab(tabId: string): void {
    this.stopTab(tabId);
    this.tabAgents.delete(tabId);
    // Best-effort spill cleanup; the helper swallows its own errors.
    // See packages/core/src/tools/truncate.ts.
    clearSpillForTab(tabId);
  }

  /**
   * Spawn a child agent in a new tab. Returns the tab ID (agent_id).
   * The child runs asynchronously — use getChildResult to await completion.
*
   * Subagents are confined to their parent's effective working directory;
   * when no directory is requested they inherit the parent's override (if
   * any). User agents (`topLevel`) may use any directory.
   *
   * @returns the new tab's id.
   * @throws when `agentSlug` is unknown or of the wrong kind for the spawn
   *   mode, or when a subagent's requested directory escapes the parent's.
   */
  async spawnChildAgent(options: {
    task: string;
    tools: string[];
    workingDirectory?: string;
    /**
     * Optional slug of an `AgentDefinition` to apply. When set, the
     * definition's `tools`, `models`, and `cwd` take precedence over
     * the `tools`/`workingDirectory` passed in `options`. Tools are
     * still intersected with `parentAllowedTools` to prevent a
     * subagent from gaining capabilities its parent doesn't have.
     */
    agentSlug?: string;
    parentKeyId?: string | null;
    parentModelId?: string | null;
    parentAllowedTools?: Set<string>;
    parentTabId?: string;
    /**
     * When true, spawn as an independent top-level "user agent" tab
     * instead of a subagent child tab. User agents have no parent,
     * are persistent, and cannot be retrieved (fire-and-forget).
     */
    topLevel?: boolean;
  }): Promise<string> {
    const { homedir } = await import("node:os");
    const { isAbsolute, join, relative, resolve, sep } = await import("node:path");
    // Expand a leading "~" or "~/" to the user's home directory.
    const expandHome = (dir: string): string =>
      dir === "~" || dir.startsWith("~/") ? join(homedir(), dir.slice(1)) : dir;

    const tabId = crypto.randomUUID();
    const title = options.task.length > 50 ? `${options.task.slice(0, 47)}...` : options.task;

    // The parent's effective CWD (its override, else the server default)
    // bounds a subagent. User agents have no parent.
    const defaultWorkDir = process.env.DISPATCH_WORKING_DIR ?? process.cwd();
    const parentOverride =
      !options.topLevel && options.parentTabId
        ? this.tabAgents.get(options.parentTabId)?.workingDirectoryOverride
        : undefined;
    const parentEffectiveDir = expandHome(parentOverride ?? defaultWorkDir);

    // Resolve the agent definition (if a slug was supplied) BEFORE computing
    // the working directory and tool whitelist: its cwd/tools take precedence
    // over the caller's, mirroring a top-level tab picking the same definition.
    let agentDef: ReturnType<typeof loadAgent> = null;
    if (options.agentSlug) {
      agentDef = loadAgent(options.agentSlug, parentEffectiveDir);
      if (!agentDef) {
        const wantSubagent = !options.topLevel;
        const label = wantSubagent ? "subagent" : "user agent";
        const candidates = loadAgents(parentEffectiveDir)
          .filter((d) => Boolean(d.is_subagent) === wantSubagent)
          .map((d) => `${d.slug} (${d.name})`);
        const hint =
          candidates.length > 0
            ? ` Available ${label}s: ${candidates.join(", ")}.`
            : ` No ${label} definitions exist yet.`;
        throw new Error(`Agent definition not found: "${options.agentSlug}".${hint}`);
      }
      // The definition kind must match the spawn mode.
      if (options.topLevel && agentDef.is_subagent) {
        throw new Error(
          `Cannot spawn user agent: "${options.agentSlug}" is a subagent definition. Use a non-subagent definition for top_level=true.`,
        );
      }
      if (!options.topLevel && !agentDef.is_subagent) {
        throw new Error(
          `Cannot spawn subagent: "${options.agentSlug}" is a user agent definition. Set top_level=true to spawn it as an independent tab, or use a subagent definition.`,
        );
      }
    }

    // Resolve the child's working directory.
    const requestedDir = agentDef?.cwd ?? options.workingDirectory;
    let resolvedWorkingDirectory: string | undefined;
    if (requestedDir) {
      const childDir = expandHome(requestedDir);
      if (options.topLevel) {
        // User agents: resolve freely, no containment check.
        resolvedWorkingDirectory = resolve(defaultWorkDir, childDir);
      } else {
        const parentDir = resolve(parentEffectiveDir);
        const resolved = resolve(parentDir, childDir);
        const rel = relative(parentDir, resolved);
        // Compare whole path segments: an in-tree directory literally named
        // "..cache" must not be mistaken for a parent traversal.
        const isOutside = rel === ".." || rel.startsWith(`..${sep}`) || isAbsolute(rel);
        if (isOutside) {
          throw new Error(
            `Working directory "${requestedDir}" is outside the parent's working directory "${parentDir}".`,
          );
        }
        // Store the absolute path so downstream code doesn't re-resolve it
        // against the wrong base directory.
        resolvedWorkingDirectory = resolved;
      }
    } else if (parentOverride) {
      // No directory requested: keep the subagent where its parent works
      // rather than falling back to the server default, which may lie
      // outside the parent's directory.
      resolvedWorkingDirectory = resolve(parentEffectiveDir);
    }

    // Tool whitelist: the definition's permission groups expanded to tool
    // names, else the caller's list — always intersected with the parent's
    // tools so no subagent (or definition) can escalate.
    const baseTools = agentDef ? expandAgentToolNames(agentDef.tools) : options.tools;
    const parentAllowed = options.parentAllowedTools;
    const childTools = parentAllowed ? baseTools.filter((t) => parentAllowed.has(t)) : baseTools;

    // Create the tab agent entry with overrides.
    const tabAgent = this._getOrCreateTabAgent(tabId);
    tabAgent.toolsOverride = childTools;
    tabAgent.workingDirectoryOverride = resolvedWorkingDirectory;
    tabAgent.finalOutput = "";

    const primary = agentDef?.models[0];
    if (agentDef && primary) {
      // The definition's first model is shown as the tab's key/model, while
      // `agentModels` drives the fallback sequence.
      tabAgent.keyId = primary.key_id;
      tabAgent.modelId = primary.model_id;
      tabAgent.agentModels = agentDef.models;
    } else {
      // No definition (or no models in it) → inherit from the parent.
      tabAgent.keyId = options.parentKeyId ?? null;
      tabAgent.modelId = options.parentModelId ?? null;
      if (options.parentTabId) {
        const parentAgent = this.tabAgents.get(options.parentTabId);
        if (parentAgent?.agentModels) {
          tabAgent.agentModels = parentAgent.agentModels;
        }
      }
    }

    // Completion tracking — user agents are fire-and-forget, so only
    // subagents get a completion promise.
    if (!options.topLevel) {
      tabAgent.completionPromise = new Promise((settle) => {
        tabAgent.completionResolve = settle;
      });
    }

    // Persist the tab (best-effort: the in-memory agent runs regardless).
    try {
      createTab(tabId, title, {
        keyId: tabAgent.keyId,
        modelId: tabAgent.modelId,
        parentTabId: options.topLevel ? undefined : options.parentTabId,
      });
    } catch (err) {
      console.warn(
        `dispatch: failed to persist spawned tab "${tabId}": ${err instanceof Error ? err.message : String(err)}`,
      );
    }

    // Notify the frontend about the new tab.
    this.emit(
      {
        type: "tab-created",
        id: tabId,
        title,
        keyId: tabAgent.keyId,
        modelId: tabAgent.modelId,
        parentTabId: options.topLevel ? null : (options.parentTabId ?? null),
        agentSlug: options.agentSlug ?? null,
        workingDirectory: resolvedWorkingDirectory ?? null,
        agentModels: tabAgent.agentModels ?? null,
      },
      tabId,
    );

    // Start the child in the background; failures settle its completion.
    void this.processMessage(
      tabId,
      options.task,
      options.parentKeyId ?? undefined,
      options.parentModelId ?? undefined,
    ).catch((err) => {
      const errorMsg = err instanceof Error ? err.message : String(err);
      tabAgent.completionResolve?.({ status: "error", error: errorMsg });
    });

    return tabId;
  }

  /**
   * Wait for a child agent to finish and return its result.
   * Blocks until the child completes or errors. Tabs without completion
   * tracking are answered immediately from their current status.
   */
  async getChildResult(
    agentId: string,
  ): Promise<{ status: "done"; result: string } | { status: "error"; error: string }> {
    const tabAgent = this.tabAgents.get(agentId);
    if (!tabAgent) {
      return { status: "error", error: `No agent found with id '${agentId}'` };
    }
    if (!tabAgent.completionPromise) {
      // Not a child agent or already completed.
      if (tabAgent.status === "idle") {
        return { status: "done", result: tabAgent.finalOutput ?? "(no output)" };
      }
      if (tabAgent.status === "running") {
        return {
          status: "error",
          error:
            "This is a user agent (top-level tab) and cannot be retrieved. User agents are fire-and-forget.",
        };
      }
      return {
        status: "error",
        error: "Agent has no completion tracking. It may not have been spawned via summon.",
      };
    }
    return tabAgent.completionPromise;
  }

  // ─── Tab-to-tab communication ───────────────────────────────────
  //
  // `send_to_tab` / `read_tab` let an agent message a peer tab by its short
  // handle (a git-style prefix of the tab UUID). Delivery reuses the exact
  // running→queue / idle→new-turn routing that `POST /chat` uses (see
  // `deliverMessage`), so an agent message behaves identically to a user one.

  /**
   * Build the `key_usage` tool, wired to the live model registry (key states)
   * and the discovered Claude accounts.
* The tool fetches usage live, with a cache fallback for anthropic keys and a
   * live scrape for opencode-go keys, and reports remaining headroom, reset
   * times, and data freshness per key.
   */
  private buildKeyUsageTool(): ReturnType<typeof createKeyUsageTool> {
    return createKeyUsageTool({
      // Both getters read manager state at call time, not at build time.
      listClaudeAccounts: () => this.claudeAccounts,
      listKeys: () => this.modelRegistry?.getKeys() ?? [],
    });
  }

  /**
   * Build the `send_to_tab` and `read_tab` tool entries for `tabId`, used by
   * both tool-construction paths (child whitelist and permission-gated
   * parent). The caller's handle is computed once so `send_to_tab` can stamp
   * provenance and reject self-sends.
   *
   * @param canReadTab whether THIS tab is also granted `read_tab` (the two
   *   permissions are separate). It is passed to `send_to_tab` so the tool
   *   only points the agent at `read_tab` when it actually has it.
   */
  private buildTabCommToolEntries(
    tabId: string,
    canReadTab: boolean,
  ): Array<{ name: string; tool: ReturnType<typeof createSendToTabTool> }> {
    const selfHandle = shortestUniquePrefix(tabId);

    const sendToTab = createSendToTabTool({
      resolveShortId: (prefix) => this.resolveTabHandle(prefix),
      // origin "agent" makes the delivery count against the receiver's
      // auto-wake budget, which bounds agent↔agent loops.
      deliver: (targetId, message) => this.deliverMessage(targetId, message, { origin: "agent" }),
      listOpenHandles: () => this.listOpenHandles(tabId),
      self: { id: tabId, handle: selfHandle },
      canReadTab,
    });

    const readTab = createReadTabTool({
      resolveShortId: (prefix) => this.resolveTabHandle(prefix),
      getLastResponse: (targetId) => this.getLastTabResponse(targetId),
      listOpenHandles: () => this.listOpenHandles(tabId),
    });

    return [
      { name: "send_to_tab", tool: sendToTab },
      { name: "read_tab", tool: readTab },
    ];
  }

  /**
   * Project a core `ResolveTabPrefixResult` down to the tool-facing
   * `TabResolution` (minimal `{ id, title, handle }` refs). Each match's
   * `handle` is recomputed via `shortestUniquePrefix` so the value the tool
   * echoes back always matches what the UI currently shows.
*/
  private resolveTabHandle(prefix: string): TabResolution {
    const res = resolveTabPrefix(prefix);
    switch (res.status) {
      case "none":
        return { status: "none" };
      case "ok": {
        const { id, title } = res.tab;
        return { status: "ok", tab: { id, title, handle: shortestUniquePrefix(id) } };
      }
      default:
        // Several open tabs share the prefix: report every candidate so the
        // agent can retry with a longer handle.
        return {
          status: "ambiguous",
          matches: res.matches.map(({ id, title }) => ({
            id,
            title,
            handle: shortestUniquePrefix(id),
          })),
        };
    }
  }

  /**
   * List the open tabs as `{ handle, title }` pairs, skipping `exceptId`
   * (normally the caller's own tab). Feeds the "available tabs" hints shown
   * by the tab-communication tools.
   */
  private listOpenHandles(exceptId?: string): Array<{ handle: string; title: string }> {
    const handles: Array<{ handle: string; title: string }> = [];
    for (const tab of listOpenTabs()) {
      if (tab.id === exceptId) continue;
      handles.push({ handle: shortestUniquePrefix(tab.id), title: tab.title });
    }
    return handles;
  }

  /**
   * Return a tab's current status together with the text of its most recent
   * persisted assistant message.
   *
   * Reads the persisted chunk log (the source of truth) and walks backwards
   * to the newest `role === "assistant"` message whose concatenated `text`
   * chunks are non-empty after trimming. Assistant messages with no text are
   * skipped. `text` is `null` when no such message exists, or when reading
   * the log throws.
   */
  getLastTabResponse(tabId: string): { text: string | null; status: AgentStatus } {
    const status = this.getTabStatus(tabId);
    let latest: string | null = null;

    try {
      const messages = getMessagesForTab(tabId);
      let idx = messages.length;
      while (latest === null && idx > 0) {
        idx -= 1;
        const msg = messages[idx];
        if (!msg || msg.role !== "assistant") continue;

        const textParts = msg.chunks.filter(
          (c): c is { type: "text"; text: string } => c.type === "text",
        );
        const joined = textParts.map((c) => c.text).join("").trim();
        if (joined.length > 0) latest = joined;
      }
    } catch {
      // DB unavailable / tab unknown — report no text rather than failing.
    }

    return { text: latest, status };
  }

  /**
   * Deliver `message` to `tabId`, choosing the SAME routing as `POST /chat`:
   *   - target running → queue it (consumed like a user interrupt).
   *   - target idle/errored → wake it and start a new turn.
   *
   * Returns quickly; does NOT block on the turn.
Both the HTTP `/chat` path and the `send_to_tab` tool call through here
   * so the running/idle decision lives in exactly one place.
   *
   * `opts` carries the per-request knobs `/chat` forwards (key/model, agent
   * fallback chain, reasoning effort, working dir, an explicit queue id). The
   * `send_to_tab` tool passes none of these. For a cold wake (a tab not in
   * `tabAgents`, e.g. after a server restart) the key/model are hydrated from
   * the live `TabAgent` if present, else from the persisted tab row. (A cold
   * tab keeps its stored key/model but not its full agent-definition fallback
   * chain — see plan notes.)
   *
   * @returns
   *   - `queued` (with the queue entry id) when the tab is running or compacting.
   *   - `started` when a new turn was kicked off.
   *   - `suppressed` when an agent-originated wake was refused because the tab's
   *     auto-wake budget is spent. The message is still queued, not dropped.
   */
  deliverMessage(
    tabId: string,
    message: string,
    opts: {
      keyId?: string;
      modelId?: string;
      agentModels?: AgentModelEntry[];
      reasoningEffort?: ReasoningEffort;
      workingDirectory?: string;
      queueId?: string;
      /**
       * Ephemeral ordered multimodal content (image/pdf attachments) for a
       * FRESH human turn. Forwarded to `processMessage` → `agent.run` only
       * when the tab is idle (a started turn); never carried into the queue
       * path (attachments require a fresh turn — the caller guards that).
       */
      content?: UserContentPart[];
      /**
       * Who is sending this message. `"human"` (default) is unrestricted
       * and REFILLS the target's agent-to-agent auto-wake budget. `"agent"`
       * (from the `send_to_tab` tool) is governed by that budget:
       *   - an agent-originated wake of an idle tab consumes one unit;
       *   - once the budget is exhausted, the message is queued WITHOUT
       *     starting a turn (returned as `suppressed`), so a runaway A↔B loop
       *     can't spend tokens forever with no human in the loop.
       */
      origin?: "human" | "agent";
    } = {},
  ): { status: "queued"; messageId: string } | { status: "started" } | { status: "suppressed" } {
    const origin = opts.origin ?? "human";

    // A human touching the tab clears any accumulated agent-wake throttle:
    // the conversation is back under human supervision, so peers get a fresh
    // budget of auto-wakes again.
    if (origin === "human") {
      this._getOrCreateTabAgent(tabId).autoWakeBudget = MAX_AGENT_AUTO_WAKES;
    }

    if (this.getTabStatus(tabId) === "running") {
      // Busy target → always queue (consumed like a user interrupt),
      // regardless of origin. Queuing does not itself start a turn, so it
      // can't drive a runaway loop; we don't spend budget here.
      const { messageId } = this.queueMessage(tabId, message, opts.queueId);
      return { status: "queued", messageId };
    }

    // Tab is mid-compaction → hold the message (queue, never start a turn)
    // until compaction settles. continueFromQueue (called after compaction)
    // drains it onto the compacted continuation.
    if (this.tabAgents.get(tabId)?.compacting) {
      const { messageId } = this.queueMessage(tabId, message, opts.queueId);
      return { status: "queued", messageId };
    }

    // Idle/errored target → this delivery would WAKE the tab (start a turn).
    // For agent-originated wakes, enforce the auto-wake budget first.
    if (origin === "agent") {
      const target = this._getOrCreateTabAgent(tabId);
      if (target.autoWakeBudget <= 0) {
        // Budget exhausted: preserve the message (queue it, never drop)
        // but do NOT wake the tab. A human message will refill the budget
        // and the queued message will be seen on the next human turn.
        this.queueMessage(tabId, message, opts.queueId);
        const notice =
          `Automatic agent-to-agent message limit reached for this tab ` +
          `(${MAX_AGENT_AUTO_WAKES} consecutive). Further messages from other tabs ` +
          `are held until you send a message here.`;
        this.emit({ type: "notice", message: notice }, tabId);
        this.routeSystemEventToTab(tabId, "notice", notice);
        return { status: "suppressed" };
      }
      target.autoWakeBudget -= 1;
    }

    // Resolve key/model: explicit opts win, then the live tab agent's, then
    // the persisted row's.
    const tabAgent = this.tabAgents.get(tabId);
    let keyId = opts.keyId ?? tabAgent?.keyId ?? undefined;
    let modelId = opts.modelId ?? tabAgent?.modelId ?? undefined;
    const agentModels = opts.agentModels ?? tabAgent?.agentModels;
    if (!keyId || !modelId) {
      const row = getTab(tabId);
      if (row) {
        keyId = keyId ?? row.keyId ?? undefined;
        modelId = modelId ?? row.modelId ?? undefined;
      }
    }

    this.processMessage(
      tabId,
      message,
      keyId,
      modelId,
      opts.reasoningEffort,
      opts.workingDirectory,
      agentModels,
      opts.content,
    ).catch((err) => {
      console.error(`[dispatch] deliverMessage processMessage error for tab ${tabId}:`, err);
    });
    return { status: "started" };
  }

  /**
   * Run one full turn for `tabId`.
   *
   * Steps:
   *   1. Mark the tab running.
   *   2. Persist the user message.
   *   3. Stream the agent's reply, walking the fallback sequence when an
   *      attempt fails with a rate-limit / exhaustion style error (and a
   *      registry, a current key and a next entry exist).
   *   4. Persist the assistant turn once, then emit `turn-sealed`.
   *   5. Resolve the child-completion promise.
   *   6. On a clean finish, continue from any queued messages.
   *
   * @param keyId / modelId   Requested key/model. They pick the starting
   *                          point in the agent's fallback chain (or the single
   *                          manual entry).
   * @param reasoningEffort   Per-tab effort. A per-model `effort` in the agent
   *                          definition takes precedence.
   * @param workingDirectory  Override from the frontend. `""` clears it. A
   *                          change invalidates the cached agent.
   * @param agentModels       If provided, stored on the tab as its fallback order.
   * @param content           Ephemeral multimodal parts passed to `agent.run`.
   */
  async processMessage(
    tabId: string,
    message: string,
    keyId?: string,
    modelId?: string,
    reasoningEffort?: ReasoningEffort,
    workingDirectory?: string,
    agentModels?: AgentModelEntry[],
    content?: UserContentPart[],
  ): Promise<void> {
    const tabAgent = this._getOrCreateTabAgent(tabId);

    // Apply working directory override from frontend if provided
    if (workingDirectory !== undefined) {
      const prevDir = tabAgent.workingDirectoryOverride;
      tabAgent.workingDirectoryOverride = workingDirectory || undefined;
      // Invalidate cached agent if working directory changed
      if (prevDir !== tabAgent.workingDirectoryOverride) {
        tabAgent.agent = null;
      }
    }

    tabAgent.abortController = new AbortController();
    tabAgent.status = "running";
    this.messageCount += 1;

    // Store agent models on the tab if provided (defines fallback order)
    if (agentModels) {
      tabAgent.agentModels = agentModels;
    }

    // Build the fallback sequence: the agent's models list in order, or a single manual entry
    const fallbackSequence = this.buildFallbackSequence(tabAgent, keyId, modelId);
    const maxFallbackAttempts = fallbackSequence.length;

    // The whole turn — the user message plus the assistant's chunk rows —
    // shares one `turn_id`.
    const turnId = crypto.randomUUID();
    tabAgent.currentTurnId = turnId;

    // Announce the turn so the frontend can tag its live chunks with this
    // turn_id (stable render keys → flicker-free reconcile when the turn
    // seals). Emitted before any content delta.
    this.emit({ type: "turn-start", turnId }, tabId);

    // Ensure the tab row exists (the frontend may have failed to create it)
    // BEFORE the first chunk write below. This previously ran inside the
    // fallback loop, i.e. after the user message had already been written,
    // so it could not protect that write. It now runs once per turn, seeded
    // with the first fallback entry's key/model, as the first attempt did.
    try {
      const { getDatabase } = await import("@dispatch/core");
      const db = getDatabase();
      const exists = db.query("SELECT 1 FROM tabs WHERE id = $id").get({ $id: tabId });
      if (!exists) {
        const firstEntry = fallbackSequence[0];
        createTab(tabId, "New Tab", {
          keyId: firstEntry?.key_id || null,
          modelId: firstEntry?.model_id || null,
        });
      }
    } catch {
      // Best-effort. If this fails, the chunk writes below may fail too. The
      // assistant flush is guarded; the user-message write is not.
    }

    // Persist the user message as a chunk row (once, before any fallback retry).
    appendChunks(tabId, explodeUserText(turnId, message));

    let processError: string | null = null;
    let allOutput = "";
    let currentKeyId: string | undefined;
    let currentModelId: string | undefined;

    for (let fallbackIdx = 0; fallbackIdx < maxFallbackAttempts; fallbackIdx++) {
      const entry = fallbackSequence[fallbackIdx];
      if (!entry) break; // unreachable: loop bound guarantees defined, satisfies TS

      // Convert empty strings (used when caller omitted keyId/modelId in
      // manual mode) to undefined so `getOrCreateAgentForTab` falls back
      // to the tabAgent's stored defaults via the `?? tabAgent.keyId` chain.
      currentKeyId = entry.key_id || undefined;
      currentModelId = entry.model_id || undefined;
      // Effort precedence: per-model (agent definition) → per-tab selector
      // (the `reasoningEffort` arg) → the Agent's own DEFAULT_REASONING_EFFORT
      // floor (applied inside `agent.run`).
      const effortForEntry = entry.effort ?? reasoningEffort;

      allOutput = "";
      // Single ordered chunk list accumulating this attempt's assistant
      // turn (text / thinking / tool-batch / error / system), folded from
      // the stream via the shared `appendEventToChunks` helper.
      const chunks: Chunk[] = [];
      // Per-attempt usage accumulator. Reset each fallback attempt so a
      // superseded (rate-limited) attempt's usage is discarded alongside its
      // `chunks`. One `usage` event → one UsageData row.
      const usageRows: UsageData[] = [];
      const assistantId = crypto.randomUUID();
      let assistantPersisted = false;
      tabAgent.currentChunks = chunks;
      tabAgent.currentAssistantId = assistantId;

      // Write-on-seal: explode the accumulated turn into flat chunk rows
      // ONCE, when the turn settles. `explodeTurn` splits each step's
      // `tool-batch` into separate `tool_call` + `tool_result` rows and
      // tags every row with `turn_id` + derived `step`.
      const flushAssistant = (): void => {
        if (assistantPersisted) return;
        // Append usage as extra drafts in the SAME appendChunks call as the
        // turn's content rows: one atomic write, one fsync, contiguous seqs.
        // Usage rows are an invisible side channel (excluded from
        // getChunksForTab); `step` is cosmetic for usage (never grouped).
        const drafts = explodeTurn(turnId, chunks);
        for (const u of usageRows) {
          drafts.push({ turnId, step: 0, role: "assistant", type: "usage", data: u });
        }
        if (drafts.length === 0) return;
        appendChunks(tabId, drafts);
        assistantPersisted = true;
      };

      let attemptError: string | null = null;
      try {
        const agent = await this.getOrCreateAgentForTab(tabId, currentKeyId, currentModelId);

        for await (const event of agent.run(message, {
          ...(effortForEntry ? { reasoningEffort: effortForEntry } : {}),
          abortSignal: tabAgent.abortController?.signal,
          ...(content ? { content } : {}),
        })) {
          // Stop processing if the tab was aborted (closed/stopped).
          // stopTab() already injected a `cancelled` system chunk into
          // `chunks` before flipping the abort flag, so we just need
          // to flush and exit.
          if (tabAgent.abortController?.signal.aborted) break;

          if (event.type === "error") {
            attemptError = event.error;
            // Record the error as a chunk so it's part of the
            // persisted turn history.
            appendEventToChunks(chunks, event);
            break;
          }

          if (event.type === "status") {
            tabAgent.status = event.status;
          }

          this.emit(event, tabId);

          // For diagnostics / child agent result harvesting, keep a
          // flat string copy of plain text output.
          if (event.type === "text-delta") {
            allOutput += event.delta;
          }

          // Capture per-step usage as a side-channel row to persist with the
          // turn (one row per `usage` event). The live `this.emit(event)`
          // above still drives in-session accumulation; this is the reload-
          // persistence path. `appendEventToChunks` intentionally ignores
          // `usage`, so it never becomes message content.
          if (event.type === "usage") {
            usageRows.push({ ...event.usage });
          }

          // Route every content-bearing event through the shared helper.
          // `appendEventToChunks` ignores lifecycle events (status / done
          // / task-list-update / tab-created / message-* / etc), so it's
          // safe to call unconditionally. Persistence happens once, after
          // the loop, so we never write a partial turn that a fallback
          // retry would then duplicate.
          appendEventToChunks(chunks, event);
        }
      } catch (err) {
        console.error(`[dispatch] processMessage error for tab ${tabId}:`, err);
        attemptError = err instanceof Error ? err.message : String(err);
      }

      // Decide whether a fallback retry will supersede this attempt.
      // Lower-case the message once rather than per pattern.
      const lowerError = attemptError === null ? "" : attemptError.toLowerCase();
      const isRetryable =
        attemptError !== null &&
        (attemptError.includes("status=429") ||
          lowerError.includes("rate limit") ||
          lowerError.includes("rate_limit") ||
          lowerError.includes("usage limit") ||
          lowerError.includes("exhausted"));
      const nextEntry = fallbackSequence[fallbackIdx + 1];
      const willRetry = Boolean(isRetryable && this.modelRegistry && tabAgent.keyId && nextEntry);

      // Persist this attempt's turn — unless a retry will replace it, in
      // which case the partial (and its error chunk) is discarded so the
      // next attempt's chunks don't merge with a failed one. On success,
      // abort, or a final error, the turn is flushed exactly once.
      if (!willRetry) {
        try {
          flushAssistant();
        } catch (err) {
          // Must not escape: an exception here would skip `turn-sealed`,
          // the completion-promise resolution (leaving a summoning parent
          // blocked forever) and queue continuation. Log it and tell the
          // live UI; the DB is the thing that failed, so don't route it there.
          console.error(`[dispatch] failed to persist turn ${turnId} for tab ${tabId}:`, err);
          this.emit(
            {
              type: "notice",
              message: "This turn could not be saved and may be missing after a reload.",
            },
            tabId,
          );
        }
      }
      tabAgent.currentChunks = null;
      tabAgent.currentAssistantId = null;

      // No error — success
      if (!attemptError) {
        processError = null;
        break;
      }

      if (willRetry && nextEntry && tabAgent.keyId) {
        this.modelRegistry?.markKeyExhausted(tabAgent.keyId, attemptError);
        const fallbackMsg =
          `Key "${tabAgent.keyId}" rate limited. ` +
          `Falling back to "${nextEntry.key_id}" (model: ${nextEntry.model_id})...`;
        console.warn(`[dispatch] ${fallbackMsg}`);
        // Persist the notice + model-change as standalone system chunk
        // rows (no turn in flight now — currentChunks was just cleared).
        this.emit({ type: "notice", message: fallbackMsg }, tabId);
        this.routeSystemEventToTab(tabId, "notice", fallbackMsg);
        this.emit(
          { type: "model-changed", keyId: nextEntry.key_id, modelId: nextEntry.model_id },
          tabId,
        );
        this.routeSystemEventToTab(
          tabId,
          "model-changed",
          `Switched to ${nextEntry.model_id} (${nextEntry.key_id})`,
        );
        tabAgent.agent = null;
        continue;
      }

      // All fallbacks exhausted or non-retryable error
      processError = attemptError;
      tabAgent.status = "error";
      this.emit({ type: "error", error: attemptError }, tabId);
      this.emit({ type: "status", status: "error" }, tabId);
      break;
    }

    // The turn has fully settled and its chunks are persisted (or the failed
    // flush was logged above). Signal the frontend that the turn's rows — with
    // real seqs — are durable, so it can fold its live representation into the
    // sealed log. This is emitted AFTER status:idle/error, which fire before the
    // DB write.
    //
    // Carry the authoritative usage aggregate, read AFTER the usage rows were
    // persisted. The frontend reconciles its live cacheStats to this DB truth,
    // which self-heals the live overshoot from a discarded rate-limited attempt.
    let usageStats: UsageStats | null = null;
    try {
      usageStats = getUsageStatsForTab(tabId);
    } catch {
      // DB read failed — omit reconciliation rather than crash the turn.
    }
    this.emit({ type: "turn-sealed", turnId, usageStats }, tabId);

    // Turn fully settled — clear the shared turn id.
    tabAgent.currentTurnId = null;

    // Resolve completion promise for child agents
    if (processError === null) {
      tabAgent.finalOutput = allOutput;
      tabAgent.completionResolve?.({ status: "done", result: allOutput || "(no output)" });
    } else {
      tabAgent.completionResolve?.({ status: "error", error: processError });
    }

    // Messages may have piled up on the queue during this turn without being
    // injected as a mid-turn interrupt (they arrived after the last tool call,
    // or the turn had no tool calls). Kick off a fresh turn to answer them
    // instead of leaving them unanswered: the queue is consumed, not just
    // appended to.
    //
    // This only happens on a clean finish. If the user explicitly stopped
    // the turn, or it errored out, the queue is left intact for the next
    // deliberate send (see continueFromQueue).
    if (processError === null) {
      this.continueFromQueue(tabId);
    }
  }

  /**
   * Start a new turn for any messages that accumulated on `tabId`'s queue
   * during the turn that just finished. This is what makes a queued message
   * (from a user OR another agent via send_to_tab) actually get a response
   * after the agent's current turn ends, rather than waiting forever.
   *
   * Loop safety: a queued-then-continued turn draws from the SAME
   * `autoWakeBudget` that bounds agent-to-agent wakes.
Every human-originated message refills that budget when it is delivered
   * (see deliverMessage), so human conversations are never throttled. Only a
   * runaway agent<->agent chain (A queues B, B queues A, ...) is capped. When
   * the budget is spent, the messages stay queued and a notice is emitted;
   * the next human message refills the budget and starts their turn.
   */
  private continueFromQueue(tabId: string): void {
    const tabAgent = this.tabAgents.get(tabId);
    if (!tabAgent) return;
    if (tabAgent.messageQueue.length === 0) return;
    // Never auto-continue a turn the user stopped or one that errored.
    if (tabAgent.status === "error") return;
    if (tabAgent.abortController?.signal.aborted) return;

    if (tabAgent.autoWakeBudget <= 0) {
      // Budget spent — hold the queued messages (don't drop them) until a
      // human message refills the budget. Prevents unbounded agent loops.
      const notice =
        `Automatic continuation limit reached for this tab ` +
        `(${MAX_AGENT_AUTO_WAKES} consecutive turns). Queued messages are held ` +
        `until you send a message here.`;
      this.emit({ type: "notice", message: notice }, tabId);
      this.routeSystemEventToTab(tabId, "notice", notice);
      return;
    }
    tabAgent.autoWakeBudget -= 1;

    // Drain the queue as a "continuation" so the frontend folds the pending
    // queued bubbles into this NEW turn's initiating user row (rather than
    // into a running turn's tool result, which is the "interrupt" case).
    const drained = this.dequeueMessages(tabId, "continuation");
    if (drained.length === 0) return;
    const message = drained.map((m) => m.message).join("\n---\n");

    // Reuse the tab's resolved key/model/fallback chain — the continuation is
    // the same conversation, just a new turn. Fire-and-forget: if more
    // messages arrive during it, its own tail will continue the chain.
    this.processMessage(
      tabId,
      message,
      tabAgent.keyId ?? undefined,
      tabAgent.modelId ?? undefined,
      undefined,
      undefined,
      tabAgent.agentModels,
    ).catch((err) => {
      console.error(`[dispatch] continueFromQueue processMessage error for tab ${tabId}:`, err);
    });
  }

  /**
   * Compute the ordered list of key/model entries a turn may try.
   *
   * Agent mode (the tab has `agentModels`): the configured hierarchy in
   * strict order. It starts at the entry matching `keyId`+`modelId` when one
   * matches, otherwise at the top of the list.
   *
   * Manual mode: a single entry. Missing ids become `""` so
   * `processMessage` still runs once and lets `getOrCreateAgentForTab` fall
   * back to the tab's stored defaults.
   */
  private buildFallbackSequence(
    tabAgent: TabAgent,
    keyId?: string,
    modelId?: string,
  ): AgentModelEntry[] {
    // Agent mode: use the agent's configured fallback hierarchy in strict order
    const models = tabAgent.agentModels;
    if (models && models.length > 0) {
      const startIdx = models.findIndex((m) => m.key_id === keyId && m.model_id === modelId);
      return startIdx >= 0 ? models.slice(startIdx) : models;
    }
    // Manual mode: no fallback — just the selected key/model pair.
    // Always return at least one entry so `processMessage` runs the agent
    // once (empty strings let `getOrCreateAgentForTab` fall back to the
    // tabAgent's stored defaults or environment-driven config).
    return [{ key_id: keyId ?? "", model_id: modelId ?? "" }];
  }

  /**
   * Append `message` to the tab's queue, wake any waiters registered via
   * `waitForQueuedMessage`, and emit `message-queued`.
   *
   * @param clientId Optional caller-supplied id. A falsy value (including
   *                 `""`) gets a fresh UUID.
   * @throws Error("Tab not found") if the tab has no in-memory agent.
   */
  queueMessage(tabId: string, message: string, clientId?: string): { messageId: string } {
    const tabAgent = this.tabAgents.get(tabId);
    if (!tabAgent) throw new Error("Tab not found");
    const id = clientId || crypto.randomUUID();
    const queued: QueuedMessage = { id, message, timestamp: Date.now() };
    tabAgent.messageQueue.push(queued);
    // Wake up any blocking tools waiting for queue
    for (const listener of tabAgent.queueListeners) {
      listener();
    }
    tabAgent.queueListeners = [];
    this.emit({ type: "message-queued", tabId, messageId: id, message }, tabId);
    return { messageId: id };
  }

  /**
   * Remove one queued message by id.
   *
   * @returns `true` (and emits `message-cancelled`) when the message was
   *          found; `false` otherwise.
   */
  cancelQueuedMessage(tabId: string, messageId: string): boolean {
    const tabAgent = this.tabAgents.get(tabId);
    if (!tabAgent) return false;
    const idx = tabAgent.messageQueue.findIndex((m) => m.id === messageId);
    if (idx === -1) return false;
    tabAgent.messageQueue.splice(idx, 1);
    this.emit({ type: "message-cancelled", tabId, messageId }, tabId);
    return true;
  }

  /**
   * Take every queued message for `tabId`, leaving the queue empty.
   *
   * When anything was taken, emits `message-consumed`. Its `reason` tells the
   * frontend whether the messages were injected into a running turn
   * (`"interrupt"`) or started a new one (`"continuation"`).
   */
  dequeueMessages(
    tabId: string,
    reason: "interrupt" | "continuation" = "interrupt",
  ): QueuedMessage[] {
    const tabAgent = this.tabAgents.get(tabId);
    if (!tabAgent) return [];
    const messages = [...tabAgent.messageQueue];
    tabAgent.messageQueue = [];
    if (messages.length > 0) {
      this.emit(
        { type: "message-consumed", tabId, messageIds: messages.map((m) => m.id), reason },
        tabId,
      );
    }
    return messages;
  }

  /**
   * Get a promise that resolves the next time a message is queued for
   * `tabId`.
   *
   * The promise resolves immediately if the tab is unknown or already has
   * queued messages. `cancel()` unregisters the pending listener (a no-op
   * once it has fired).
   */
  waitForQueuedMessage(tabId: string): { promise: Promise<void>; cancel: () => void } {
    const tabAgent = this.tabAgents.get(tabId);
    if (!tabAgent) return { promise: Promise.resolve(), cancel: () => {} };
    if (tabAgent.messageQueue.length > 0) return { promise: Promise.resolve(), cancel: () => {} };

    let listener: (() => void) | null = null;
    const promise = new Promise<void>((resolve) => {
      listener = resolve;
      tabAgent.queueListeners.push(resolve);
    });
    const cancel = () => {
      if (listener) {
        tabAgent.queueListeners = tabAgent.queueListeners.filter((l) => l !== listener);
        listener = null;
      }
    };
    return { promise, cancel };
  }

  /**
   * Tear down watchers and LSP servers.
   *
   * Stays synchronous: the LSP shutdown is started but not awaited.
   */
  destroy(): void {
    this.configWatcher?.close();
    this.skillsWatcher?.close();
    for (const watcher of this.lspDirWatchers.values()) watcher.close();
    this.lspDirWatchers.clear();
    // Shut down all long-lived LSP server processes. Fire-and-forget so
    // `destroy()` keeps its synchronous contract, but the rejection is
    // HANDLED. A bare `void promise` does not catch, and an unhandled
    // rejection can terminate the process (Node's default since v15).
    void this.lspManager.shutdownAll().catch((err: unknown) => {
      console.error("[dispatch] LSP shutdown error during destroy():", err);
    });
  }
}