diff options
| author | Adam Malczewski <[email protected]> | 2026-06-01 07:51:23 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-01 07:51:23 +0900 |
| commit | dbc3b36f6d94d719cb1a07074e5d74ce22d2fad3 (patch) | |
| tree | 8454bd10191ce1ccbe9740b5778ee1e9679f6339 /packages/api | |
| parent | 8b9533c22a47bbf6f916667e2c25d8e8e419da37 (diff) | |
| download | dispatch-dbc3b36f6d94d719cb1a07074e5d74ce22d2fad3.tar.gz dispatch-dbc3b36f6d94d719cb1a07074e5d74ce22d2fad3.zip | |
fix(queue): consume queued messages after a turn ends (start a new turn)
A message queued while the agent was mid-turn was only handled if it
arrived DURING a tool batch (injected as a [USER INTERRUPT]). If it
landed after the last tool call — or the turn had no tools — the agent
silently appended it to history and ended the turn with no response, so
it sat there unanswered. This affected both user-queued messages and
agent-queued ones (send_to_tab).
- agent.ts: stop the end-of-turn drain that swallowed trailing queued
messages into history. They now stay on the queue.
- agent-manager: after a CLEAN turn settles, continueFromQueue() drains
the queue and starts a fresh turn to answer the queued messages. It is
skipped on a user-stopped or errored turn (the queue is preserved for the
next send).
- Loop safety: continuation draws from the existing autoWakeBudget, so a
runaway agent<->agent chain is bounded; human sends refill it, so human
conversations are never throttled.
- dequeueMessages now tags the message-consumed event with a reason,
"interrupt" | "continuation" (default "interrupt"); the frontend collapses
queued bubbles consumed by a continuation into the next turn's initiator
row (avoids the linger/dup traps documented in
queue-interrupt-reconcile-edge-cases.md).
- Tests: agent (no-swallow + interrupt regression), agent-manager
(continuation, no-op when empty, user-stop preserves queue, bounded
loop), frontend (continuation bubble becomes next initiator).
- wishlist: remove the now-fixed item.
Diffstat (limited to 'packages/api')
| -rw-r--r-- | packages/api/src/agent-manager.ts | 79 | ||||
| -rw-r--r-- | packages/api/tests/agent-manager.test.ts | 125 |
2 files changed, 202 insertions, 2 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts index 517c661..5a0ffdf 100644 --- a/packages/api/src/agent-manager.ts +++ b/packages/api/src/agent-manager.ts @@ -1628,6 +1628,75 @@ export class AgentManager { } else { tabAgent.completionResolve?.({ status: "error", error: processError }); } + + // The turn has fully settled. If messages piled up on the queue during it + // and were NOT injected as a mid-turn interrupt (they arrived after the + // last tool call, or this turn had no tool calls), kick off a fresh turn + // to answer them instead of letting them sit unanswered — the queue is + // consumed, not just appended. Only on a clean finish: a turn the user + // explicitly stopped, or one that errored out, leaves its queue intact + // for the next deliberate send (see continueFromQueue). + if (processError === null) { + this.continueFromQueue(tabId); + } + } + + /** + * Start a new turn for any messages that accumulated on `tabId`'s queue + * during the turn that just finished. This is what makes a queued message + * (from a user OR another agent via send_to_tab) actually get a response + * after the agent's current turn ends, rather than waiting forever. + * + * Loop safety: a queued-then-continued turn draws from the SAME + * `autoWakeBudget` that bounds agent-to-agent wakes. Every human-originated + * message refills that budget when it is delivered (see deliverMessage), so + * human conversations are never throttled; only a runaway agent<->agent + * chain (A queues B, B queues A, ...) is capped. When the budget is spent + * the messages stay queued and a notice is emitted; the next human message + * refills the budget and starts their turn. + */ + private continueFromQueue(tabId: string): void { + const tabAgent = this.tabAgents.get(tabId); + if (!tabAgent) return; + if (tabAgent.messageQueue.length === 0) return; + // Never auto-continue a turn the user stopped or one that errored. 
+ if (tabAgent.status === "error") return; + if (tabAgent.abortController?.signal.aborted) return; + + if (tabAgent.autoWakeBudget <= 0) { + // Budget spent — hold the queued messages (don't drop them) until a + // human message refills the budget. Prevents unbounded agent loops. + const notice = + `Automatic continuation limit reached for this tab ` + + `(${MAX_AGENT_AUTO_WAKES} consecutive turns). Queued messages are held ` + + `until you send a message here.`; + this.emit({ type: "notice", message: notice }, tabId); + this.routeSystemEventToTab(tabId, "notice", notice); + return; + } + tabAgent.autoWakeBudget -= 1; + + // Drain the queue as a "continuation" so the frontend folds the pending + // queued bubbles into this NEW turn's initiating user row (rather than + // into a running turn's tool result, which is the "interrupt" case). + const drained = this.dequeueMessages(tabId, "continuation"); + if (drained.length === 0) return; + const message = drained.map((m) => m.message).join("\n---\n"); + + // Reuse the tab's resolved key/model/fallback chain — the continuation is + // the same conversation, just a new turn. Fire-and-forget: if more + // messages arrive during it, its own tail will continue the chain. + this.processMessage( + tabId, + message, + tabAgent.keyId ?? undefined, + tabAgent.modelId ?? 
undefined, + undefined, + undefined, + tabAgent.agentModels, + ).catch((err) => { + console.error(`[dispatch] continueFromQueue processMessage error for tab ${tabId}:`, err); + }); } private buildFallbackSequence( @@ -1673,13 +1742,19 @@ export class AgentManager { return true; } - dequeueMessages(tabId: string): QueuedMessage[] { + dequeueMessages( + tabId: string, + reason: "interrupt" | "continuation" = "interrupt", + ): QueuedMessage[] { const tabAgent = this.tabAgents.get(tabId); if (!tabAgent) return []; const messages = [...tabAgent.messageQueue]; tabAgent.messageQueue = []; if (messages.length > 0) { - this.emit({ type: "message-consumed", tabId, messageIds: messages.map((m) => m.id) }, tabId); + this.emit( + { type: "message-consumed", tabId, messageIds: messages.map((m) => m.id), reason }, + tabId, + ); } return messages; } diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts index 1358eb1..b9b4510 100644 --- a/packages/api/tests/agent-manager.test.ts +++ b/packages/api/tests/agent-manager.test.ts @@ -1129,6 +1129,131 @@ describe("AgentManager", () => { }); }); + describe("queue continuation after a turn ends", () => { + // A run generator that enqueues `msg` (as if a user/agent sent it mid-turn) + // exactly once, then streams a normal short reply. Used to simulate a + // message landing on the queue while the agent is busy. 
+ function runThatEnqueues(manager: AgentManager, tabId: string, msg: string): RunGen { + let enqueued = false; + return async function* () { + yield { type: "status", status: "running" } as const; + if (!enqueued) { + enqueued = true; + manager.queueMessage(tabId, msg); + } + yield { type: "text-delta", delta: "reply" } as const; + yield { + type: "done", + message: { role: "assistant", chunks: [{ type: "text", text: "reply" }] }, + } as const; + yield { type: "status", status: "idle" } as const; + }; + } + + it("starts a NEW turn for a message queued during the turn (the bug fix)", async () => { + const manager = new AgentManager(); + const processSpy = vi.spyOn(manager, "processMessage"); + setRunImpl(runThatEnqueues(manager, "tab-cont", "follow-up question")); + + await manager.processMessage("tab-cont", "first"); + // Let the fire-and-forget continuation turn run to completion. + await new Promise<void>((r) => setTimeout(r, 50)); + + // processMessage called twice: the original turn + the continuation. + expect(processSpy).toHaveBeenCalledTimes(2); + expect(processSpy.mock.calls[1]?.[0]).toBe("tab-cont"); + expect(processSpy.mock.calls[1]?.[1]).toBe("follow-up question"); + + // Queue is drained and the tab is idle again. 
+ const inner = manager as unknown as { + tabAgents: Map<string, { messageQueue: unknown[] }>; + }; + expect(inner.tabAgents.get("tab-cont")?.messageQueue).toHaveLength(0); + expect(manager.getTabStatus("tab-cont")).toBe("idle"); + }); + + it('emits message-consumed with reason "continuation" when draining between turns', async () => { + const manager = new AgentManager(); + const events: AgentEvent[] = []; + manager.onEvent((e) => events.push(e)); + setRunImpl(runThatEnqueues(manager, "tab-evt", "next")); + + await manager.processMessage("tab-evt", "first"); + await new Promise<void>((r) => setTimeout(r, 50)); + + const consumed = events.find((e) => e.type === "message-consumed") as + | (AgentEvent & { reason?: string }) + | undefined; + expect(consumed).toBeDefined(); + expect(consumed?.reason).toBe("continuation"); + }); + + it("does NOT continue when the queue is empty after a clean turn", async () => { + const manager = new AgentManager(); + const processSpy = vi.spyOn(manager, "processMessage"); + + await manager.processMessage("tab-noqueue", "only message"); + await new Promise<void>((r) => setTimeout(r, 30)); + + expect(processSpy).toHaveBeenCalledTimes(1); // no continuation + }); + + it("does NOT continue a turn the user stopped (queue is preserved)", async () => { + const manager = new AgentManager(); + const processSpy = vi.spyOn(manager, "processMessage"); + // Run that enqueues then aborts itself via stopTab to mimic a user stop. + setRunImpl(async function* () { + yield { type: "status", status: "running" } as const; + manager.queueMessage("tab-stop", "should wait"); + manager.stopTab("tab-stop"); + yield { + type: "done", + message: { role: "assistant", chunks: [] }, + } as const; + }); + + await manager.processMessage("tab-stop", "go"); + await new Promise<void>((r) => setTimeout(r, 30)); + + // Only the original turn ran; the queued message is preserved, unanswered. 
+ expect(processSpy).toHaveBeenCalledTimes(1); + const inner = manager as unknown as { + tabAgents: Map<string, { messageQueue: unknown[] }>; + }; + expect(inner.tabAgents.get("tab-stop")?.messageQueue).toHaveLength(1); + }); + + it("bounds runaway agent<->agent continuation via the auto-wake budget", async () => { + const manager = new AgentManager(); + // A run that ALWAYS enqueues another message → would loop forever + // without the budget cap. + setRunImpl(async function* () { + yield { type: "status", status: "running" } as const; + manager.queueMessage("tab-loop", "again and again"); + yield { + type: "done", + message: { role: "assistant", chunks: [{ type: "text", text: "r" }] }, + } as const; + yield { type: "status", status: "idle" } as const; + }); + const processSpy = vi.spyOn(manager, "processMessage"); + + await manager.processMessage("tab-loop", "kick off"); + await new Promise<void>((r) => setTimeout(r, 120)); + + // 1 original + at most MAX_AGENT_AUTO_WAKES (6) continuations = 7. + // Crucially BOUNDED, not infinite. + expect(processSpy.mock.calls.length).toBeLessThanOrEqual(7); + expect(processSpy.mock.calls.length).toBeGreaterThan(1); + // Budget spent; the last queued message is held, not answered. + const inner = manager as unknown as { + tabAgents: Map<string, { autoWakeBudget: number; messageQueue: unknown[] }>; + }; + expect(inner.tabAgents.get("tab-loop")?.autoWakeBudget).toBe(0); + expect(inner.tabAgents.get("tab-loop")?.messageQueue.length).toBeGreaterThan(0); + }); + }); + describe("getLastTabResponse", () => { it("returns the most recent assistant turn's text and current status", () => { const manager = new AgentManager(); |
