summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--notes/wishlist.md4
-rw-r--r--packages/api/src/agent-manager.ts79
-rw-r--r--packages/api/tests/agent-manager.test.ts125
-rw-r--r--packages/core/src/agent/agent.tsbin57628 -> 57720 bytes
-rw-r--r--packages/core/src/types/index.ts16
-rw-r--r--packages/core/tests/agent/agent.test.ts73
-rw-r--r--packages/frontend/src/lib/tabs.svelte.ts46
-rw-r--r--packages/frontend/src/lib/types.ts16
-rw-r--r--packages/frontend/tests/chat-store.test.ts82
9 files changed, 432 insertions, 9 deletions
diff --git a/notes/wishlist.md b/notes/wishlist.md
index 8c3f5ef..f4ecbeb 100644
--- a/notes/wishlist.md
+++ b/notes/wishlist.md
@@ -19,8 +19,6 @@
- **Track token usage in a tab.** Display token usage (e.g. prompt/completion/total tokens) for the chat within each tab.
-- **Fix queue not being consumed after the AI finishes its turn.** When the AI completes its turn, a queued user message is just attached to the chat without continuing the conversation — the turn ends instead of consuming the queue and generating a response. The queued message should kick off a new turn.
-
- **Compaction tool.** A tool to compact/summarize the conversation history to reduce context size while preserving important information.
- **Make the plus button on tabs always on top and to the left.** The "+" button for creating new tabs is currently mixed in with the scrollable tab list. It should be fixed/absolute positioned at the top-left of the tab bar so it's always visible regardless of horizontal scrolling.
@@ -45,5 +43,3 @@
- **"User agents" — summon counterpart to subagents.** Currently agents can summon subagents which are owned by a parent tab (they appear indented under the parent in the tab bar). Add a "user agent" summon variant that spawns a standard top-level tab owned by the user rather than by another tab. This gives agents the ability to open new independent tabs (like a user would), enabling more complex multi-agent workflows where spawned agents persist as first-class tabs.
- **Fix key switching not migrating context correctly.** When switching API keys (e.g. hitting usage limits on one key and switching to another), the new agent appears to receive only the initial system prompt — all subsequent thinking, tool calls, and conversation history are lost. The full chat context including all turns needs to be properly passed to the new key/model so the conversation continues seamlessly.
-
-- **Worktree workflow skill.** A skill for orchestration agents that enables executing several plans or feature implementations in parallel using git worktrees. Each worktree gets its own isolated checkout of the repo at a given branch/commit, so multiple agents can work independently on different features without stepping on each other. The orchestration agent manages creating worktrees, assigning tasks to sub-agents per worktree, reviewing results, and merging back.
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts
index 517c661..5a0ffdf 100644
--- a/packages/api/src/agent-manager.ts
+++ b/packages/api/src/agent-manager.ts
@@ -1628,6 +1628,75 @@ export class AgentManager {
} else {
tabAgent.completionResolve?.({ status: "error", error: processError });
}
+
+ // The turn has fully settled. If messages piled up on the queue during it
+ // and were NOT injected as a mid-turn interrupt (they arrived after the
+ // last tool call, or this turn had no tool calls), kick off a fresh turn
+ // to answer them instead of letting them sit unanswered — the queue is
+ // consumed, not just appended. Only on a clean finish: a turn the user
+ // explicitly stopped, or one that errored out, leaves its queue intact
+ // for the next deliberate send (see continueFromQueue).
+ if (processError === null) {
+ this.continueFromQueue(tabId);
+ }
+ }
+
+ /**
+ * Start a new turn for any messages that accumulated on `tabId`'s queue
+ * during the turn that just finished. This is what makes a queued message
+ * (from a user OR another agent via send_to_tab) actually get a response
+ * after the agent's current turn ends, rather than waiting forever.
+ *
+ * Loop safety: a queued-then-continued turn draws from the SAME
+ * `autoWakeBudget` that bounds agent-to-agent wakes. Every human-originated
+ * message refills that budget when it is delivered (see deliverMessage), so
+ * human conversations are never throttled; only a runaway agent<->agent
+ * chain (A queues B, B queues A, ...) is capped. When the budget is spent
+ * the messages stay queued and a notice is emitted; the next human message
+ * refills the budget and starts their turn.
+ */
+ private continueFromQueue(tabId: string): void {
+ const tabAgent = this.tabAgents.get(tabId);
+ if (!tabAgent) return;
+ if (tabAgent.messageQueue.length === 0) return;
+ // Never auto-continue a turn the user stopped or one that errored.
+ if (tabAgent.status === "error") return;
+ if (tabAgent.abortController?.signal.aborted) return;
+
+ if (tabAgent.autoWakeBudget <= 0) {
+ // Budget spent — hold the queued messages (don't drop them) until a
+ // human message refills the budget. Prevents unbounded agent loops.
+ const notice =
+ `Automatic continuation limit reached for this tab ` +
+ `(${MAX_AGENT_AUTO_WAKES} consecutive turns). Queued messages are held ` +
+ `until you send a message here.`;
+ this.emit({ type: "notice", message: notice }, tabId);
+ this.routeSystemEventToTab(tabId, "notice", notice);
+ return;
+ }
+ tabAgent.autoWakeBudget -= 1;
+
+ // Drain the queue as a "continuation" so the frontend folds the pending
+ // queued bubbles into this NEW turn's initiating user row (rather than
+ // into a running turn's tool result, which is the "interrupt" case).
+ const drained = this.dequeueMessages(tabId, "continuation");
+ if (drained.length === 0) return;
+ const message = drained.map((m) => m.message).join("\n---\n");
+
+ // Reuse the tab's resolved key/model/fallback chain — the continuation is
+ // the same conversation, just a new turn. Fire-and-forget: if more
+ // messages arrive during it, its own tail will continue the chain.
+ this.processMessage(
+ tabId,
+ message,
+ tabAgent.keyId ?? undefined,
+ tabAgent.modelId ?? undefined,
+ undefined,
+ undefined,
+ tabAgent.agentModels,
+ ).catch((err) => {
+ console.error(`[dispatch] continueFromQueue processMessage error for tab ${tabId}:`, err);
+ });
}
private buildFallbackSequence(
@@ -1673,13 +1742,19 @@ export class AgentManager {
return true;
}
- dequeueMessages(tabId: string): QueuedMessage[] {
+ dequeueMessages(
+ tabId: string,
+ reason: "interrupt" | "continuation" = "interrupt",
+ ): QueuedMessage[] {
const tabAgent = this.tabAgents.get(tabId);
if (!tabAgent) return [];
const messages = [...tabAgent.messageQueue];
tabAgent.messageQueue = [];
if (messages.length > 0) {
- this.emit({ type: "message-consumed", tabId, messageIds: messages.map((m) => m.id) }, tabId);
+ this.emit(
+ { type: "message-consumed", tabId, messageIds: messages.map((m) => m.id), reason },
+ tabId,
+ );
}
return messages;
}
diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts
index 1358eb1..b9b4510 100644
--- a/packages/api/tests/agent-manager.test.ts
+++ b/packages/api/tests/agent-manager.test.ts
@@ -1129,6 +1129,131 @@ describe("AgentManager", () => {
});
});
+ describe("queue continuation after a turn ends", () => {
+ // A run generator that enqueues `msg` (as if a user/agent sent it mid-turn)
+ // exactly once, then streams a normal short reply. Used to simulate a
+ // message landing on the queue while the agent is busy.
+ function runThatEnqueues(manager: AgentManager, tabId: string, msg: string): RunGen {
+ let enqueued = false;
+ return async function* () {
+ yield { type: "status", status: "running" } as const;
+ if (!enqueued) {
+ enqueued = true;
+ manager.queueMessage(tabId, msg);
+ }
+ yield { type: "text-delta", delta: "reply" } as const;
+ yield {
+ type: "done",
+ message: { role: "assistant", chunks: [{ type: "text", text: "reply" }] },
+ } as const;
+ yield { type: "status", status: "idle" } as const;
+ };
+ }
+
+ it("starts a NEW turn for a message queued during the turn (the bug fix)", async () => {
+ const manager = new AgentManager();
+ const processSpy = vi.spyOn(manager, "processMessage");
+ setRunImpl(runThatEnqueues(manager, "tab-cont", "follow-up question"));
+
+ await manager.processMessage("tab-cont", "first");
+ // Let the fire-and-forget continuation turn run to completion.
+ await new Promise<void>((r) => setTimeout(r, 50));
+
+ // processMessage called twice: the original turn + the continuation.
+ expect(processSpy).toHaveBeenCalledTimes(2);
+ expect(processSpy.mock.calls[1]?.[0]).toBe("tab-cont");
+ expect(processSpy.mock.calls[1]?.[1]).toBe("follow-up question");
+
+ // Queue is drained and the tab is idle again.
+ const inner = manager as unknown as {
+ tabAgents: Map<string, { messageQueue: unknown[] }>;
+ };
+ expect(inner.tabAgents.get("tab-cont")?.messageQueue).toHaveLength(0);
+ expect(manager.getTabStatus("tab-cont")).toBe("idle");
+ });
+
+ it('emits message-consumed with reason "continuation" when draining between turns', async () => {
+ const manager = new AgentManager();
+ const events: AgentEvent[] = [];
+ manager.onEvent((e) => events.push(e));
+ setRunImpl(runThatEnqueues(manager, "tab-evt", "next"));
+
+ await manager.processMessage("tab-evt", "first");
+ await new Promise<void>((r) => setTimeout(r, 50));
+
+ const consumed = events.find((e) => e.type === "message-consumed") as
+ | (AgentEvent & { reason?: string })
+ | undefined;
+ expect(consumed).toBeDefined();
+ expect(consumed?.reason).toBe("continuation");
+ });
+
+ it("does NOT continue when the queue is empty after a clean turn", async () => {
+ const manager = new AgentManager();
+ const processSpy = vi.spyOn(manager, "processMessage");
+
+ await manager.processMessage("tab-noqueue", "only message");
+ await new Promise<void>((r) => setTimeout(r, 30));
+
+ expect(processSpy).toHaveBeenCalledTimes(1); // no continuation
+ });
+
+ it("does NOT continue a turn the user stopped (queue is preserved)", async () => {
+ const manager = new AgentManager();
+ const processSpy = vi.spyOn(manager, "processMessage");
+ // Run that enqueues then aborts itself via stopTab to mimic a user stop.
+ setRunImpl(async function* () {
+ yield { type: "status", status: "running" } as const;
+ manager.queueMessage("tab-stop", "should wait");
+ manager.stopTab("tab-stop");
+ yield {
+ type: "done",
+ message: { role: "assistant", chunks: [] },
+ } as const;
+ });
+
+ await manager.processMessage("tab-stop", "go");
+ await new Promise<void>((r) => setTimeout(r, 30));
+
+ // Only the original turn ran; the queued message is preserved, unanswered.
+ expect(processSpy).toHaveBeenCalledTimes(1);
+ const inner = manager as unknown as {
+ tabAgents: Map<string, { messageQueue: unknown[] }>;
+ };
+ expect(inner.tabAgents.get("tab-stop")?.messageQueue).toHaveLength(1);
+ });
+
+ it("bounds runaway agent<->agent continuation via the auto-wake budget", async () => {
+ const manager = new AgentManager();
+ // A run that ALWAYS enqueues another message → would loop forever
+ // without the budget cap.
+ setRunImpl(async function* () {
+ yield { type: "status", status: "running" } as const;
+ manager.queueMessage("tab-loop", "again and again");
+ yield {
+ type: "done",
+ message: { role: "assistant", chunks: [{ type: "text", text: "r" }] },
+ } as const;
+ yield { type: "status", status: "idle" } as const;
+ });
+ const processSpy = vi.spyOn(manager, "processMessage");
+
+ await manager.processMessage("tab-loop", "kick off");
+ await new Promise<void>((r) => setTimeout(r, 120));
+
+ // 1 original + at most MAX_AGENT_AUTO_WAKES (6) continuations = 7.
+ // Crucially BOUNDED, not infinite.
+ expect(processSpy.mock.calls.length).toBeLessThanOrEqual(7);
+ expect(processSpy.mock.calls.length).toBeGreaterThan(1);
+ // Budget spent; the last queued message is held, not answered.
+ const inner = manager as unknown as {
+ tabAgents: Map<string, { autoWakeBudget: number; messageQueue: unknown[] }>;
+ };
+ expect(inner.tabAgents.get("tab-loop")?.autoWakeBudget).toBe(0);
+ expect(inner.tabAgents.get("tab-loop")?.messageQueue.length).toBeGreaterThan(0);
+ });
+ });
+
describe("getLastTabResponse", () => {
it("returns the most recent assistant turn's text and current status", () => {
const manager = new AgentManager();
diff --git a/packages/core/src/agent/agent.ts b/packages/core/src/agent/agent.ts
index c2dfef1..c6f1322 100644
--- a/packages/core/src/agent/agent.ts
+++ b/packages/core/src/agent/agent.ts
Binary files differ
diff --git a/packages/core/src/types/index.ts b/packages/core/src/types/index.ts
index a4230ca..ced3dc2 100644
--- a/packages/core/src/types/index.ts
+++ b/packages/core/src/types/index.ts
@@ -275,7 +275,21 @@ export type AgentEvent =
agentModels?: Array<{ key_id: string; model_id: string }> | null;
}
| { type: "message-queued"; tabId: string; messageId: string; message: string }
- | { type: "message-consumed"; tabId: string; messageIds: string[] }
+ | {
+ type: "message-consumed";
+ tabId: string;
+ messageIds: string[];
+ /**
+ * Why the queue was drained:
+ * - "interrupt": consumed mid-turn, folded into a running turn's tool
+ * result as a [USER INTERRUPT]. The optimistic bubble collapses into
+ * that sealed turn.
+ * - "continuation": consumed between turns to START a new turn. The
+ * optimistic bubble becomes that new turn's initiating user row.
+ * Absent ⇒ treat as "interrupt" (back-compat).
+ */
+ reason?: "interrupt" | "continuation";
+ }
| { type: "message-cancelled"; tabId: string; messageId: string };
// ─── Tool Types ──────────────────────────────────────────────────
diff --git a/packages/core/tests/agent/agent.test.ts b/packages/core/tests/agent/agent.test.ts
index 7560443..188129c 100644
--- a/packages/core/tests/agent/agent.test.ts
+++ b/packages/core/tests/agent/agent.test.ts
@@ -219,6 +219,79 @@ describe("Agent", () => {
});
});
+ it("does NOT swallow trailing queued messages into history at turn end", async () => {
+ // Regression for the "queue not consumed after the turn ends" bug. A
+ // message that lands on the queue after the last tool call (here: a
+ // no-tool turn) must be LEFT on the queue for the orchestrator to start
+ // a new turn — not silently appended to history with no response.
+ vi.mocked(streamText).mockReturnValue(
+ makeMockStreamResult([{ type: "text-delta", id: "t0", text: "done" }, finishStop]),
+ );
+
+ const queue = [{ id: "q1", message: "answer me next", timestamp: 1 }];
+ const dequeueMessages = vi.fn(() => queue.splice(0, queue.length));
+ const agent = new Agent(makeConfig(), {
+ dequeueMessages,
+ waitForQueuedMessage: () => ({ promise: Promise.resolve(), cancel: () => {} }),
+ });
+
+ const before = agent.messages.length;
+ for await (const _ of agent.run("hello")) {
+ // consume
+ }
+
+ // The agent appended exactly the user turn + its own assistant reply;
+ // it did NOT drain the queue or append a trailing user message for it.
+ expect(dequeueMessages).not.toHaveBeenCalled();
+ expect(queue).toHaveLength(1);
+ const added = agent.messages.slice(before);
+ expect(added.map((m) => m.role)).toEqual(["user", "assistant"]);
+ expect(
+ added.some((m) => m.chunks.some((c) => c.type === "text" && c.text === "answer me next")),
+ ).toBe(false);
+ });
+
+ it("still injects a mid-turn queued message into the last tool result", async () => {
+ // The interrupt path (site 1) must be untouched by the turn-end fix: a
+ // message present DURING a tool batch is folded into that batch's last
+ // tool result as a [USER INTERRUPT], and the agent loops back to the LLM.
+ vi.mocked(streamText)
+ .mockReturnValueOnce(
+ makeMockStreamResult([
+ { type: "tool-call", toolCallId: "tc1", toolName: "read_file", input: { path: "a.txt" } },
+ finishToolCalls,
+ ]),
+ )
+ .mockReturnValueOnce(
+ makeMockStreamResult([{ type: "text-delta", id: "t0", text: "ok" }, finishStop]),
+ );
+
+ const queue = [{ id: "q1", message: "stop and do X", timestamp: 1 }];
+ const dequeueMessages = vi.fn(() => queue.splice(0, queue.length));
+ const toolDef = {
+ name: "read_file",
+ description: "reads a file",
+ parameters: z.object({ path: z.string() }),
+ execute: async () => "file contents",
+ };
+ const agent = new Agent(makeConfig({ tools: [toolDef] }), {
+ dequeueMessages,
+ waitForQueuedMessage: () => ({ promise: Promise.resolve(), cancel: () => {} }),
+ });
+
+ const events: AgentEvent[] = [];
+ for await (const event of agent.run("read it")) {
+ events.push(event);
+ }
+
+ expect(dequeueMessages).toHaveBeenCalled();
+ const toolResult = events.find((e) => e.type === "tool-result") as
+ | (AgentEvent & { toolResult: { result: string } })
+ | undefined;
+ expect(toolResult?.toolResult.result).toContain("[USER INTERRUPT]");
+ expect(toolResult?.toolResult.result).toContain("stop and do X");
+ });
+
it("yields reasoning-delta events", async () => {
vi.mocked(streamText).mockReturnValue(
makeMockStreamResult([
diff --git a/packages/frontend/src/lib/tabs.svelte.ts b/packages/frontend/src/lib/tabs.svelte.ts
index 317de8d..3fd7e5f 100644
--- a/packages/frontend/src/lib/tabs.svelte.ts
+++ b/packages/frontend/src/lib/tabs.svelte.ts
@@ -1223,7 +1223,11 @@ export function createTabStore() {
}
case "message-consumed": {
if (!tabId) break;
- const mcEvent = event as AgentEvent & { tabId: string; messageIds: string[] };
+ const mcEvent = event as AgentEvent & {
+ tabId: string;
+ messageIds: string[];
+ reason?: "interrupt" | "continuation";
+ };
const mcTab = getTabById(tabId);
if (!mcTab) break;
// Track recently consumed IDs so sendMessage can detect early consumption
@@ -1234,6 +1238,46 @@ export function createTabStore() {
updateTab(tabId, {
queuedMessages: mcTab.queuedMessages.filter((m) => !mcEvent.messageIds.includes(m.id)),
});
+
+ // "continuation" — these queued messages are draining BETWEEN turns
+ // to START a fresh turn (the "queue consumed after turn ends" path),
+ // not folding into a running turn's tool result. The backend joins
+ // them into ONE initiating user row, so we collapse the matching
+ // optimistic `queued-` bubbles into a single UNTAGGED user row. It
+ // stays untagged on purpose: the imminent `turn-start` tags it as
+ // this new turn's initiator (exactly like a normal send), and
+ // reconcile then folds it into the sealed turn. Leaving N separate
+ // untagged rows would strand all but the most-recent one (turn-start
+ // only tags one), so collapsing is required.
+ if (mcEvent.reason === "continuation") {
+ updateLive(tabId, (msgs) => {
+ const consumedTexts: string[] = [];
+ const rest: ChatMessage[] = [];
+ let firstConsumedIdx = -1;
+ for (const m of msgs) {
+ if (m.role === "user" && m.id.startsWith("queued-")) {
+ const queuedId = m.id.slice(7);
+ if (mcEvent.messageIds.includes(queuedId)) {
+ if (firstConsumedIdx === -1) firstConsumedIdx = rest.length;
+ const textChunk = m.chunks.find((c) => c.type === "text");
+ consumedTexts.push(textChunk && textChunk.type === "text" ? textChunk.text : "");
+ continue;
+ }
+ }
+ rest.push(m);
+ }
+ if (consumedTexts.length === 0) return msgs;
+ const initiator: ChatMessage = {
+ id: generateId(),
+ role: "user",
+ chunks: [{ type: "text", text: consumedTexts.join("\n---\n") }],
+ };
+ rest.splice(firstConsumedIdx === -1 ? rest.length : firstConsumedIdx, 0, initiator);
+ return rest;
+ });
+ break;
+ }
+
// Split the current assistant message: finalize it, then insert
// the consumed user messages after it. Subsequent streaming events
// will create a NEW assistant message block below.
diff --git a/packages/frontend/src/lib/types.ts b/packages/frontend/src/lib/types.ts
index bede2cc..285b4d2 100644
--- a/packages/frontend/src/lib/types.ts
+++ b/packages/frontend/src/lib/types.ts
@@ -198,7 +198,21 @@ export type AgentEvent =
agentModels?: Array<{ key_id: string; model_id: string }> | null;
}
| { type: "message-queued"; tabId: string; messageId: string; message: string }
- | { type: "message-consumed"; tabId: string; messageIds: string[] }
+ | {
+ type: "message-consumed";
+ tabId: string;
+ messageIds: string[];
+ /**
+ * Why the queue was drained:
+ * - "interrupt": consumed mid-turn, folded into a running turn's tool
+ * result as a [USER INTERRUPT]. The optimistic bubble collapses into
+ * that sealed turn.
+ * - "continuation": consumed between turns to START a new turn. The
+ * optimistic bubble becomes that new turn's initiating user row.
+ * Absent ⇒ treat as "interrupt" (back-compat).
+ */
+ reason?: "interrupt" | "continuation";
+ }
| { type: "message-cancelled"; tabId: string; messageId: string };
export interface TaskItem {
diff --git a/packages/frontend/tests/chat-store.test.ts b/packages/frontend/tests/chat-store.test.ts
index b9d37f2..ea718eb 100644
--- a/packages/frontend/tests/chat-store.test.ts
+++ b/packages/frontend/tests/chat-store.test.ts
@@ -1460,6 +1460,88 @@ describe("tabStore — chunk-native eviction / pagination / reconcile", () => {
expect(tab?.chunks.map((c) => c.seq)).toEqual([0, 1]);
});
+ it("a continuation-consumed queued message becomes the next turn's initiator", async () => {
+ // The turn-end fix: a message queued during turn-a is drained AFTER the
+ // turn ends (reason: "continuation") to START turn-b. Its optimistic
+ // `queued-` bubble must collapse into a single UNTAGGED user row so the
+ // imminent turn-b `turn-start` tags it as that turn's initiator — and it
+ // then folds cleanly into turn-b's sealed chunks (no linger, no dup).
+ const sealedB = [
+ chunkRow("ua", "cc", 0, "turn-a", "user", "text", { text: "first" }),
+ chunkRow("aa", "cc", 1, "turn-a", "assistant", "text", { text: "first answer" }),
+ chunkRow("ub", "cc", 2, "turn-b", "user", "text", { text: "next please" }),
+ chunkRow("ab", "cc", 3, "turn-b", "assistant", "text", { text: "second answer" }),
+ ];
+ vi.stubGlobal(
+ "fetch",
+ vi.fn((url: string) => {
+ if (url.split("?")[0]?.endsWith("/tabs/cc/chunks"))
+ return Promise.resolve(chunksResponse(sealedB, 4));
+ return Promise.reject(new Error(`unexpected ${url}`));
+ }),
+ );
+ const store = createTabStore();
+ store.handleEvent({
+ type: "tab-created",
+ id: "cc",
+ title: "CC",
+ keyId: null,
+ modelId: null,
+ parentTabId: null,
+ });
+ // Turn A streams; user queues a follow-up while it runs.
+ store.handleEvent({ type: "turn-start", turnId: "turn-a", tabId: "cc" });
+ store.handleEvent({ type: "text-delta", delta: "first answer", tabId: "cc" });
+ store.handleEvent({
+ type: "message-queued",
+ tabId: "cc",
+ messageId: "q1",
+ message: "next please",
+ });
+ let tab = store.tabs.find((t) => t.id === "cc");
+ expect(tab?.live.some((m) => m.id === "queued-q1")).toBe(true);
+
+ // Turn A ends. The backend drains the queue as a CONTINUATION (not an
+ // interrupt) and emits message-consumed{reason:"continuation"}.
+ store.handleEvent({
+ type: "message-consumed",
+ tabId: "cc",
+ messageIds: ["q1"],
+ reason: "continuation",
+ });
+ tab = store.tabs.find((t) => t.id === "cc");
+ // The queued- bubble collapsed into ONE plain (untagged, un-prefixed) user row.
+ expect(tab?.live.some((m) => m.id === "queued-q1")).toBe(false);
+ const initiator = tab?.live.find((m) => m.role === "user");
+ expect(initiator).toBeTruthy();
+ expect(initiator?.id.startsWith("queued-")).toBe(false);
+ expect(initiator?.turnId).toBeUndefined();
+ expect(tab?.queuedMessages.some((m) => m.id === "q1")).toBe(false);
+
+ // turn-a seals first (it was the running turn when the queue drained).
+ store.handleEvent({ type: "turn-sealed", turnId: "turn-a", tabId: "cc" });
+ // Now turn-b starts — it must tag the collapsed initiator row.
+ store.handleEvent({ type: "turn-start", turnId: "turn-b", tabId: "cc" });
+ tab = store.tabs.find((t) => t.id === "cc");
+ const taggedInitiator = tab?.live.find((m) => m.role === "user" && m.turnId === "turn-b");
+ expect(taggedInitiator).toBeTruthy();
+
+ store.handleEvent({ type: "text-delta", delta: "second answer", tabId: "cc" });
+ store.handleEvent({ type: "turn-sealed", turnId: "turn-b", tabId: "cc" });
+ await tick();
+ tab = store.tabs.find((t) => t.id === "cc");
+ // Both turns are durable; the live tail is empty (initiator folded into
+ // turn-b, no lingering/duplicated user bubble).
+ expect(tab?.chunks.map((c) => c.seq)).toEqual([0, 1, 2, 3]);
+ expect(tab?.live.length).toBe(0);
+ expect(tab?.renderGroups.map((m) => m.role)).toEqual([
+ "user",
+ "assistant",
+ "user",
+ "assistant",
+ ]);
+ });
+
it("preserves a concurrent newer turn when an earlier deferred reconcile flushes", async () => {
const sealedA = [
chunkRow("ua", "c", 0, "turn-a", "user", "text", { text: "A?" }),