summaryrefslogtreecommitdiffhomepage
path: root/packages/api
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-01 07:51:23 +0900
committerAdam Malczewski <[email protected]>2026-06-01 07:51:23 +0900
commitdbc3b36f6d94d719cb1a07074e5d74ce22d2fad3 (patch)
tree8454bd10191ce1ccbe9740b5778ee1e9679f6339 /packages/api
parent8b9533c22a47bbf6f916667e2c25d8e8e419da37 (diff)
downloaddispatch-dbc3b36f6d94d719cb1a07074e5d74ce22d2fad3.tar.gz
dispatch-dbc3b36f6d94d719cb1a07074e5d74ce22d2fad3.zip
fix(queue): consume queued messages after a turn ends (start a new turn)
A message queued while the agent was mid-turn was only handled if it arrived DURING a tool batch (injected as a [USER INTERRUPT]). If it landed after the last tool call — or the turn had no tools — the agent silently appended it to history and ended the turn with no response, so it sat there unanswered. This affected both user-queued messages and agent-queued ones (send_to_tab). - agent.ts: stop the end-of-turn drain that swallowed trailing queued messages into history. They now stay on the queue. - agent-manager: after a CLEAN turn settles, continueFromQueue() drains the queue and starts a fresh turn to answer it. Skipped on a user-stopped or errored turn (queue preserved for the next send). - Loop safety: continuation draws from the existing autoWakeBudget, so a runaway agent<->agent chain is bounded; human sends refill it, so human conversations are never throttled. - dequeueMessages now tags message-consumed with reason "interrupt" | "continuation"; the frontend collapses continuation- consumed queued bubbles into the next turn's initiator row (avoids the linger/dup traps documented in queue-interrupt-reconcile-edge-cases.md). - Tests: agent (no-swallow + interrupt regression), agent-manager (continuation, no-op when empty, user-stop preserves queue, bounded loop), frontend (continuation bubble becomes next initiator). - wishlist: remove the now-fixed item.
Diffstat (limited to 'packages/api')
-rw-r--r--packages/api/src/agent-manager.ts79
-rw-r--r--packages/api/tests/agent-manager.test.ts125
2 files changed, 202 insertions, 2 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts
index 517c661..5a0ffdf 100644
--- a/packages/api/src/agent-manager.ts
+++ b/packages/api/src/agent-manager.ts
@@ -1628,6 +1628,75 @@ export class AgentManager {
} else {
tabAgent.completionResolve?.({ status: "error", error: processError });
}
+
+ // The turn has fully settled. If messages piled up on the queue during it
+ // and were NOT injected as a mid-turn interrupt (they arrived after the
+ // last tool call, or this turn had no tool calls), kick off a fresh turn
+ // to answer them instead of letting them sit unanswered — the queue is
+ // consumed, not just appended. Only on a clean finish: a turn the user
+ // explicitly stopped, or one that errored out, leaves its queue intact
+ // for the next deliberate send (see continueFromQueue).
+ if (processError === null) {
+ this.continueFromQueue(tabId);
+ }
+ }
+
+ /**
+ * Start a new turn for any messages that accumulated on `tabId`'s queue
+ * during the turn that just finished. This is what makes a queued message
+ * (from a user OR another agent via send_to_tab) actually get a response
+ * after the agent's current turn ends, rather than waiting forever.
+ *
+ * Loop safety: a queued-then-continued turn draws from the SAME
+ * `autoWakeBudget` that bounds agent-to-agent wakes. Every human-originated
+ * message refills that budget when it is delivered (see deliverMessage), so
+ * human conversations are never throttled; only a runaway agent<->agent
+ * chain (A queues B, B queues A, ...) is capped. When the budget is spent
+ * the messages stay queued and a notice is emitted; the next human message
+ * refills the budget and starts their turn.
+ */
+ private continueFromQueue(tabId: string): void {
+ const tabAgent = this.tabAgents.get(tabId);
+ if (!tabAgent) return;
+ if (tabAgent.messageQueue.length === 0) return;
+ // Never auto-continue a turn the user stopped or one that errored.
+ if (tabAgent.status === "error") return;
+ if (tabAgent.abortController?.signal.aborted) return;
+
+ if (tabAgent.autoWakeBudget <= 0) {
+ // Budget spent — hold the queued messages (don't drop them) until a
+ // human message refills the budget. Prevents unbounded agent loops.
+ const notice =
+ `Automatic continuation limit reached for this tab ` +
+ `(${MAX_AGENT_AUTO_WAKES} consecutive turns). Queued messages are held ` +
+ `until you send a message here.`;
+ this.emit({ type: "notice", message: notice }, tabId);
+ this.routeSystemEventToTab(tabId, "notice", notice);
+ return;
+ }
+ tabAgent.autoWakeBudget -= 1;
+
+ // Drain the queue as a "continuation" so the frontend folds the pending
+ // queued bubbles into this NEW turn's initiating user row (rather than
+ // into a running turn's tool result, which is the "interrupt" case).
+ const drained = this.dequeueMessages(tabId, "continuation");
+ if (drained.length === 0) return;
+ const message = drained.map((m) => m.message).join("\n---\n");
+
+ // Reuse the tab's resolved key/model/fallback chain — the continuation is
+ // the same conversation, just a new turn. Fire-and-forget: if more
+ // messages arrive during it, its own tail will continue the chain.
+ this.processMessage(
+ tabId,
+ message,
+ tabAgent.keyId ?? undefined,
+ tabAgent.modelId ?? undefined,
+ undefined,
+ undefined,
+ tabAgent.agentModels,
+ ).catch((err) => {
+ console.error(`[dispatch] continueFromQueue processMessage error for tab ${tabId}:`, err);
+ });
}
private buildFallbackSequence(
@@ -1673,13 +1742,19 @@ export class AgentManager {
return true;
}
- dequeueMessages(tabId: string): QueuedMessage[] {
+ dequeueMessages(
+ tabId: string,
+ reason: "interrupt" | "continuation" = "interrupt",
+ ): QueuedMessage[] {
const tabAgent = this.tabAgents.get(tabId);
if (!tabAgent) return [];
const messages = [...tabAgent.messageQueue];
tabAgent.messageQueue = [];
if (messages.length > 0) {
- this.emit({ type: "message-consumed", tabId, messageIds: messages.map((m) => m.id) }, tabId);
+ this.emit(
+ { type: "message-consumed", tabId, messageIds: messages.map((m) => m.id), reason },
+ tabId,
+ );
}
return messages;
}
diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts
index 1358eb1..b9b4510 100644
--- a/packages/api/tests/agent-manager.test.ts
+++ b/packages/api/tests/agent-manager.test.ts
@@ -1129,6 +1129,131 @@ describe("AgentManager", () => {
});
});
+ describe("queue continuation after a turn ends", () => {
+ // A run generator that enqueues `msg` (as if a user/agent sent it mid-turn)
+ // exactly once, then streams a normal short reply. Used to simulate a
+ // message landing on the queue while the agent is busy.
+ function runThatEnqueues(manager: AgentManager, tabId: string, msg: string): RunGen {
+ let enqueued = false;
+ return async function* () {
+ yield { type: "status", status: "running" } as const;
+ if (!enqueued) {
+ enqueued = true;
+ manager.queueMessage(tabId, msg);
+ }
+ yield { type: "text-delta", delta: "reply" } as const;
+ yield {
+ type: "done",
+ message: { role: "assistant", chunks: [{ type: "text", text: "reply" }] },
+ } as const;
+ yield { type: "status", status: "idle" } as const;
+ };
+ }
+
+ it("starts a NEW turn for a message queued during the turn (the bug fix)", async () => {
+ const manager = new AgentManager();
+ const processSpy = vi.spyOn(manager, "processMessage");
+ setRunImpl(runThatEnqueues(manager, "tab-cont", "follow-up question"));
+
+ await manager.processMessage("tab-cont", "first");
+ // Let the fire-and-forget continuation turn run to completion.
+ await new Promise<void>((r) => setTimeout(r, 50));
+
+ // processMessage called twice: the original turn + the continuation.
+ expect(processSpy).toHaveBeenCalledTimes(2);
+ expect(processSpy.mock.calls[1]?.[0]).toBe("tab-cont");
+ expect(processSpy.mock.calls[1]?.[1]).toBe("follow-up question");
+
+ // Queue is drained and the tab is idle again.
+ const inner = manager as unknown as {
+ tabAgents: Map<string, { messageQueue: unknown[] }>;
+ };
+ expect(inner.tabAgents.get("tab-cont")?.messageQueue).toHaveLength(0);
+ expect(manager.getTabStatus("tab-cont")).toBe("idle");
+ });
+
+ it('emits message-consumed with reason "continuation" when draining between turns', async () => {
+ const manager = new AgentManager();
+ const events: AgentEvent[] = [];
+ manager.onEvent((e) => events.push(e));
+ setRunImpl(runThatEnqueues(manager, "tab-evt", "next"));
+
+ await manager.processMessage("tab-evt", "first");
+ await new Promise<void>((r) => setTimeout(r, 50));
+
+ const consumed = events.find((e) => e.type === "message-consumed") as
+ | (AgentEvent & { reason?: string })
+ | undefined;
+ expect(consumed).toBeDefined();
+ expect(consumed?.reason).toBe("continuation");
+ });
+
+ it("does NOT continue when the queue is empty after a clean turn", async () => {
+ const manager = new AgentManager();
+ const processSpy = vi.spyOn(manager, "processMessage");
+
+ await manager.processMessage("tab-noqueue", "only message");
+ await new Promise<void>((r) => setTimeout(r, 30));
+
+ expect(processSpy).toHaveBeenCalledTimes(1); // no continuation
+ });
+
+ it("does NOT continue a turn the user stopped (queue is preserved)", async () => {
+ const manager = new AgentManager();
+ const processSpy = vi.spyOn(manager, "processMessage");
+ // Run that enqueues then aborts itself via stopTab to mimic a user stop.
+ setRunImpl(async function* () {
+ yield { type: "status", status: "running" } as const;
+ manager.queueMessage("tab-stop", "should wait");
+ manager.stopTab("tab-stop");
+ yield {
+ type: "done",
+ message: { role: "assistant", chunks: [] },
+ } as const;
+ });
+
+ await manager.processMessage("tab-stop", "go");
+ await new Promise<void>((r) => setTimeout(r, 30));
+
+ // Only the original turn ran; the queued message is preserved, unanswered.
+ expect(processSpy).toHaveBeenCalledTimes(1);
+ const inner = manager as unknown as {
+ tabAgents: Map<string, { messageQueue: unknown[] }>;
+ };
+ expect(inner.tabAgents.get("tab-stop")?.messageQueue).toHaveLength(1);
+ });
+
+ it("bounds runaway agent<->agent continuation via the auto-wake budget", async () => {
+ const manager = new AgentManager();
+ // A run that ALWAYS enqueues another message → would loop forever
+ // without the budget cap.
+ setRunImpl(async function* () {
+ yield { type: "status", status: "running" } as const;
+ manager.queueMessage("tab-loop", "again and again");
+ yield {
+ type: "done",
+ message: { role: "assistant", chunks: [{ type: "text", text: "r" }] },
+ } as const;
+ yield { type: "status", status: "idle" } as const;
+ });
+ const processSpy = vi.spyOn(manager, "processMessage");
+
+ await manager.processMessage("tab-loop", "kick off");
+ await new Promise<void>((r) => setTimeout(r, 120));
+
+ // 1 original + at most MAX_AGENT_AUTO_WAKES (6) continuations = 7.
+ // Crucially BOUNDED, not infinite.
+ expect(processSpy.mock.calls.length).toBeLessThanOrEqual(7);
+ expect(processSpy.mock.calls.length).toBeGreaterThan(1);
+ // Budget spent; the last queued message is held, not answered.
+ const inner = manager as unknown as {
+ tabAgents: Map<string, { autoWakeBudget: number; messageQueue: unknown[] }>;
+ };
+ expect(inner.tabAgents.get("tab-loop")?.autoWakeBudget).toBe(0);
+ expect(inner.tabAgents.get("tab-loop")?.messageQueue.length).toBeGreaterThan(0);
+ });
+ });
+
describe("getLastTabResponse", () => {
it("returns the most recent assistant turn's text and current status", () => {
const manager = new AgentManager();