summaryrefslogtreecommitdiffhomepage
path: root/packages/api
diff options
context:
space:
mode:
Diffstat (limited to 'packages/api')
-rw-r--r--packages/api/src/agent-manager.ts221
-rw-r--r--packages/api/tests/agent-manager.test.ts15
-rw-r--r--packages/api/tests/routes.test.ts15
3 files changed, 184 insertions, 67 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts
index 487b09f..0f06683 100644
--- a/packages/api/src/agent-manager.ts
+++ b/packages/api/src/agent-manager.ts
@@ -3,9 +3,12 @@ import {
type AgentEvent,
type AgentSkillMapping,
type AgentStatus,
+ appendEventToChunks,
appendMessage,
+ applySystemEvent,
BackgroundShellStore,
BackgroundTranscriptStore,
+ type Chunk,
type ClaudeAccount,
clearSpillForTab,
configToRuleset,
@@ -23,6 +26,7 @@ import {
createYoutubeTranscribeTool,
type DispatchConfig,
getClaudeAccountsFromDB,
+ getMessagesForTab,
getSetting,
loadConfig,
loadSkills,
@@ -32,7 +36,9 @@ import {
refreshAccountCredentialsAsync,
resolveApiKey,
type SkillDefinition,
+ type SystemChunkKind,
TaskList,
+ updateMessage,
validateConfig,
} from "@dispatch/core";
import type { PermissionManager } from "./permission-manager.js";
@@ -160,6 +166,14 @@ interface TabAgent {
shellStore: BackgroundShellStore;
/** Store for transcript requests backgrounded due to user interrupt. */
transcriptStore: BackgroundTranscriptStore;
+ /**
+ * In-flight assistant message chunks for the active turn. `null` when
+ * no turn is running. Out-of-band system events (config-reload,
+ * cancel, etc.) target this list when present.
+ */
+ currentChunks: Chunk[] | null;
+ /** DB id of the in-flight assistant message (if persisted yet). */
+ currentAssistantId: string | null;
}
export class AgentManager {
@@ -222,9 +236,10 @@ export class AgentManager {
for (const tabAgent of this.tabAgents.values()) {
tabAgent.agent = null;
}
- // Emit config-reload to all tabs
+ // Emit config-reload to all tabs (and persist as a system chunk)
for (const tabId of this.tabAgents.keys()) {
this.emit({ type: "config-reload" }, tabId);
+ this.routeSystemEventToTab(tabId, "config-reload", "Configuration reloaded");
}
});
@@ -234,9 +249,10 @@ export class AgentManager {
for (const tabAgent of this.tabAgents.values()) {
tabAgent.agent = null;
}
- // Emit config-reload to all tabs
+ // Emit config-reload to all tabs (and persist as a system chunk)
for (const tabId of this.tabAgents.keys()) {
this.emit({ type: "config-reload" }, tabId);
+ this.routeSystemEventToTab(tabId, "config-reload", "Skills reloaded");
}
});
}
@@ -297,6 +313,8 @@ export class AgentManager {
queueListeners: [],
shellStore: new BackgroundShellStore(),
transcriptStore: new BackgroundTranscriptStore(),
+ currentChunks: null,
+ currentAssistantId: null,
};
this.tabAgents.set(tabId, tabAgent);
}
@@ -671,9 +689,80 @@ export class AgentManager {
}
}
+ /**
+ * Persist a system chunk to a tab's message history.
+ *
+ * If an assistant turn is in flight (`currentChunks` is non-null), the
+ * chunk is appended to the in-flight assistant message's chunk list —
+ * the final `appendMessage` / `updateMessage` call at end-of-turn picks
+ * it up automatically.
+ *
+ * Otherwise we load the tab's persisted messages, run them through
+ * `applySystemEvent` (which either appends to an existing trailing
+ * `role: "system"` message or creates a new one), then persist the
+ * delta via `appendMessage` / `updateMessage`.
+ */
+ private routeSystemEventToTab(tabId: string, kind: SystemChunkKind, text: string): void {
+ const tabAgent = this.tabAgents.get(tabId);
+
+ // Turn in flight → append directly to the in-flight chunk list.
+ // The chunk lands on the assistant message when it's persisted at
+ // turn-end (or the assistant message is updated mid-turn elsewhere).
+ if (tabAgent?.currentChunks) {
+ tabAgent.currentChunks.push({ type: "system", kind, text });
+ if (tabAgent.currentAssistantId) {
+ try {
+ updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks));
+ } catch {
+ // Best-effort — the final persistence in processMessage will
+ // flush the same chunks again.
+ }
+ }
+ return;
+ }
+
+ // No turn in flight → route via applySystemEvent against the
+ // persisted message list. Either appends to a trailing system
+ // message or creates a fresh one.
+ try {
+ const rows = getMessagesForTab(tabId);
+ const messages = rows.map((r) => ({ id: r.id, role: r.role, chunks: r.chunks }));
+ const before = messages[messages.length - 1];
+ const { messageId } = applySystemEvent(messages, { kind, text });
+ const target = messages.find((m) => m.id === messageId);
+ if (!target) return;
+ if (before && before.id === messageId) {
+ // Appended to existing trailing system message.
+ updateMessage(messageId, JSON.stringify(target.chunks));
+ } else {
+ // Newly created system message.
+ appendMessage(tabId, messageId, "system", JSON.stringify(target.chunks));
+ }
+ } catch {
+ // DB not available (e.g. tab not yet created) — drop silently.
+ }
+ }
+
stopTab(tabId: string): void {
const tabAgent = this.tabAgents.get(tabId);
if (tabAgent) {
+ // If a turn is in flight, drop a `cancelled` system chunk into
+ // the in-flight assistant message so the user sees an explicit
+ // "Generation cancelled by user" marker at the cancellation point.
+ if (tabAgent.currentChunks) {
+ tabAgent.currentChunks.push({
+ type: "system",
+ kind: "cancelled",
+ text: "Generation cancelled by user",
+ });
+ if (tabAgent.currentAssistantId) {
+ try {
+ updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks));
+ } catch {
+ // best-effort
+ }
+ }
+ }
tabAgent.abortController?.abort();
tabAgent.status = "idle";
tabAgent.agent = null;
@@ -891,15 +980,28 @@ export class AgentManager {
currentKeyId = entry.key_id || undefined;
currentModelId = entry.model_id || undefined;
allOutput = "";
- let assistantText = "";
- let assistantThinking = "";
- const assistantToolCalls: Array<{
- id: string;
- name: string;
- arguments: Record<string, unknown>;
- result?: string;
- isError?: boolean;
- }> = [];
+
+ // Single ordered chunk list for the assistant turn — replaces the
+ // previous (text + toolCalls + thinking) tri-accumulator pattern.
+ // Persisted progressively (insert on first chunk, update thereafter)
+ // so out-of-band routes (config-reload, cancel) see real DB rows.
+ const chunks: Chunk[] = [];
+ const assistantId = crypto.randomUUID();
+ let assistantPersisted = false;
+ tabAgent.currentChunks = chunks;
+ tabAgent.currentAssistantId = assistantId;
+
+ const flushAssistant = (): void => {
+ if (chunks.length === 0) return;
+ const json = JSON.stringify(chunks);
+ if (!assistantPersisted) {
+ appendMessage(tabId, assistantId, "assistant", json);
+ assistantPersisted = true;
+ } else {
+ updateMessage(assistantId, json);
+ }
+ };
+
let attemptError: string | null = null;
try {
@@ -925,11 +1027,17 @@ export class AgentManager {
message,
reasoningEffort ? { reasoningEffort } : undefined,
)) {
- // Stop processing if the tab was aborted (closed/stopped)
+ // Stop processing if the tab was aborted (closed/stopped).
+ // stopTab() already injected a `cancelled` system chunk into
+ // `chunks` before flipping the abort flag, so we just need
+ // to flush and exit.
if (tabAgent.abortController?.signal.aborted) break;
if (event.type === "error") {
attemptError = event.error;
+ // Record the error as a chunk so it's part of the
+ // persisted turn history.
+ appendEventToChunks(chunks, event);
break;
}
@@ -938,68 +1046,41 @@ export class AgentManager {
}
this.emit(event, tabId);
- // Accumulate content for DB persistence
+ // For diagnostics / child agent result harvesting, keep a
+ // flat string copy of plain text output.
if (event.type === "text-delta") {
- assistantText += event.delta;
allOutput += event.delta;
- } else if (event.type === "reasoning-delta") {
- assistantThinking += event.delta;
- } else if (event.type === "tool-call") {
- assistantToolCalls.push({
- id: event.toolCall.id,
- name: event.toolCall.name,
- arguments: event.toolCall.arguments,
- });
- } else if (event.type === "tool-result") {
- const tc = assistantToolCalls.find((t) => t.id === event.toolResult.toolCallId);
- if (tc) {
- tc.result = event.toolResult.result;
- tc.isError = event.toolResult.isError;
- }
- } else if (event.type === "done") {
- // Persist assistant message to DB
- const contentSegments: Array<Record<string, unknown>> = [];
- if (assistantText) contentSegments.push({ type: "text", text: assistantText });
- for (const tc of assistantToolCalls) {
- contentSegments.push({ type: "tool-call", ...tc });
- }
- if (contentSegments.length > 0) {
- appendMessage(
- tabId,
- crypto.randomUUID(),
- "assistant",
- JSON.stringify(contentSegments),
- assistantThinking || undefined,
- );
- }
- // Reset for next turn
- assistantText = "";
- assistantThinking = "";
- assistantToolCalls.length = 0;
}
+
+ if (event.type === "done") {
+ // End of turn — flush the accumulated chunks. Reset the
+ // in-flight pointers so out-of-band system events route
+ // through `applySystemEvent` against the persisted list
+ // instead of mutating a stale array.
+ flushAssistant();
+ chunks.length = 0;
+ assistantPersisted = true; // suppress post-loop flush
+ tabAgent.currentChunks = null;
+ tabAgent.currentAssistantId = null;
+ continue;
+ }
+
+ // Route every content-bearing event through the shared helper.
+ // `appendEventToChunks` ignores lifecycle events (status / done
+ // / task-list-update / tab-created / message-* / etc), so it's
+ // safe to call unconditionally.
+ appendEventToChunks(chunks, event);
}
} catch (err) {
console.error(`[dispatch] processMessage error for tab ${tabId}:`, err);
attemptError = err instanceof Error ? err.message : String(err);
}
- // Flush any accumulated assistant content from this attempt
- if (assistantText || assistantToolCalls.length > 0) {
- const contentSegments: Array<Record<string, unknown>> = [];
- if (assistantText) contentSegments.push({ type: "text", text: assistantText });
- for (const tc of assistantToolCalls) {
- contentSegments.push({ type: "tool-call", ...tc });
- }
- if (contentSegments.length > 0) {
- appendMessage(
- tabId,
- crypto.randomUUID(),
- "assistant",
- JSON.stringify(contentSegments),
- assistantThinking || undefined,
- );
- }
- }
+ // Flush any accumulated assistant content from this attempt (covers
+ // the abort/error/exception paths where we never saw a `done`).
+ flushAssistant();
+ tabAgent.currentChunks = null;
+ tabAgent.currentAssistantId = null;
// No error — success
if (!attemptError) {
@@ -1024,11 +1105,21 @@ export class AgentManager {
`Key "${tabAgent.keyId}" rate limited. ` +
`Falling back to "${nextEntry.key_id}" (model: ${nextEntry.model_id})...`;
console.warn(`[dispatch] ${fallbackMsg}`);
+ // Persist the notice + model-change as system chunks. We're
+ // between turns here (just flushed the previous assistant
+ // message), so the helper routes them into a `role: "system"`
+ // message via `applySystemEvent`.
this.emit({ type: "notice", message: fallbackMsg }, tabId);
+ this.routeSystemEventToTab(tabId, "notice", fallbackMsg);
this.emit(
{ type: "model-changed", keyId: nextEntry.key_id, modelId: nextEntry.model_id },
tabId,
);
+ this.routeSystemEventToTab(
+ tabId,
+ "model-changed",
+ `Switched to ${nextEntry.model_id} (${nextEntry.key_id})`,
+ );
tabAgent.agent = null;
continue;
}
diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts
index a1f2d75..28fa044 100644
--- a/packages/api/tests/agent-manager.test.ts
+++ b/packages/api/tests/agent-manager.test.ts
@@ -13,7 +13,10 @@ vi.mock("@dispatch/core", () => ({
yield { type: "text-delta", delta: "world" } as const;
yield {
type: "done",
- message: { role: "assistant", content: "Hello world" },
+ message: {
+ role: "assistant",
+ chunks: [{ type: "text", text: "Hello world" }],
+ },
} as const;
yield { type: "status", status: "idle" } as const;
}
@@ -178,6 +181,16 @@ vi.mock("@dispatch/core", () => ({
return null;
},
appendMessage() {},
+ updateMessage() {},
+ getMessagesForTab() {
+ return [];
+ },
+ appendEventToChunks(_chunks: unknown[], _event: unknown) {
+ // no-op stub; chunk accumulation isn't exercised in these unit tests
+ },
+ applySystemEvent(_messages: unknown[], _event: unknown) {
+ return { messageId: "mock-system-msg" };
+ },
BackgroundShellStore: class MockBackgroundShellStore {
has() {
return false;
diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts
index 69a6676..49bae49 100644
--- a/packages/api/tests/routes.test.ts
+++ b/packages/api/tests/routes.test.ts
@@ -14,7 +14,10 @@ vi.mock("@dispatch/core", () => ({
yield { type: "text-delta", delta: "world" } as const;
yield {
type: "done",
- message: { role: "assistant", content: "Hello world" },
+ message: {
+ role: "assistant",
+ chunks: [{ type: "text", text: "Hello world" }],
+ },
} as const;
yield { type: "status", status: "idle" } as const;
}
@@ -179,6 +182,16 @@ vi.mock("@dispatch/core", () => ({
return null;
},
appendMessage() {},
+ updateMessage() {},
+ getMessagesForTab() {
+ return [];
+ },
+ appendEventToChunks(_chunks: unknown[], _event: unknown) {
+ // no-op stub
+ },
+ applySystemEvent(_messages: unknown[], _event: unknown) {
+ return { messageId: "mock-system-msg" };
+ },
BackgroundShellStore: class MockBackgroundShellStore {
has() {
return false;