summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-05-30 23:14:55 +0900
committerAdam Malczewski <[email protected]>2026-05-30 23:14:55 +0900
commit624b808da0f2f8bbad8a4fbbcca3f82f24ecfc47 (patch)
tree869d34092345344ff13953398f876c8b38c8116a
parentb19f1aafc43141a865ecd40a813ed3212e77d95e (diff)
downloaddispatch-624b808da0f2f8bbad8a4fbbcca3f82f24ecfc47.tar.gz
dispatch-624b808da0f2f8bbad8a4fbbcca3f82f24ecfc47.zip
feat(chunks): chunk-native frontend store with turn-sealed reconcile + per-chunk eviction
Replace the stored ChatMessage[] with a chunk-native model: tab.chunks (sealed ChunkRow[]) + tab.live (transient in-flight turn buffer) + derived tab.renderGroups. This enables per-chunk eviction (trimming WITHIN a large turn) and raw-chunk pagination (loadOlderChunks), removing the whole-message eviction limitation. Backend: - Emit turn-start/turn-sealed around each turn; expose currentTurnId in the status snapshot. turn-sealed fires after the durable write (status:idle fires before it). - New GET /tabs/:id/chunks raw paginated endpoint (limit/before). - Wrap appendChunks in a single SQLite transaction. Frontend: - turn-sealed drives a turn-aware reconcile that folds the sealed turn into chunks while preserving a concurrent newer in-flight turn and pending queued messages; deferred while the user is scrolled up. - Stable turn-scoped render keys (${turnId}:${role}:${n}) avoid remount/flash. Reconcile correctness (three review passes): - preserve a concurrent newer turn when an earlier deferred reconcile flushes; - keep optimistic queued user messages (no loss); - turn-start backfill skips pending queued rows and tags only the turn initiator; - bind consumed interrupt messages to the in-flight turn so they collapse on seal (no lingering/duplicated bubble). Tests: chat-store reconcile/eviction/pagination suite; api chunks endpoint + events.
-rw-r--r--packages/api/src/agent-manager.ts13
-rw-r--r--packages/api/src/routes/tabs.ts24
-rw-r--r--packages/api/tests/agent-manager.test.ts32
-rw-r--r--packages/api/tests/routes.test.ts13
-rw-r--r--packages/core/src/chunks/append.ts13
-rw-r--r--packages/core/src/db/chunks.ts58
-rw-r--r--packages/core/src/types/index.ts26
-rw-r--r--packages/frontend/src/lib/components/ChatPanel.svelte33
-rw-r--r--packages/frontend/src/lib/tabs.svelte.ts811
-rw-r--r--packages/frontend/src/lib/types.ts20
-rw-r--r--packages/frontend/tests/chat-store.test.ts483
11 files changed, 1099 insertions, 427 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts
index f7975d1..5f2027d 100644
--- a/packages/api/src/agent-manager.ts
+++ b/packages/api/src/agent-manager.ts
@@ -799,6 +799,9 @@ export class AgentManager {
if (tabAgent.currentAssistantId) {
snap.currentAssistantId = tabAgent.currentAssistantId;
}
+ if (tabAgent.currentTurnId) {
+ snap.currentTurnId = tabAgent.currentTurnId;
+ }
}
result[tabId] = snap;
}
@@ -1119,6 +1122,10 @@ export class AgentManager {
// chunk rows — shares one `turn_id`.
const turnId = crypto.randomUUID();
tabAgent.currentTurnId = turnId;
+ // Announce the turn so the frontend can tag its live chunks with this
+ // turn_id (stable render keys → flicker-free reconcile when the turn
+ // seals). Emitted before any content delta.
+ this.emit({ type: "turn-start", turnId }, tabId);
appendChunks(tabId, explodeUserText(turnId, message));
// Store agent models on the tab if provided (defines fallback order)
@@ -1284,6 +1291,12 @@ export class AgentManager {
this.emit({ type: "status", status: "error" }, tabId);
break;
}
+ // Turn fully settled and its chunks are now persisted (flushAssistant ran
+ // above). Signal the frontend that the turn's rows — with real seqs — are
+ // durable so it can fold its live representation into the sealed log.
+ // Emitted AFTER status:idle/error (which fire before the DB write).
+ this.emit({ type: "turn-sealed", turnId }, tabId);
+
// Turn fully settled — clear the shared turn id.
tabAgent.currentTurnId = null;
diff --git a/packages/api/src/routes/tabs.ts b/packages/api/src/routes/tabs.ts
index e9265ec..b1e9659 100644
--- a/packages/api/src/routes/tabs.ts
+++ b/packages/api/src/routes/tabs.ts
@@ -93,6 +93,30 @@ tabsRoutes.get("/:id/messages", (c) => {
return c.json({ messages, total, oldestSeq });
});
+// Raw chunk window for a tab — the chunk-native frontend's load/paginate
+// source. Same `limit`/`before` chunk-`seq` windowing as `/messages`, but
+// returns the flat `ChunkRow[]` WITHOUT server-side grouping (the frontend
+// groups for render and evicts/paginates on the flat list). Dedupe on the
+// client by `seq` when overlap-fetching.
+tabsRoutes.get("/:id/chunks", (c) => {
+ const id = c.req.param("id");
+ const limitRaw = c.req.query("limit");
+ const beforeRaw = c.req.query("before");
+ const limit = limitRaw !== undefined ? Number(limitRaw) : undefined;
+ const before = beforeRaw !== undefined ? Number(beforeRaw) : undefined;
+ const options =
+ limit !== undefined || before !== undefined
+ ? {
+ ...(limit !== undefined && Number.isFinite(limit) ? { limit } : {}),
+ ...(before !== undefined && Number.isFinite(before) ? { before } : {}),
+ }
+ : undefined;
+ const chunks = getChunksForTab(id, options);
+ const oldestSeq = chunks.length > 0 ? (chunks[0]?.seq ?? null) : null;
+ const total = getTotalChunkCount(id);
+ return c.json({ chunks, total, oldestSeq });
+});
+
tabsRoutes.patch("/:id", async (c) => {
const id = c.req.param("id");
const body = await c.req.json<{
diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts
index 6b016db..4415bbb 100644
--- a/packages/api/tests/agent-manager.test.ts
+++ b/packages/api/tests/agent-manager.test.ts
@@ -340,15 +340,43 @@ describe("AgentManager", () => {
await manager.processMessage("tab-1", "test");
expect(events.length).toBeGreaterThan(0);
- expect(events[0]).toMatchObject({ type: "status", status: "running" });
+ // A turn now opens with `turn-start`, immediately followed by the
+ // agent's `status: running`.
+ expect(events[0]).toMatchObject({ type: "turn-start" });
+ expect(events[1]).toMatchObject({ type: "status", status: "running" });
+ // A turn now closes with `turn-sealed` (emitted after the DB write, which
+ // is after the agent's final `status: idle`).
const lastEvent = events[events.length - 1];
- expect(lastEvent).toMatchObject({ type: "status", status: "idle" });
+ expect(lastEvent).toMatchObject({ type: "turn-sealed" });
+ expect(events.some((e) => e.type === "status" && e.status === "idle")).toBe(true);
const doneEvent = events.find((e) => e.type === "done");
expect(doneEvent).toBeDefined();
});
+ it("emits a turn-start with a turnId before any content event", async () => {
+ const manager = new AgentManager();
+ const events: AgentEvent[] = [];
+ manager.onEvent((event) => {
+ events.push(event);
+ });
+
+ await manager.processMessage("tab-turnstart", "go");
+
+ const turnStartIdx = events.findIndex((e) => e.type === "turn-start");
+ expect(turnStartIdx).toBeGreaterThanOrEqual(0);
+ const turnStart = events[turnStartIdx] as Extract<AgentEvent, { type: "turn-start" }>;
+ expect(typeof turnStart.turnId).toBe("string");
+ expect(turnStart.turnId.length).toBeGreaterThan(0);
+
+ // Must precede the first content delta.
+ const firstContentIdx = events.findIndex(
+ (e) => e.type === "text-delta" || e.type === "reasoning-delta",
+ );
+ expect(firstContentIdx).toBeGreaterThan(turnStartIdx);
+ });
+
it("emits text-delta events during processMessage", async () => {
const manager = new AgentManager();
const events: AgentEvent[] = [];
diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts
index f4de845..4b8dd40 100644
--- a/packages/api/tests/routes.test.ts
+++ b/packages/api/tests/routes.test.ts
@@ -335,6 +335,19 @@ describe("POST /chat", () => {
});
});
+describe("GET /tabs/:id/chunks", () => {
+ it("returns the raw chunk window shape { chunks, total, oldestSeq }", async () => {
+ const res = await app.request("/tabs/tab-x/chunks?limit=50");
+ expect(res.status).toBe(200);
+ const body = await res.json();
+ // Mocked getChunksForTab returns [] → empty window, null cursor.
+ expect(Array.isArray(body.chunks)).toBe(true);
+ expect(body.chunks).toEqual([]);
+ expect(body.total).toBe(0);
+ expect(body.oldestSeq).toBeNull();
+ });
+});
+
describe("POST /chat/stop", () => {
it("returns 200 with success: true for valid tabId", async () => {
const res = await app.request("/chat/stop", {
diff --git a/packages/core/src/chunks/append.ts b/packages/core/src/chunks/append.ts
index baccd10..4bc6b5f 100644
--- a/packages/core/src/chunks/append.ts
+++ b/packages/core/src/chunks/append.ts
@@ -6,7 +6,7 @@
* backend (agent + agent-manager) and the frontend store call this helper
* so the wire format stays in lockstep across the boundary.
*
- * Open/close rules — see plan-chunk-refactor.md for the full table.
+ * Open/close rules — see notes/plan-chunk-refactor.md for the full table.
*
* | Chunk | Opens on | Coalesces |
* |---------------|-----------------------------------------------------------------|------------------------------------------------------------|
@@ -35,9 +35,10 @@
* (no unsealed thinking chunk) are dropped.
*
* Ignored events:
- * - `status`, `done`, `usage`, `task-list-update`, `tab-created`,
- * `message-queued`, `message-consumed`, `message-cancelled` — these are
- * control / lifecycle events, not message content.
+ * - `status`, `turn-start`, `turn-sealed`, `done`, `usage`,
+ * `task-list-update`, `tab-created`, `message-queued`, `message-consumed`,
+ * `message-cancelled` — these are control / lifecycle events, not message
+ * content.
*/
import type {
@@ -200,6 +201,8 @@ export function appendEventToChunks(chunks: Chunk[], event: AgentEvent): void {
// Lifecycle / control events — no chunk emitted.
case "status":
+ case "turn-start":
+ case "turn-sealed":
case "done":
case "usage":
case "task-list-update":
@@ -251,7 +254,7 @@ export interface SystemEventLike {
* in flight*. (When a turn IS in flight, the caller should instead use
* `appendEventToChunks` against the in-flight message's chunks directly.)
*
- * Routing rules (from plan-chunk-refactor.md):
+ * Routing rules (from notes/plan-chunk-refactor.md):
*
* 1. Most recent message is `role: "system"` → append a `system` chunk
* to it. (Note: a second consecutive system event creates a second
diff --git a/packages/core/src/db/chunks.ts b/packages/core/src/db/chunks.ts
index 6841eb5..077259d 100644
--- a/packages/core/src/db/chunks.ts
+++ b/packages/core/src/db/chunks.ts
@@ -52,32 +52,38 @@ export function appendChunks(tabId: string, drafts: ChunkRowDraft[]): ChunkRow[]
VALUES ($id, $tabId, $seq, $turnId, $step, $role, $type, $dataJson, $now)`,
);
const out: ChunkRow[] = [];
- for (const draft of drafts) {
- const id = randomUUID();
- insert.run({
- $id: id,
- $tabId: tabId,
- $seq: seq,
- $turnId: draft.turnId,
- $step: draft.step,
- $role: draft.role,
- $type: draft.type,
- $dataJson: JSON.stringify(draft.data),
- $now: now,
- });
- out.push({
- id,
- tabId,
- seq,
- turnId: draft.turnId,
- step: draft.step,
- role: draft.role,
- type: draft.type,
- data: draft.data,
- createdAt: now,
- });
- seq++;
- }
+ // Wrap the whole batch in one transaction: a turn's chunks are persisted in
+ // a single `appendChunks` call, so this is one fsync per turn instead of one
+ // per row — the chosen low-IO write strategy for constrained backends.
+ const insertAll = db.transaction(() => {
+ for (const draft of drafts) {
+ const id = randomUUID();
+ insert.run({
+ $id: id,
+ $tabId: tabId,
+ $seq: seq,
+ $turnId: draft.turnId,
+ $step: draft.step,
+ $role: draft.role,
+ $type: draft.type,
+ $dataJson: JSON.stringify(draft.data),
+ $now: now,
+ });
+ out.push({
+ id,
+ tabId,
+ seq,
+ turnId: draft.turnId,
+ step: draft.step,
+ role: draft.role,
+ type: draft.type,
+ data: draft.data,
+ createdAt: now,
+ });
+ seq++;
+ }
+ });
+ insertAll();
return out;
}
diff --git a/packages/core/src/types/index.ts b/packages/core/src/types/index.ts
index 3fcdb40..a4230ca 100644
--- a/packages/core/src/types/index.ts
+++ b/packages/core/src/types/index.ts
@@ -10,7 +10,7 @@ export type MessageRole = "user" | "assistant" | "system";
* preserves the actual temporal ordering of text, reasoning, tool calls,
* system notices, and errors as they arrived from the model.
*
- * Coalescing rules (see plan-chunk-refactor.md):
+ * Coalescing rules (see notes/plan-chunk-refactor.md):
* - `text` and `thinking` coalesce on consecutive same-type deltas.
* - `tool-batch` coalesces on consecutive `tool-call` events
* (appends a new entry to `calls`).
@@ -199,10 +199,34 @@ export interface TabStatusSnapshot {
status: AgentStatus;
currentChunks?: Chunk[];
currentAssistantId?: string;
+ /**
+ * `turn_id` of the in-flight turn. Present iff `status === "running"`.
+ * Lets a frontend that reconnects mid-stream key its live chunks the same
+ * way `turn-start` would, so they reconcile cleanly when the turn seals.
+ */
+ currentTurnId?: string;
}
export type AgentEvent =
| { type: "status"; status: AgentStatus }
+ /**
+ * Emitted once at the start of a turn (`processMessage`), before any
+ * content deltas. Carries the `turn_id` shared by this turn's user message
+ * and every assistant/tool chunk row. The frontend tags its in-flight
+ * (live) chunks with this id so they key-match the sealed rows on
+ * turn-completion reconcile (no remount/flicker). Display/sync only — not
+ * conversation content.
+ */
+ | { type: "turn-start"; turnId: string }
+ /**
+ * Emitted once after a turn has fully settled AND its chunks have been
+ * persisted (after `flushAssistant`). Signals the frontend that the turn's
+ * rows — with real `seq`s — are now durable and can be reloaded, so it can
+ * fold its transient live representation into the sealed chunk log. Emitted
+ * after `status: idle`/`error` (which fire before the DB write). Display/sync
+ * only — not conversation content.
+ */
+ | { type: "turn-sealed"; turnId: string }
| { type: "text-delta"; delta: string }
| { type: "reasoning-delta"; delta: string }
/**
diff --git a/packages/frontend/src/lib/components/ChatPanel.svelte b/packages/frontend/src/lib/components/ChatPanel.svelte
index 8170d43..abdec17 100644
--- a/packages/frontend/src/lib/components/ChatPanel.svelte
+++ b/packages/frontend/src/lib/components/ChatPanel.svelte
@@ -8,9 +8,26 @@ let userScrolledUp = $state(false);
let isAutoScrolling = false;
let isLoadingMore = $state(false);
-const messages = $derived(tabStore.activeTab?.messages ?? []);
+const renderGroups = $derived(tabStore.activeTab?.renderGroups ?? []);
const activeTabId = $derived(tabStore.activeTab?.id);
+// Stable, turn-scoped render keys. A bubble's identity is `${turnId}:${role}:${n}`
+// (n = its index among same-(turn,role) messages) rather than the underlying
+// row/client id, so when the live turn reconciles into sealed chunk rows the
+// bubble keeps its identity and does NOT remount (no flash). Falls back to the
+// message id for anything without a turnId (e.g. optimistic/queued messages
+// before turn-start, standalone system notices).
+const keyedMessages = $derived.by(() => {
+ const counts = new Map<string, number>();
+ return renderGroups.map((m) => {
+ if (!m.turnId) return { m, key: m.id };
+ const base = `${m.turnId}:${m.role}`;
+ const n = counts.get(base) ?? 0;
+ counts.set(base, n + 1);
+ return { m, key: `${base}:${n}` };
+ });
+});
+
function isNearBottom(el: HTMLElement): boolean {
return el.scrollHeight - el.scrollTop - el.clientHeight < 64;
}
@@ -35,7 +52,7 @@ async function onNearTop() {
const prevScrollHeight = messagesEl?.scrollHeight ?? 0;
const prevScrollTop = messagesEl?.scrollTop ?? 0;
try {
- await tabStore.loadMoreMessages(tab.id);
+ await tabStore.loadOlderChunks(tab.id);
// Wait for Svelte to flush the prepended messages into the DOM.
// Reading `scrollHeight` synchronously after the await would observe
// the OLD layout (reactive updates are batched), so the scroll
@@ -66,7 +83,7 @@ function handleScroll() {
if (activeTabId) tabStore.setScrolledUp(activeTabId, userScrolledUp);
// User just scrolled back to the bottom manually — safe to evict now.
if (wasScrolledUp && !userScrolledUp && activeTabId) {
- tabStore.evictMessages(activeTabId);
+ tabStore.evictChunks(activeTabId);
}
// Near the top — pull in older history.
if (userScrolledUp && messagesEl.scrollTop < 200) {
@@ -79,13 +96,13 @@ function resumeAutoScroll() {
isAutoScrolling = true;
if (activeTabId) {
tabStore.setScrolledUp(activeTabId, false);
- tabStore.evictMessages(activeTabId);
+ tabStore.evictChunks(activeTabId);
}
scrollToBottom(true);
}
$effect(() => {
- const count = messages.length;
+ const count = renderGroups.length;
void count;
if (messagesEl) {
untrack(() => {
@@ -121,13 +138,13 @@ $effect(() => {
{#if isLoadingMore}
<div class="text-center text-xs text-base-content/40 py-2">Loading earlier messages...</div>
{/if}
- {#if messages.length === 0}
+ {#if renderGroups.length === 0}
<div class="flex items-center justify-center h-full text-base-content/40 text-sm">
Send a message to start a conversation
</div>
{/if}
- {#each messages as message (message.id)}
- <ChatMessageComponent {message} tabId={activeTabId} />
+ {#each keyedMessages as { m, key } (key)}
+ <ChatMessageComponent message={m} tabId={activeTabId} />
{/each}
</div>
diff --git a/packages/frontend/src/lib/tabs.svelte.ts b/packages/frontend/src/lib/tabs.svelte.ts
index c57f800..e303c80 100644
--- a/packages/frontend/src/lib/tabs.svelte.ts
+++ b/packages/frontend/src/lib/tabs.svelte.ts
@@ -8,6 +8,10 @@ import {
type IdentifiedMessage,
type SystemEventLike,
} from "@dispatch/core/src/chunks/append.js";
+// DB-free; safe in the browser bundle. The flat chunk log is the frontend's
+// source of truth for HISTORY; `groupRowsToMessages` derives render bubbles.
+import { groupRowsToMessages, type MessageRow } from "@dispatch/core/src/chunks/transform.js";
+import type { ChunkRow } from "@dispatch/core/src/types/index.js";
import { config } from "./config.js";
import { appSettings } from "./settings.svelte.js";
import type {
@@ -35,34 +39,118 @@ function generateId(): string {
});
}
+function makeDebugInfo(overrides: Partial<DebugInfo> = {}): DebugInfo {
+ return {
+ timestamp: new Date().toISOString(),
+ connectionStatus: wsClient.connectionStatus,
+ ...overrides,
+ };
+}
+
+// ─── Chunk-log → render projection ───────────────────────────────
+//
+// History lives as a flat `ChunkRow[]` (sealed, real seq). For rendering we
+// group it into bubbles with `groupRowsToMessages` (pairs tool_call+tool_result
+// by callId, wraps a turn's assistant chunks) — a pure, ephemeral view, never
+// stored as the source of truth.
+
+/** Map a grouped chunk-row message to a render `ChatMessage`. */
+function rowGroupToMessage(m: MessageRow): ChatMessage {
+ return {
+ id: m.id,
+ role: m.role,
+ chunks: m.chunks,
+ isStreaming: false,
+ seq: m.seq,
+ turnId: m.turnId,
+ };
+}
+
/**
- * Extract the smallest `seq` from a list of raw API message rows. The backend
- * tags each persisted message with a monotonic `seq`; we track the oldest one
- * currently loaded so `loadMoreMessages` can page backwards via `?before=seq`.
- * Returns null when no row carries a usable seq.
+ * The render view for a tab: grouped sealed chunks followed by the transient
+ * live tail (current unsealed turn). This is what the chat panel renders.
*/
-function oldestSeqOf(messages: Array<{ seq?: number }>): number | null {
+function deriveRenderGroups(chunks: ChunkRow[], live: ChatMessage[]): ChatMessage[] {
+ const sealed = groupRowsToMessages(chunks).map(rowGroupToMessage);
+ return live.length > 0 ? [...sealed, ...live] : sealed;
+}
+
+/** Total chunk count of the live tail (for the eviction budget). */
+function countLiveChunks(live: ChatMessage[]): number {
+ return live.reduce((sum, m) => sum + m.chunks.length, 0);
+}
+
+/** Smallest `seq` among sealed chunk rows, or null when empty. */
+function minSeqOf(chunks: ChunkRow[]): number | null {
let min: number | null = null;
- for (const m of messages) {
- if (typeof m.seq === "number" && (min === null || m.seq < min)) {
- min = m.seq;
- }
+ for (const c of chunks) {
+ if (typeof c.seq === "number" && (min === null || c.seq < min)) min = c.seq;
}
return min;
}
-function makeDebugInfo(overrides: Partial<DebugInfo> = {}): DebugInfo {
- return {
- timestamp: new Date().toISOString(),
- connectionStatus: wsClient.connectionStatus,
- ...overrides,
- };
+/** Merge older chunk rows into a window, dedupe by `seq`, keep ascending. */
+function mergeChunksBySeq(existing: ChunkRow[], incoming: ChunkRow[]): ChunkRow[] {
+ const bySeq = new Map<number, ChunkRow>();
+ for (const c of existing) bySeq.set(c.seq, c);
+ for (const c of incoming) bySeq.set(c.seq, c);
+ return [...bySeq.values()].sort((a, b) => a.seq - b.seq);
+}
+
+/** Fetch a raw chunk window from the backend (the chunk-native load source). */
+async function fetchChunkWindow(
+ tabId: string,
+ params: { limit?: number; before?: number } = {},
+): Promise<{ ok: boolean; chunks: ChunkRow[]; total: number; oldestSeq: number | null }> {
+ const qs = new URLSearchParams();
+ if (params.limit !== undefined) qs.set("limit", String(params.limit));
+ if (params.before !== undefined) qs.set("before", String(params.before));
+ const q = qs.toString();
+ try {
+ const res = await fetch(`${config.apiBase}/tabs/${tabId}/chunks${q ? `?${q}` : ""}`);
+ if (!res.ok) return { ok: false, chunks: [], total: 0, oldestSeq: null };
+ const data = (await res.json()) as {
+ chunks?: ChunkRow[];
+ total?: number;
+ oldestSeq?: number | null;
+ };
+ const chunks = Array.isArray(data.chunks) ? data.chunks : [];
+ return {
+ ok: true,
+ chunks,
+ total: data.total ?? chunks.length,
+ oldestSeq: data.oldestSeq ?? minSeqOf(chunks),
+ };
+ } catch {
+ return { ok: false, chunks: [], total: 0, oldestSeq: null };
+ }
}
export interface Tab {
id: string;
title: string;
- messages: ChatMessage[];
+ /**
+ * SEALED conversation history as a flat chunk log (real per-tab `seq`).
+ * The source of truth for history and the unit of eviction + pagination.
+ */
+ chunks: ChunkRow[];
+ /**
+ * Transient render buffer for the CURRENT (unsealed) turn only: the
+ * optimistic user message, the in-flight assistant turn (folded from
+ * stream deltas), queued/consumed user messages, interrupt splits. Tiny and
+ * short-lived — cleared and folded into `chunks` (via refetch) the moment
+ * the turn seals. NOT stored history.
+ */
+ live: ChatMessage[];
+ /**
+ * Materialized render projection = groupRowsToMessages(chunks) ++ live,
+ * recomputed by `updateTab` after any change to `chunks`/`live`. A derived
+ * cache for the view layer — NOT the source of truth, never the
+ * eviction/pagination unit.
+ */
+ renderGroups: ChatMessage[];
+ /** turn_id of the in-flight turn (stable render keys + reconcile). */
+ liveTurnId: string | null;
agentStatus: "idle" | "running" | "error";
keyId: string | null;
modelId: string | null;
@@ -70,26 +158,18 @@ export interface Tab {
currentAssistantId: string | null;
tasks: TaskItem[];
injectedSkills: string[];
- /** null = user-owned tab, string = spawned by that tab */
parentTabId: string | null;
- /** Persistent tabs stay until manually closed. Temp tabs disappear when agent finishes. */
persistent: boolean;
- /** Slug of the selected agent, or null for manual mode */
agentSlug: string | null;
- /** Scope of the selected agent */
agentScope: string | null;
- /** Ordered key+model fallback hierarchy from the selected agent */
agentModels: Array<{ key_id: string; model_id: string }> | null;
- /** Custom working directory override for this tab */
workingDirectory: string | null;
- /** Messages queued to be sent once the agent finishes its current run */
queuedMessages: QueuedMessage[];
- /** Max chunks to keep in memory before evicting oldest messages */
chunkLimit: number;
- /** Seq of the oldest message currently loaded, or null if unknown/none */
+ /** Smallest `seq` currently in `chunks` — the backward-pagination cursor. */
oldestLoadedSeq: number | null;
- /** Total number of messages for this tab on the backend */
- totalMessages: number;
+ /** Total chunk count for this tab on the backend (drives "more to load?"). */
+ totalChunks: number;
/**
* Cumulative prompt-cache token telemetry for this tab since the page
* loaded (in-memory only — resets on reload). Undefined until the first
@@ -126,6 +206,12 @@ export function createTabStore() {
// `setScrolledUp`. A `force` eviction ignores this set entirely.
const scrolledUpTabs = new Set<string>();
+ // tabId → the turn_id whose reconcile was deferred because the user was
+ // scrolled up. A Map (not a Set) so the deferred flush knows which turn
+ // sealed and can preserve a newer turn that started streaming meanwhile.
+ // Flushed when they return to the bottom so we don't yank their viewport.
+ const pendingReconcileTabs = new Map<string, string>();
+
// Clear any stale listeners from HMR reloads, then register
wsClient.clearCallbacks();
wsClient.onEvent((event) => {
@@ -164,7 +250,10 @@ export function createTabStore() {
const tab: Tab = {
id,
title,
- messages: [],
+ chunks: [],
+ live: [],
+ renderGroups: [],
+ liveTurnId: null,
agentStatus: "idle",
keyId: null,
modelId: null,
@@ -181,7 +270,7 @@ export function createTabStore() {
queuedMessages: [],
chunkLimit: appSettings.chunkLimit,
oldestLoadedSeq: null,
- totalMessages: 0,
+ totalChunks: 0,
};
tabs = [...tabs, tab];
activeTabId = id;
@@ -229,35 +318,17 @@ export function createTabStore() {
parentTabId?: string | null;
};
- const messagesRes = await fetch(`${config.apiBase}/tabs/${agentId}/messages?limit=100`);
- // `GET /messages` windows the flat chunk log (last N chunks) and
- // groups the rows into render messages (`groupRowsToMessages` in
- // packages/core/src/chunks/transform.ts), serving them as `chunks`
- // per message over the wire — NOT a raw JSON string.
- const messagesData = messagesRes.ok
- ? ((await messagesRes.json()) as {
- messages: Array<{
- id?: string;
- role: string;
- chunks?: Chunk[];
- seq?: number;
- }>;
- total?: number;
- })
- : { messages: [], total: 0 };
-
- const chatMessages: ChatMessage[] = messagesData.messages.map((m) => ({
- id: m.id ?? generateId(),
- role: m.role as ChatMessage["role"],
- chunks: Array.isArray(m.chunks) ? m.chunks : [],
- isStreaming: false,
- seq: m.seq,
- }));
+ // Load the tail of the flat chunk log (raw rows — the frontend groups
+ // for render and evicts/paginates on the flat list).
+ const win = await fetchChunkWindow(agentId, { limit: 100 });
const newTab: Tab = {
id: agentId,
title: tabData.title,
- messages: chatMessages,
+ chunks: win.chunks,
+ live: [],
+ renderGroups: deriveRenderGroups(win.chunks, []),
+ liveTurnId: null,
agentStatus: "idle",
keyId: tabData.keyId ?? null,
modelId: tabData.modelId ?? null,
@@ -273,12 +344,12 @@ export function createTabStore() {
workingDirectory: null,
queuedMessages: [],
chunkLimit: appSettings.chunkLimit,
- oldestLoadedSeq: oldestSeqOf(messagesData.messages),
- totalMessages: messagesData.total ?? messagesData.messages.length,
+ oldestLoadedSeq: win.oldestSeq,
+ totalChunks: win.total,
};
tabs = [...tabs, newTab];
activeTabId = agentId;
- evictMessages(agentId);
+ evictChunks(agentId);
} catch (err) {
console.error("openAgentTab failed:", err);
}
@@ -312,7 +383,17 @@ export function createTabStore() {
}
function updateTab(id: string, patch: Partial<Tab>): void {
- tabs = tabs.map((t) => (t.id === id ? { ...t, ...patch } : t));
+ tabs = tabs.map((t) => {
+ if (t.id !== id) return t;
+ const next = { ...t, ...patch };
+ // `renderGroups` is a derived cache: recompute it whenever its inputs
+ // (`chunks` / `live`) change so the view layer never reads a stale
+ // projection. Callers only ever mutate `chunks`/`live`.
+ if ("chunks" in patch || "live" in patch) {
+ next.renderGroups = deriveRenderGroups(next.chunks, next.live);
+ }
+ return next;
+ });
}
/**
@@ -321,26 +402,56 @@ export function createTabStore() {
* messages — we don't want to delete what they're currently looking at.
*/
function setScrolledUp(tabId: string, scrolledUp: boolean): void {
- if (scrolledUp) scrolledUpTabs.add(tabId);
- else scrolledUpTabs.delete(tabId);
+ if (scrolledUp) {
+ scrolledUpTabs.add(tabId);
+ } else {
+ scrolledUpTabs.delete(tabId);
+ // Returned to the bottom — run any reconcile we deferred while reading.
+ const deferredTurnId = pendingReconcileTabs.get(tabId);
+ if (deferredTurnId !== undefined) {
+ pendingReconcileTabs.delete(tabId);
+ reconcileSealedTurn(tabId, deferredTurnId);
+ }
+ }
}
/**
- * Trim a tab's in-memory message history down to its `chunkLimit`.
- *
- * Counts the total number of chunks across all messages and removes the
- * oldest messages (from the front of the array) until the count is at or
- * below the limit — but never drops below a coherent conversation bottom:
- * - the in-flight streaming assistant message is always pinned;
- * - the most recent user+assistant pair (last 2 messages) is always pinned.
- *
- * Evicted messages are fully removed from `tab.messages` — there is no
- * stub or hidden cache; scrolling up re-fetches them via `loadMoreMessages`.
- *
- * Eviction is suppressed while the user is scrolled up (reading history),
- * unless `force` is true.
+ * Drop up to `n` of the oldest chunks from the live tail (front-to-back
+ * across its messages), never removing the chunk currently being streamed
+ * (the last chunk of the in-flight assistant message). Emptied messages are
+ * dropped. Used only when a single in-flight turn alone exceeds the budget.
+ */
+ function trimLiveChunks(
+ live: ChatMessage[],
+ n: number,
+ streamingId: string | null,
+ ): ChatMessage[] {
+ let remaining = n;
+ const out = live.map((m) => ({ ...m, chunks: [...m.chunks] }));
+ for (const m of out) {
+ if (remaining <= 0) break;
+ const isStreamingMsg = m.id === streamingId || m.isStreaming === true;
+ while (m.chunks.length > 0 && remaining > 0) {
+ // Keep the last (open) chunk of the actively streaming message.
+ if (isStreamingMsg && m.chunks.length === 1) break;
+ m.chunks.shift();
+ remaining--;
+ }
+ }
+ return out.filter((m) => m.chunks.length > 0);
+ }
+
+ /**
+ * Bound a tab's in-memory footprint to `chunkLimit` by rolling eviction of
+ * the OLDEST chunks. Sealed history (`tab.chunks`) is trimmed from the front
+ * first; if a single in-flight turn alone still exceeds the budget, the
+ * oldest chunks of the live tail are trimmed too (never the chunk currently
+ * being streamed). Evicted sealed chunks are re-fetched on scroll-up via
+ * `loadOlderChunks`; live chunks that haven't sealed yet are recovered by
+ * the turn-completion reconcile once their write lands. Suppressed while
+ * scrolled up unless `force` is set.
*/
- function evictMessages(tabId: string, force = false): void {
+ function evictChunks(tabId: string, force = false): void {
const tab = getTabById(tabId);
if (!tab) return;
if (!force && scrolledUpTabs.has(tabId)) return;
@@ -348,132 +459,61 @@ export function createTabStore() {
const limit = appSettings.chunkLimit;
if (!Number.isFinite(limit) || limit <= 0) return;
- const countChunks = (msgs: ChatMessage[]) => msgs.reduce((sum, m) => sum + m.chunks.length, 0);
-
- // Work on a shallow copy we can shift from.
- const working = [...tab.messages];
- let total = countChunks(working);
- let removed = false;
-
- // `chunkLimit` is a SOFT target for trimming OLD history, not a hard
- // cap. We always keep at least the latest user+assistant pair (the
- // `working.length > 2` floor) so the view is never blank and an answer
- // always has its preceding question on screen. Consequently a single
- // very long turn (one message holding e.g. 150 chunks) can briefly push
- // the in-memory total above `limit` — that is intentional and accepted.
- // We never trim chunks from WITHIN a message: messages are the
- // persistence/pagination unit (the backend stores whole rows keyed by
- // `seq` and `loadMoreMessages` pages by whole messages via `?before=`),
- // so a partially-trimmed message would just be re-fetched from the DB
- // and bounce back, and each message's `role` would be lost in a flat
- // chunk array. Whole-message eviction from the front is the correct
- // granularity.
- while (total > limit && working.length > 2) {
- const candidate = working[0];
- if (!candidate) break;
- // Never evict the in-flight streaming assistant message.
- if (candidate.isStreaming || candidate.id === tab.currentAssistantId) break;
- working.shift();
- total -= candidate.chunks.length;
- removed = true;
+ let sealed = tab.chunks;
+ let live = tab.live;
+ let total = sealed.length + countLiveChunks(live);
+ if (total <= limit) return;
+
+ // 1. Drop oldest sealed chunk rows from the front.
+ if (sealed.length > 0) {
+ let dropTo = 0;
+ while (total > limit && dropTo < sealed.length) {
+ dropTo++;
+ total--;
+ }
+ if (dropTo > 0) sealed = sealed.slice(dropTo);
+ }
+
+ // 2. Still over budget → one live turn exceeds the limit on its own.
+ if (total > limit && live.length > 0) {
+ live = trimLiveChunks(live, total - limit, tab.currentAssistantId);
}
- if (!removed) return;
- // Recalculate oldestLoadedSeq from remaining messages after eviction so
- // the `?before=` pagination cursor doesn't point at an evicted seq.
- const remainingSeq = (working as Array<ChatMessage & { seq?: number }>).reduce<number | null>(
- (min, m) => (typeof m.seq === "number" && (min === null || m.seq < min) ? m.seq : min),
- null,
- );
updateTab(tabId, {
- messages: working,
- oldestLoadedSeq: remainingSeq ?? tab.oldestLoadedSeq,
+ chunks: sealed,
+ live,
+ oldestLoadedSeq: minSeqOf(sealed) ?? tab.oldestLoadedSeq,
});
}
/**
- * Fetch and prepend the next page of older messages for a tab. Called when
- * the user scrolls toward the top. Pages backwards using the oldest loaded
- * `seq` (`?before=`). Does NOT trigger eviction — the user is reading
- * history, so we keep everything they've pulled in.
+ * Fetch and prepend the next older page of CHUNKS (raw rows). Called when
+ * the user scrolls toward the top. Pages backward by the oldest loaded
+ * `seq` (`?before=`), dedupes by `seq`, and keeps the window seq-sorted —
+ * so a turn split across the window boundary regroups into one bubble with
+ * no special-casing. Does NOT evict (the user is reading history).
*/
- async function loadMoreMessages(tabId: string): Promise<void> {
+ async function loadOlderChunks(tabId: string): Promise<void> {
const tab = getTabById(tabId);
if (!tab) return;
-
- const beforeParam = tab.oldestLoadedSeq !== null ? `&before=${tab.oldestLoadedSeq}` : "";
- try {
- const res = await fetch(`${config.apiBase}/tabs/${tabId}/messages?limit=50${beforeParam}`);
- if (!res.ok) return;
- const data = (await res.json()) as {
- messages?: Array<{
- id?: string;
- role: string;
- chunks?: Chunk[];
- seq?: number;
- turnId?: string;
- }>;
- total?: number;
- oldestSeq?: number | null;
- };
- const rawMessages = data.messages ?? [];
- if (rawMessages.length === 0) {
- // Nothing older to load; record the total if provided.
- if (typeof data.total === "number") {
- updateTab(tabId, { totalMessages: data.total });
- }
- return;
- }
-
- const older: ChatMessage[] = rawMessages.map((m) => ({
- id: m.id ?? generateId(),
- role: m.role as ChatMessage["role"],
- chunks: Array.isArray(m.chunks) ? m.chunks : [],
- isStreaming: false,
- seq: m.seq,
- ...(m.turnId !== undefined ? { turnId: m.turnId } : {}),
- }));
-
- const current = getTabById(tabId);
- if (!current) return;
-
- // Chunk-granular pagination can split ONE turn across the window
- // boundary: the oldest message already loaded and the newest message
- // in this older page may share a turn_id. Merge them (older chunks
- // first) so the turn renders as one bubble instead of duplicating.
- const merged = [...current.messages];
- const lastOlder = older[older.length - 1];
- const firstCurrent = merged[0];
- if (
- lastOlder &&
- firstCurrent &&
- lastOlder.turnId !== undefined &&
- lastOlder.turnId === firstCurrent.turnId &&
- lastOlder.role === firstCurrent.role
- ) {
- older.pop();
- merged[0] = {
- ...firstCurrent,
- id: lastOlder.id,
- seq: lastOlder.seq,
- turnId: lastOlder.turnId,
- chunks: [...lastOlder.chunks, ...firstCurrent.chunks],
- };
- }
-
- // Avoid duplicating messages we already have loaded.
- const existingIds = new Set(merged.map((m) => m.id));
- const toPrepend = older.filter((m) => !existingIds.has(m.id));
-
- const newOldestSeq = data.oldestSeq ?? oldestSeqOf(rawMessages);
- updateTab(tabId, {
- messages: [...toPrepend, ...merged],
- oldestLoadedSeq: newOldestSeq ?? current.oldestLoadedSeq,
- totalMessages: data.total ?? current.totalMessages,
- });
- } catch (err) {
- console.warn("[loadMoreMessages] failed:", err);
+ const before = tab.oldestLoadedSeq;
+ const win = await fetchChunkWindow(tabId, {
+ limit: 50,
+ ...(before !== null ? { before } : {}),
+ });
+ const current = getTabById(tabId);
+ if (!current) return;
+ if (win.chunks.length === 0) {
+ // Nothing older; refresh the total if the backend reported a real one.
+ if (win.total > 0) updateTab(tabId, { totalChunks: win.total });
+ return;
}
+ const merged = mergeChunksBySeq(current.chunks, win.chunks);
+ updateTab(tabId, {
+ chunks: merged,
+ oldestLoadedSeq: minSeqOf(merged),
+ totalChunks: win.total,
+ });
}
function ensureAssistantMessage(tabId: string): ChatMessage | null {
@@ -481,7 +521,7 @@ export function createTabStore() {
if (!tab) return null;
if (tab.currentAssistantId) {
- const existing = tab.messages.find((m) => m.id === tab.currentAssistantId);
+ const existing = tab.live.find((m) => m.id === tab.currentAssistantId);
if (existing) return existing;
}
@@ -491,19 +531,24 @@ export function createTabStore() {
role: "assistant",
chunks: [],
isStreaming: true,
+ ...(tab.liveTurnId !== null ? { turnId: tab.liveTurnId } : {}),
};
updateTab(tabId, {
currentAssistantId: id,
- messages: [...tab.messages, newMsg],
+ live: [...tab.live, newMsg],
});
- evictMessages(tabId);
+ evictChunks(tabId);
return newMsg;
}
- function updateMessages(tabId: string, updater: (msgs: ChatMessage[]) => ChatMessage[]): void {
+ /**
+ * Update the live tail (the current unsealed turn). All streaming handlers
+ * operate here; sealed history (`tab.chunks`) is never touched by streaming.
+ */
+ function updateLive(tabId: string, updater: (live: ChatMessage[]) => ChatMessage[]): void {
const tab = getTabById(tabId);
if (!tab) return;
- updateTab(tabId, { messages: updater(tab.messages) });
+ updateTab(tabId, { live: updater(tab.live) });
}
/**
@@ -516,7 +561,7 @@ export function createTabStore() {
* `$state.snapshot` (Svelte's own safe clone — strips reactive proxies and
* falls back gracefully where native `structuredClone` would throw
* `DataCloneError` on a `$state` proxy), mutate the snapshot, then write
- * it back through `updateMessages`. The previous use of `structuredClone`
+ * it back through `updateLive`. The previous use of `structuredClone`
* here threw silently and was swallowed by the WS try/catch — left chunks
* empty for every streaming turn.
*/
@@ -526,7 +571,7 @@ export function createTabStore() {
if (!tab) return;
const currentId = tab.currentAssistantId;
if (!currentId) return;
- updateMessages(tabId, (msgs) =>
+ updateLive(tabId, (msgs) =>
msgs.map((m) => {
if (m.id !== currentId) return m;
const cloned = $state.snapshot(m.chunks) as Chunk[];
@@ -538,6 +583,8 @@ export function createTabStore() {
return { ...m, chunks: cloned, isStreaming: true };
}),
);
+ // A chunk may have just completed — keep the in-memory footprint bounded.
+ evictChunks(tabId);
}
/**
@@ -548,20 +595,22 @@ export function createTabStore() {
function routeSystemEvent(tabId: string, sysEvent: SystemEventLike): void {
const tab = getTabById(tabId);
if (!tab) return;
- // We need to mutate the messages array (applySystemEvent does in-place
- // push). Build a shallow-cloned IdentifiedMessage[] view via
- // `$state.snapshot` (safe against Svelte 5 reactive proxies; native
- // `structuredClone` would throw), run the helper, then write it back.
- const view: IdentifiedMessage[] = tab.messages.map((m) => ({
+ // Operate on the live tail (applySystemEvent appends a system chunk to
+ // the trailing system message or creates one). Build a shallow-cloned
+ // IdentifiedMessage[] view via `$state.snapshot` (safe against Svelte 5
+ // reactive proxies; native `structuredClone` would throw), run the
+ // helper, then write it back. The backend persists this system row too,
+ // so it reconciles into `chunks` on the next turn/load.
+ const view: IdentifiedMessage[] = tab.live.map((m) => ({
id: m.id,
role: m.role,
chunks: $state.snapshot(m.chunks) as Chunk[],
}));
applySystemEvent(view, sysEvent, generateId);
- // Reconcile: rebuild the ChatMessage array from the view, preserving
- // existing message metadata (isStreaming, debugInfo) where IDs match.
- const byId = new Map(tab.messages.map((m) => [m.id, m]));
+ // Reconcile: rebuild the live array from the view, preserving existing
+ // message metadata (isStreaming, debugInfo) where IDs match.
+ const byId = new Map(tab.live.map((m) => [m.id, m]));
const rebuilt: ChatMessage[] = view.map((v) => {
const existing = byId.get(v.id);
if (existing) {
@@ -574,40 +623,74 @@ export function createTabStore() {
isStreaming: false,
};
});
- updateTab(tabId, { messages: rebuilt });
+ updateTab(tabId, { live: rebuilt });
}
/**
- * Reload a tab's messages from the API. Used after a WS reconnect when
- * we detect the backend finished work while we were disconnected — the
- * persisted chunks are the source of truth; in-memory state may be
- * missing events.
+ * Reload a tab's chunk window from the API and fold the sealed turn out of
+ * the live tail. The persisted chunk log is the source of truth. Two modes:
+ * - turn-completion reconcile (`preserveActiveTurn=true`, `sealedTurnId`
+ * set): the just-sealed turn's rows now carry real seqs. Drop that turn
+ * from `live`, but PRESERVE (a) a newer turn that began streaming while a
+ * reconcile was deferred — the queued-message race — and (b) optimistic
+ * user messages not yet bound to a turn, so neither is wiped.
+ * - WS-reconnect desync (`preserveActiveTurn=false`): the backend has moved
+ * on and is idle, so trust the DB fully and clear the live tail.
+ * A failed fetch is a no-op (never wipes a populated tab).
*/
- async function reloadTabMessagesFromApi(tabId: string): Promise<void> {
- try {
- const res = await fetch(`${config.apiBase}/tabs/${tabId}/messages?limit=100`);
- if (!res.ok) return;
- const data = (await res.json()) as {
- messages: Array<{ id?: string; role: string; chunks?: Chunk[]; seq?: number }>;
- total?: number;
- };
- const reloaded: ChatMessage[] = data.messages.map((m) => ({
- id: m.id ?? generateId(),
- role: m.role as ChatMessage["role"],
- chunks: Array.isArray(m.chunks) ? m.chunks : [],
- isStreaming: false,
- seq: m.seq,
- }));
- updateTab(tabId, {
- messages: reloaded,
- currentAssistantId: null,
- oldestLoadedSeq: oldestSeqOf(data.messages),
- totalMessages: data.total ?? reloaded.length,
- });
- evictMessages(tabId);
- } catch (err) {
- console.warn("[reloadTabMessagesFromApi] failed:", err);
+ async function reloadChunksFromApi(
+ tabId: string,
+ preserveActiveTurn = false,
+ sealedTurnId?: string,
+ ): Promise<void> {
+ const win = await fetchChunkWindow(tabId, { limit: 100 });
+ if (!win.ok) return;
+ const current = getTabById(tabId);
+ if (!current) return;
+ // A turn that started streaming AFTER the one being reconciled must not be
+ // wiped — only the sealed turn folds into `chunks`.
+ const preserveTurnId =
+ preserveActiveTurn && current.liveTurnId !== null && current.liveTurnId !== sealedTurnId
+ ? current.liveTurnId
+ : null;
+ const keptLive = preserveActiveTurn
+ ? current.live.filter(
+ (m) =>
+ (preserveTurnId !== null && m.turnId === preserveTurnId) ||
+ // Optimistic / queued user messages not yet bound to a turn.
+ (m.turnId === undefined && m.role === "user"),
+ )
+ : [];
+ const stillActive = preserveTurnId !== null;
+ updateTab(tabId, {
+ chunks: win.chunks,
+ live: keptLive,
+ liveTurnId: stillActive ? current.liveTurnId : null,
+ currentAssistantId: stillActive ? current.currentAssistantId : null,
+ oldestLoadedSeq: win.oldestSeq,
+ totalChunks: win.total,
+ });
+ evictChunks(tabId);
+ }
+
+ /**
+ * Turn-completion reconcile. On `turn-sealed`, fold the just-finished turn
+ * (`sealedTurnId`) into the sealed log by reloading the chunk window (real
+ * seqs) and dropping that turn from the live tail — while preserving any
+ * newer in-flight turn and not-yet-sealed optimistic user messages. Deferred
+ * while the user is scrolled up so the viewport isn't disturbed; re-attempted
+ * (with the same `sealedTurnId`) when they return to the bottom.
+ */
+ function reconcileSealedTurn(tabId: string, sealedTurnId: string): void {
+ const tab = getTabById(tabId);
+ if (!tab) return;
+ if (tab.live.length === 0 && tab.liveTurnId === null) return;
+ if (scrolledUpTabs.has(tabId)) {
+ pendingReconcileTabs.set(tabId, sealedTurnId);
+ return;
}
+ pendingReconcileTabs.delete(tabId);
+ void reloadChunksFromApi(tabId, true, sealedTurnId);
}
/**
@@ -675,91 +758,51 @@ export function createTabStore() {
// Non-fatal: tabs still restore with idle status.
}
- // 3. For each tab, fetch its persisted messages in parallel.
- const messageFetches = tabRows.map(async (row) => {
- try {
- const res = await fetch(`${config.apiBase}/tabs/${row.id}/messages?limit=100`);
- if (!res.ok)
- return { id: row.id, messages: [] as ChatMessage[], total: 0, oldestSeq: null };
- const data = (await res.json()) as {
- messages?: Array<{ id?: string; role: string; chunks?: Chunk[]; seq?: number }>;
- total?: number;
- };
- const rawMessages = data.messages ?? [];
- const messages: ChatMessage[] = rawMessages.map((m) => ({
- id: m.id ?? generateId(),
- role: m.role as ChatMessage["role"],
- chunks: Array.isArray(m.chunks) ? m.chunks : [],
- isStreaming: false,
- seq: m.seq,
- }));
- return {
- id: row.id,
- messages,
- total: data.total ?? messages.length,
- oldestSeq: oldestSeqOf(rawMessages),
- };
- } catch {
- return { id: row.id, messages: [] as ChatMessage[], total: 0, oldestSeq: null };
- }
- });
-
- const messagesByTab = new Map<string, ChatMessage[]>();
- const totalByTab = new Map<string, number>();
- const oldestSeqByTab = new Map<string, number | null>();
- for (const result of await Promise.all(messageFetches)) {
- messagesByTab.set(result.id, result.messages);
- totalByTab.set(result.id, result.total);
- oldestSeqByTab.set(result.id, result.oldestSeq);
+ // 3. For each tab, fetch its chunk window (raw rows) in parallel.
+ type Win = { ok: boolean; chunks: ChunkRow[]; total: number; oldestSeq: number | null };
+ const winByTab = new Map<string, Win>();
+ for (const { id, win } of await Promise.all(
+ tabRows.map(async (row) => ({
+ id: row.id,
+ win: await fetchChunkWindow(row.id, { limit: 100 }),
+ })),
+ )) {
+ winByTab.set(id, win);
}
- // 4. Build the Tab objects, splicing in the in-flight snapshot for
- // running tabs.
+ // 4. Build the Tab objects, seeding the in-flight live turn for running
+ // tabs from the status snapshot (the unsealed turn isn't in the DB
+ // yet; it reconciles into `chunks` when `turn-sealed` arrives).
const restored: Tab[] = tabRows.map((row) => {
const snap = statusMap[row.id];
- const messages = messagesByTab.get(row.id) ?? [];
+ const win: Win = winByTab.get(row.id) ?? { ok: true, chunks: [], total: 0, oldestSeq: null };
const agentStatus: Tab["agentStatus"] = snap?.status ?? "idle";
let currentAssistantId: string | null = null;
- let finalMessages = messages;
+ let liveTurnId: string | null = null;
+ let live: ChatMessage[] = [];
if (agentStatus === "running" && snap?.currentAssistantId) {
currentAssistantId = snap.currentAssistantId;
- // Find or create the in-flight assistant message. If the DB
- // already has a row with this id (the backend appended on
- // first flush and we picked it up via /tabs/:id/messages),
- // merge the snapshot chunks on top — the snapshot is the
- // live source of truth and may have chunks the DB doesn't.
- // If there's no matching row, append a new in-flight
- // assistant message holding only the snapshot chunks.
- const existingIdx = finalMessages.findIndex((m) => m.id === snap.currentAssistantId);
- if (existingIdx >= 0) {
- finalMessages = finalMessages.map((m, i) =>
- i === existingIdx
- ? {
- ...m,
- chunks: snap.currentChunks ? [...snap.currentChunks] : m.chunks,
- isStreaming: true,
- }
- : m,
- );
- } else {
- finalMessages = [
- ...finalMessages,
- {
- id: snap.currentAssistantId,
- role: "assistant",
- chunks: snap.currentChunks ? [...snap.currentChunks] : [],
- isStreaming: true,
- },
- ];
- }
+ liveTurnId = snap.currentTurnId ?? null;
+ live = [
+ {
+ id: snap.currentAssistantId,
+ role: "assistant",
+ chunks: snap.currentChunks ? [...snap.currentChunks] : [],
+ isStreaming: true,
+ ...(liveTurnId !== null ? { turnId: liveTurnId } : {}),
+ },
+ ];
}
return {
id: row.id,
title: row.title,
- messages: finalMessages,
+ chunks: win.chunks,
+ live,
+ renderGroups: deriveRenderGroups(win.chunks, live),
+ liveTurnId,
agentStatus,
keyId: row.keyId ?? null,
modelId: row.modelId ?? null,
@@ -775,15 +818,15 @@ export function createTabStore() {
workingDirectory: null,
queuedMessages: [],
chunkLimit: appSettings.chunkLimit,
- oldestLoadedSeq: oldestSeqByTab.get(row.id) ?? null,
- totalMessages: totalByTab.get(row.id) ?? finalMessages.length,
+ oldestLoadedSeq: win.oldestSeq,
+ totalChunks: win.total,
};
});
tabs = restored;
// Trim each restored tab down to the chunk limit (user starts at bottom).
for (const t of restored) {
- evictMessages(t.id);
+ evictChunks(t.id);
}
// Activate the first restored tab (the list is already ordered by
// `position` from the backend).
@@ -796,18 +839,73 @@ export function createTabStore() {
switch (event.type) {
case "status": {
- if (tabId) {
- updateTab(tabId, { agentStatus: event.status });
- if (event.status === "idle" || event.status === "error") {
- updateTab(tabId, { currentAssistantId: null });
- const tab = getTabById(tabId);
- if (tab && !tab.persistent && tabId !== activeTabId) {
- tabs = tabs.filter((t) => t.id !== tabId);
- }
+ if (!tabId) break;
+ updateTab(tabId, { agentStatus: event.status });
+ if (event.status === "idle" || event.status === "error") {
+ // Stop the streaming cursor immediately; the fold of the live
+ // tail into the sealed chunk log happens on `turn-sealed`
+ // (after the DB write lands — status fires before it).
+ updateLive(tabId, (msgs) =>
+ msgs.map((m) => (m.isStreaming ? { ...m, isStreaming: false } : m)),
+ );
+ updateTab(tabId, { currentAssistantId: null });
+ const tab = getTabById(tabId);
+ if (tab && !tab.persistent && tabId !== activeTabId) {
+ tabs = tabs.filter((t) => t.id !== tabId);
}
}
break;
}
+ case "turn-start": {
+ if (!tabId) break;
+ const tsTab = getTabById(tabId);
+ // Tag the in-flight turn. Also backfill the turn_id onto THIS
+ // turn's initiating optimistic user message — it was created on
+ // send before the turn_id was known — so it key-matches the sealed
+ // user row after reconcile (flicker-free; no remount).
+ //
+ // A turn-start corresponds to exactly one persisted user row
+ // (processMessage → explodeUserText), and a queued message never
+ // gets its own turn-start (it is drained into a running turn via
+ // message-consumed). So the initiator is the single most-recent
+ // NON-queued untagged user row. We must NOT tag pending `queued-`
+ // rows: they belong to future turns, and tagging them here would
+ // wipe them from the UI when THIS turn seals (reconcile drops live
+ // rows bound to the sealed turn).
+ const taggedLive = tsTab
+ ? (() => {
+ const live = [...tsTab.live];
+ for (let i = live.length - 1; i >= 0; i--) {
+ const m = live[i];
+ // Stop at the first non-user row (assistant/system
+ // boundary): earlier user rows belong to prior turns.
+ if (!m || m.role !== "user") break;
+ // Skip past pending queued messages (future turns).
+ if (m.id.startsWith("queued-")) continue;
+ // Most-recent non-queued user row = this turn's
+ // initiator. Tag it once (if untagged), then stop.
+ if (m.turnId === undefined) {
+ live[i] = { ...m, turnId: event.turnId };
+ }
+ break;
+ }
+ return live;
+ })()
+ : undefined;
+ updateTab(tabId, {
+ liveTurnId: event.turnId,
+ ...(taggedLive ? { live: taggedLive } : {}),
+ });
+ break;
+ }
+ case "turn-sealed": {
+ if (!tabId) break;
+ // The turn's rows are now durable — fold THIS turn out of the live
+ // tail into the sealed chunk log (refetch real seqs), preserving any
+ // newer in-flight turn. Deferred while scrolled up.
+ reconcileSealedTurn(tabId, event.turnId);
+ break;
+ }
case "statuses": {
// WS (re)connect snapshot. The shape was widened to
// TabStatusSnapshot (status + optional currentChunks +
@@ -819,10 +917,10 @@ export function createTabStore() {
const backendStatus = snap?.status ?? "idle";
// Desync case: frontend thought it was streaming, backend
- // has already moved on. Pull the persisted chunks so the
- // final answer shows up.
+ // has already moved on. The turn is persisted now — reload
+ // the chunk window so the final answer shows up.
if (t.agentStatus === "running" && backendStatus !== "running") {
- void reloadTabMessagesFromApi(t.id);
+ void reloadChunksFromApi(t.id);
}
// Status alignment.
@@ -838,8 +936,11 @@ export function createTabStore() {
// in-memory currentChunks.
if (snap?.currentAssistantId) {
const targetId = snap.currentAssistantId;
- updateTab(t.id, { currentAssistantId: targetId });
- updateMessages(t.id, (msgs) => {
+ updateTab(t.id, {
+ currentAssistantId: targetId,
+ ...(snap.currentTurnId ? { liveTurnId: snap.currentTurnId } : {}),
+ });
+ updateLive(t.id, (msgs) => {
const idx = msgs.findIndex((m) => m.id === targetId);
if (idx >= 0) {
return msgs.map((m, i) =>
@@ -859,13 +960,14 @@ export function createTabStore() {
role: "assistant",
chunks: snap.currentChunks ? [...snap.currentChunks] : [],
isStreaming: true,
+ ...(snap.currentTurnId ? { turnId: snap.currentTurnId } : {}),
},
];
});
}
} else if (t.currentAssistantId) {
// Not running: clear streaming flags.
- updateMessages(t.id, (msgs) =>
+ updateLive(t.id, (msgs) =>
msgs.map((m) => (m.id === t.currentAssistantId ? { ...m, isStreaming: false } : m)),
);
updateTab(t.id, { currentAssistantId: null });
@@ -910,7 +1012,7 @@ export function createTabStore() {
if (!tabId) break;
const tab5 = getTabById(tabId);
if (!tab5) break;
- updateMessages(tabId, (msgs) =>
+ updateLive(tabId, (msgs) =>
msgs.map((m) => (m.id === tab5.currentAssistantId ? { ...m, isStreaming: false } : m)),
);
updateTab(tabId, { currentAssistantId: null });
@@ -925,7 +1027,7 @@ export function createTabStore() {
// assistant message via the shared helper. Mark debug info
// on the message for parity with the previous behavior.
applyChunkEvent(tabId, event);
- updateMessages(tabId, (msgs) =>
+ updateLive(tabId, (msgs) =>
msgs.map((m) =>
m.id === errTab.currentAssistantId
? {
@@ -946,7 +1048,7 @@ export function createTabStore() {
const afterTab = getTabById(tabId);
if (afterTab?.currentAssistantId) {
const newId = afterTab.currentAssistantId;
- updateMessages(tabId, (msgs) =>
+ updateLive(tabId, (msgs) =>
msgs.map((m) =>
m.id === newId
? {
@@ -1036,7 +1138,10 @@ export function createTabStore() {
const tab: Tab = {
id: newTabEvent.id,
title: newTabEvent.title,
- messages: [],
+ chunks: [],
+ live: [],
+ renderGroups: [],
+ liveTurnId: null,
agentStatus: "running",
keyId: newTabEvent.keyId ?? null,
modelId: newTabEvent.modelId ?? null,
@@ -1053,7 +1158,7 @@ export function createTabStore() {
queuedMessages: [],
chunkLimit: appSettings.chunkLimit,
oldestLoadedSeq: null,
- totalMessages: 0,
+ totalChunks: 0,
};
tabs = [...tabs, tab];
}
@@ -1075,9 +1180,9 @@ export function createTabStore() {
timestamp: Date.now(),
};
updateTab(tabId, { queuedMessages: [...mqTab.queuedMessages, qm] });
- // Also add as a user chat message if not already present
+ // Also add as a user message in the live tail if not present.
const tabAfterQm = getTabById(tabId);
- const existingMsg = tabAfterQm?.messages.find(
+ const existingMsg = tabAfterQm?.live.find(
(m) => m.id === `queued-${mqEvent.messageId}` || m.id === mqEvent.messageId,
);
if (!existingMsg) {
@@ -1086,7 +1191,7 @@ export function createTabStore() {
role: "user",
chunks: [{ type: "text", text: mqEvent.message }],
};
- updateTab(tabId, { messages: [...(tabAfterQm?.messages ?? []), userMsg] });
+ updateTab(tabId, { live: [...(tabAfterQm?.live ?? []), userMsg] });
}
}
// If alreadyQueued, the optimistic update already put everything in place with the
@@ -1110,7 +1215,7 @@ export function createTabStore() {
// the consumed user messages after it. Subsequent streaming events
// will create a NEW assistant message block below.
const currentAssistantId = mcTab.currentAssistantId;
- updateMessages(tabId, (msgs) => {
+ updateLive(tabId, (msgs) => {
// Extract consumed messages
const consumed: ChatMessage[] = [];
const rest: ChatMessage[] = [];
@@ -1118,7 +1223,20 @@ export function createTabStore() {
if (m.id.startsWith("queued-")) {
const queuedId = m.id.slice(7);
if (mcEvent.messageIds.includes(queuedId)) {
- consumed.push({ ...m, id: queuedId });
+ // Bind the consumed message to the in-flight turn that is
+ // consuming it. Stripping the `queued-` prefix alone leaves
+ // it an UNTAGGED user row, which reconcileSealedTurn KEEPS —
+ // so the interrupt bubble would linger in the live tail
+ // forever AND duplicate the `[USER INTERRUPT]` text the
+ // backend folds into the sealed tool-result chunk. Tagging
+ // it lets reconcile drop it on seal, collapsing to the
+ // persisted shape. (liveTurnId is set for the duration of a
+ // running turn, which is the only time a consume happens.)
+ consumed.push({
+ ...m,
+ id: queuedId,
+ ...(mcTab.liveTurnId !== null ? { turnId: mcTab.liveTurnId } : {}),
+ });
continue;
}
}
@@ -1154,7 +1272,7 @@ export function createTabStore() {
if (!cancelTab) break;
updateTab(tabId, {
queuedMessages: cancelTab.queuedMessages.filter((m) => m.id !== cancelEvent.messageId),
- messages: cancelTab.messages.filter(
+ live: cancelTab.live.filter(
(m) => !(m.role === "user" && m.id === `queued-${cancelEvent.messageId}`),
),
});
@@ -1377,8 +1495,11 @@ export function createTabStore() {
];
}
- updateTab(tab.id, { messages: [...tab.messages, userMsg] }); // Generate title from first user message
- if (tab.messages.length === 0 || (tab.messages.length === 1 && tab.title === "New Tab")) {
+ // Optimistically show the user's message in the live tail.
+ updateTab(tab.id, { live: [...tab.live, userMsg] });
+ // Generate a title from the first user message of an empty tab.
+ const isFirstMessage = tab.chunks.length === 0 && tab.live.length === 0;
+ if (isFirstMessage || tab.title === "New Tab") {
const titleText = text.length > 50 ? `${text.slice(0, 47)}...` : text;
updateTab(tab.id, { title: titleText });
fetch(`${config.apiBase}/tabs/${tab.id}`, {
@@ -1445,7 +1566,7 @@ export function createTabStore() {
queuedMessages: currentTab.queuedMessages.filter((m) => m.id !== queueId),
});
}
- updateMessages(tab.id, (msgs) =>
+ updateLive(tab.id, (msgs) =>
msgs.map((m) => (m.id === `queued-${queueId}` ? { ...m, id: generateId() } : m)),
);
}
@@ -1466,7 +1587,7 @@ export function createTabStore() {
httpBody: body,
}),
};
- updateTab(tab.id, { messages: [...(getTabById(tab.id)?.messages ?? []), errMsg] });
+ updateTab(tab.id, { live: [...(getTabById(tab.id)?.live ?? []), errMsg] });
} else {
const responseData = (await res.json()) as { status: string; messageId?: string };
if (responseData.status === "queued" && responseData.messageId) {
@@ -1489,7 +1610,7 @@ export function createTabStore() {
});
}
// Restore the message to a normal (non-queued) ID
- updateMessages(tab.id, (msgs) =>
+ updateLive(tab.id, (msgs) =>
msgs.map((m) => (m.id === `queued-${queueId}` ? { ...m, id: generateId() } : m)),
);
}
@@ -1504,7 +1625,7 @@ export function createTabStore() {
queuedMessages: currentTab.queuedMessages.filter((m) => m.id !== queueId),
});
}
- updateMessages(tab.id, (msgs) =>
+ updateLive(tab.id, (msgs) =>
msgs.map((m) => (m.id === `queued-${queueId}` ? { ...m, id: generateId() } : m)),
);
}
@@ -1515,7 +1636,7 @@ export function createTabStore() {
isStreaming: false,
debugInfo: makeDebugInfo({ error: err instanceof Error ? err.message : String(err) }),
};
- updateTab(tab.id, { messages: [...(getTabById(tab.id)?.messages ?? []), errMsg] });
+ updateTab(tab.id, { live: [...(getTabById(tab.id)?.live ?? []), errMsg] });
}
}
@@ -1641,9 +1762,7 @@ export function createTabStore() {
if (tab) {
updateTab(tabId, {
queuedMessages: tab.queuedMessages.filter((m) => m.id !== messageId),
- messages: tab.messages.filter(
- (m) => !(m.role === "user" && m.id === `queued-${messageId}`),
- ),
+ live: tab.live.filter((m) => !(m.role === "user" && m.id === `queued-${messageId}`)),
});
}
try {
@@ -1684,7 +1803,7 @@ export function createTabStore() {
// for it — which is the canonical symptom of a wire-format / load
// failure. Always include this so bug reports are diagnosable from
// the paste alone, without DB access.
- const summarizeChunks = (chunks: (typeof tab.messages)[number]["chunks"]) => {
+ const summarizeChunks = (chunks: (typeof tab.renderGroups)[number]["chunks"]) => {
if (chunks.length === 0) return "chunks=0";
const parts = chunks.map((c) => {
if (c.type === "tool-batch") return `tool-batch[${c.calls.length}]`;
@@ -1713,7 +1832,7 @@ export function createTabStore() {
`Connected to backend: ${isConnected}`,
`Tab agentStatus: ${tab.agentStatus}`,
`Tab currentAssistantId: ${tab.currentAssistantId ?? "null"}`,
- `Messages in store: ${tab.messages.length}`,
+ `Render groups in store: ${tab.renderGroups.length}`,
`Queued messages: ${tab.queuedMessages.length}`,
`Persistent: ${tab.persistent}`,
`Working directory: ${tab.workingDirectory ?? "default"}`,
@@ -1723,7 +1842,7 @@ export function createTabStore() {
];
const TOOL_RESULT_MAX = 300;
- for (const msg of tab.messages) {
+ for (const msg of tab.renderGroups) {
const role = msg.role === "user" ? "User" : msg.role === "system" ? "System" : "Assistant";
const streamingFlag = msg.isStreaming ? ", streaming=true" : "";
// Inline message diagnostics — id, streaming, chunk summary —
@@ -1826,8 +1945,8 @@ export function createTabStore() {
// components — they should rely on the WS subscription instead.
handleEvent,
hydrateFromBackend,
- loadMoreMessages,
- evictMessages,
+ loadOlderChunks,
+ evictChunks,
setScrolledUp,
};
}
diff --git a/packages/frontend/src/lib/types.ts b/packages/frontend/src/lib/types.ts
index 6e87aec..bede2cc 100644
--- a/packages/frontend/src/lib/types.ts
+++ b/packages/frontend/src/lib/types.ts
@@ -89,6 +89,11 @@ export interface SystemChunk {
text: string;
}
+/**
+ * A render bubble. NOT stored state — it's a derived projection of the flat
+ * chunk log (`groupRowsToMessages(tab.chunks)`) concatenated with the transient
+ * live tail. The store's source of truth for history is `tab.chunks: ChunkRow[]`.
+ */
export interface ChatMessage {
id: string;
role: "user" | "assistant" | "system";
@@ -97,10 +102,9 @@ export interface ChatMessage {
debugInfo?: DebugInfo;
seq?: number;
/**
- * turn_id of the chunk rows this message was grouped from (history loaded
- * from the backend). Used by `loadMoreMessages` to merge a turn that was
- * split across the chunk-pagination window boundary. Absent for live
- * (streaming) messages built client-side.
+ * turn_id of the chunk rows this message was grouped from (or the in-flight
+ * turn for a live message). Gives a stable, turn-scoped render key so the
+ * bubble doesn't remount when the live turn reconciles into sealed chunks.
*/
turnId?: string;
}
@@ -125,10 +129,18 @@ export interface TabStatusSnapshot {
status: "idle" | "running" | "error";
currentChunks?: Chunk[];
currentAssistantId?: string;
+ /** turn_id of the in-flight turn; present iff status === "running". */
+ currentTurnId?: string;
}
export type AgentEvent =
| { type: "status"; status: "idle" | "running" | "error" }
+ // Opens a turn before any content delta; carries the turn_id used to tag
+ // the live chunks so they reconcile cleanly when the turn seals.
+ | { type: "turn-start"; turnId: string }
+ // Fires after the turn settled AND its chunks were persisted (after the DB
+ // write, post status:idle). Triggers the frontend's reconcile-from-DB.
+ | { type: "turn-sealed"; turnId: string }
// Sent on every WS (re)connect: a snapshot of every tab the backend is
// currently tracking and its live status. The frontend uses this to
// detect desync after a reconnect (e.g. bun --watch restart killed the
diff --git a/packages/frontend/tests/chat-store.test.ts b/packages/frontend/tests/chat-store.test.ts
index c485fc7..b9d37f2 100644
--- a/packages/frontend/tests/chat-store.test.ts
+++ b/packages/frontend/tests/chat-store.test.ts
@@ -71,6 +71,7 @@ beforeEach(() => {
);
});
+import { appSettings } from "../src/lib/settings.svelte.js";
import { createTabStore } from "../src/lib/tabs.svelte.js";
import type { Chunk, PermissionPrompt } from "../src/lib/types.js";
import { wsClient } from "../src/lib/ws.svelte.js";
@@ -92,17 +93,34 @@ async function setupStoreWithTab() {
*/
function getAssistantChunks(store: ReturnType<typeof createTabStore>): Chunk[] | undefined {
const tab = store.tabs[0];
- const assistant = tab?.messages.find((m) => m.role === "assistant");
+ const assistant = tab?.renderGroups.find((m) => m.role === "assistant");
return assistant?.chunks;
}
+/**
+ * Build a raw `ChunkRow` as the `GET /tabs/:id/chunks` endpoint returns them.
+ * The chunk-native store loads/paginates raw rows and groups them for render.
+ */
+function chunkRow(
+ id: string,
+ tabId: string,
+ seq: number,
+ turnId: string,
+ role: "user" | "assistant" | "tool" | "system",
+ type: "text" | "thinking" | "tool_call" | "tool_result" | "error" | "system",
+ data: unknown,
+ step = 0,
+): Record<string, unknown> {
+ return { id, tabId, seq, turnId, step, role, type, data, createdAt: seq };
+}
+
describe("tabStore — streaming chunk flow (real $state)", () => {
it("text-delta creates a streaming assistant message and appends deltas", async () => {
const { store, tabId } = await setupStoreWithTab();
store.handleEvent({ type: "text-delta", delta: "Hello", tabId });
- const assistant = store.tabs[0]?.messages.find((m) => m.role === "assistant");
+ const assistant = store.tabs[0]?.renderGroups.find((m) => m.role === "assistant");
expect(assistant).toBeDefined();
expect(assistant?.isStreaming).toBe(true);
expect(assistant?.chunks).toEqual([{ type: "text", text: "Hello" }]);
@@ -204,7 +222,7 @@ describe("tabStore — streaming chunk flow (real $state)", () => {
message: { role: "assistant", chunks: [] },
tabId,
} as Parameters<typeof store.handleEvent>[0]);
- const assistant = store.tabs[0]?.messages.find((m) => m.role === "assistant");
+ const assistant = store.tabs[0]?.renderGroups.find((m) => m.role === "assistant");
expect(assistant?.chunks).toEqual([{ type: "text", text: "partial" }]);
expect(assistant?.isStreaming).toBe(false);
expect(store.tabs[0]?.currentAssistantId).toBeNull();
@@ -315,7 +333,8 @@ describe("tabStore — streaming chunk flow (real $state)", () => {
it("error event with no in-flight turn opens a fresh assistant message", async () => {
const { store, tabId } = await setupStoreWithTab();
store.handleEvent({ type: "error", error: "boom", tabId });
- const assistantMessages = store.tabs[0]?.messages.filter((m) => m.role === "assistant") ?? [];
+ const assistantMessages =
+ store.tabs[0]?.renderGroups.filter((m) => m.role === "assistant") ?? [];
expect(assistantMessages.length).toBeGreaterThanOrEqual(1);
const errChunks = assistantMessages[assistantMessages.length - 1]?.chunks;
expect(errChunks).toHaveLength(1);
@@ -342,7 +361,7 @@ describe("tabStore — streaming chunk flow (real $state)", () => {
it("notice with no turn in flight creates a role:system message", async () => {
const { store, tabId } = await setupStoreWithTab();
store.handleEvent({ type: "notice", message: "standalone", tabId });
- const systemMsg = store.tabs[0]?.messages.find((m) => m.role === "system");
+ const systemMsg = store.tabs[0]?.renderGroups.find((m) => m.role === "system");
expect(systemMsg).toBeDefined();
const chunks = systemMsg?.chunks;
expect(chunks).toHaveLength(1);
@@ -355,7 +374,7 @@ describe("tabStore — streaming chunk flow (real $state)", () => {
const { store, tabId } = await setupStoreWithTab();
store.handleEvent({ type: "notice", message: "first", tabId });
store.handleEvent({ type: "notice", message: "second", tabId });
- const sysMsgs = store.tabs[0]?.messages.filter((m) => m.role === "system") ?? [];
+ const sysMsgs = store.tabs[0]?.renderGroups.filter((m) => m.role === "system") ?? [];
expect(sysMsgs).toHaveLength(1);
expect(sysMsgs[0]?.chunks).toHaveLength(2);
});
@@ -455,7 +474,7 @@ describe("tabStore — reactivity contract", () => {
expect(store.tabs[0]?.agentStatus).toBe("idle");
expect(store.tabs[0]?.currentAssistantId).toBeNull();
// The streaming flag on the in-flight message should be cleared.
- const assistant = store.tabs[0]?.messages.find((m) => m.role === "assistant");
+ const assistant = store.tabs[0]?.renderGroups.find((m) => m.role === "assistant");
expect(assistant?.isStreaming).toBe(false);
});
});
@@ -702,26 +721,24 @@ describe("hydrateFromBackend", () => {
json: () => Promise.resolve({ statuses: {} }),
});
}
- if (url.split("?")[0]?.endsWith("/tabs/t1/messages")) {
+ if (url.split("?")[0]?.endsWith("/tabs/t1/chunks")) {
return Promise.resolve({
ok: true,
json: () =>
Promise.resolve({
- messages: [
- { id: "m1", role: "user", chunks: [{ type: "text", text: "hello" }] },
- {
- id: "m2",
- role: "assistant",
- chunks: [{ type: "text", text: "hi back" }],
- },
+ chunks: [
+ chunkRow("m1", "t1", 0, "u1", "user", "text", { text: "hello" }),
+ chunkRow("m2", "t1", 1, "u1", "assistant", "text", { text: "hi back" }),
],
+ total: 2,
+ oldestSeq: 0,
}),
});
}
- if (url.split("?")[0]?.endsWith("/tabs/t2/messages")) {
+ if (url.split("?")[0]?.endsWith("/tabs/t2/chunks")) {
return Promise.resolve({
ok: true,
- json: () => Promise.resolve({ messages: [] }),
+ json: () => Promise.resolve({ chunks: [], total: 0, oldestSeq: null }),
});
}
return Promise.reject(new Error(`unexpected fetch ${url}`));
@@ -733,9 +750,9 @@ describe("hydrateFromBackend", () => {
expect(n).toBe(2);
expect(store.tabs.length).toBe(2);
expect(store.tabs[0]?.id).toBe("t1");
- expect(store.tabs[0]?.messages.length).toBe(2);
+ expect(store.tabs[0]?.renderGroups.length).toBe(2);
expect(store.tabs[1]?.id).toBe("t2");
- expect(store.tabs[1]?.messages.length).toBe(0);
+ expect(store.tabs[1]?.renderGroups.length).toBe(0);
expect(store.activeTabId).toBe("t1");
});
@@ -772,12 +789,14 @@ describe("hydrateFromBackend", () => {
}),
});
}
- if (url.split("?")[0]?.endsWith("/tabs/tr/messages")) {
+ if (url.split("?")[0]?.endsWith("/tabs/tr/chunks")) {
return Promise.resolve({
ok: true,
json: () =>
Promise.resolve({
- messages: [{ id: "u1", role: "user", chunks: [{ type: "text", text: "go" }] }],
+ chunks: [chunkRow("u1", "tr", 0, "turn-r", "user", "text", { text: "go" })],
+ total: 1,
+ oldestSeq: 0,
}),
});
}
@@ -792,8 +811,8 @@ describe("hydrateFromBackend", () => {
expect(tab?.agentStatus).toBe("running");
expect(tab?.currentAssistantId).toBe("live-msg-id");
// Two messages: the user message + the seeded in-flight assistant.
- expect(tab?.messages.length).toBe(2);
- const inflight = tab?.messages.find((m) => m.id === "live-msg-id");
+ expect(tab?.renderGroups.length).toBe(2);
+ const inflight = tab?.renderGroups.find((m) => m.id === "live-msg-id");
expect(inflight).toBeDefined();
expect(inflight?.isStreaming).toBe(true);
expect(inflight?.chunks).toEqual([
@@ -882,8 +901,11 @@ describe("hydrateFromBackend", () => {
if (url.endsWith("/status")) {
return Promise.resolve({ ok: true, json: () => Promise.resolve({ statuses: {} }) });
}
- if (url.split("?")[0]?.endsWith("/tabs/ti/messages")) {
- return Promise.resolve({ ok: true, json: () => Promise.resolve({ messages: [] }) });
+ if (url.split("?")[0]?.endsWith("/tabs/ti/chunks")) {
+ return Promise.resolve({
+ ok: true,
+ json: () => Promise.resolve({ chunks: [], total: 0, oldestSeq: null }),
+ });
}
return Promise.reject(new Error(`unexpected fetch ${url}`));
}),
@@ -939,20 +961,22 @@ describe("hydrateFromBackend", () => {
if (url.endsWith("/status")) {
return Promise.resolve({ ok: true, json: () => Promise.resolve({ statuses: {} }) });
}
- if (url.split("?")[0]?.endsWith("/tabs/tA/messages")) {
+ if (url.split("?")[0]?.endsWith("/tabs/tA/chunks")) {
return Promise.resolve({
ok: true,
json: () =>
Promise.resolve({
- messages: [{ id: "msg-a", role: "user", chunks: [{ type: "text", text: "ok" }] }],
+ chunks: [chunkRow("msg-a", "tA", 0, "turn-a", "user", "text", { text: "ok" })],
+ total: 1,
+ oldestSeq: 0,
}),
});
}
- if (url.split("?")[0]?.endsWith("/tabs/tB/messages")) {
+ if (url.split("?")[0]?.endsWith("/tabs/tB/chunks")) {
// HTTP error path: response is not ok.
return Promise.resolve({ ok: false, json: () => Promise.resolve({}) });
}
- if (url.split("?")[0]?.endsWith("/tabs/tC/messages")) {
+ if (url.split("?")[0]?.endsWith("/tabs/tC/chunks")) {
// Network error path: the fetch itself rejects.
return Promise.reject(new Error("simulated network failure"));
}
@@ -966,19 +990,19 @@ describe("hydrateFromBackend", () => {
// Healthy tab restored with its message.
const tA = store.tabs.find((t) => t.id === "tA");
- expect(tA?.messages.length).toBe(1);
- expect(tA?.messages[0]?.chunks).toEqual([{ type: "text", text: "ok" }]);
+ expect(tA?.renderGroups.length).toBe(1);
+ expect(tA?.renderGroups[0]?.chunks).toEqual([{ type: "text", text: "ok" }]);
// Both broken tabs restored with empty message lists — neither
// crashed the hydration nor leaked an error chunk into the UI.
const tB = store.tabs.find((t) => t.id === "tB");
expect(tB).toBeDefined();
- expect(tB?.messages.length).toBe(0);
+ expect(tB?.renderGroups.length).toBe(0);
expect(tB?.agentStatus).toBe("idle");
const tC = store.tabs.find((t) => t.id === "tC");
expect(tC).toBeDefined();
- expect(tC?.messages.length).toBe(0);
+ expect(tC?.renderGroups.length).toBe(0);
expect(tC?.agentStatus).toBe("idle");
});
});
@@ -1013,7 +1037,7 @@ describe("handleEvent statuses with TabStatusSnapshot", () => {
const tab = store.tabs.find((t) => t.id === tabId);
expect(tab?.agentStatus).toBe("running");
expect(tab?.currentAssistantId).toBe("live-x");
- const inflight = tab?.messages.find((m) => m.id === "live-x");
+ const inflight = tab?.renderGroups.find((m) => m.id === "live-x");
expect(inflight).toBeDefined();
expect(inflight?.chunks).toEqual([{ type: "text", text: "live data" }]);
expect(inflight?.isStreaming).toBe(true);
@@ -1046,7 +1070,7 @@ describe("handleEvent statuses with TabStatusSnapshot", () => {
const tab = store.tabs.find((t) => t.id === tabId);
expect(tab?.agentStatus).toBe("idle");
expect(tab?.currentAssistantId).toBeNull();
- const msgA = tab?.messages.find((m) => m.id === "msg-a");
+ const msgA = tab?.renderGroups.find((m) => m.id === "msg-a");
expect(msgA?.isStreaming).toBe(false);
});
});
@@ -1097,3 +1121,392 @@ describe("tabStore — cache rate (usage events)", () => {
expect(store.tabs[0]?.cacheStats).toBeUndefined();
});
});
+
+// ─── chunk-native store: eviction, pagination, reconcile ────────
+//
+// The store's source of truth for HISTORY is a flat ChunkRow[] (`tab.chunks`,
+// real seqs); the live turn is a transient tail (`tab.live`) folded from
+// stream deltas and reconciled into `chunks` on `turn-sealed`. `tab.renderGroups`
+// is a derived render projection. These tests drive the real store.
+
+describe("tabStore — chunk-native eviction / pagination / reconcile", () => {
+ function chunksResponse(rows: Array<Record<string, unknown>>, total?: number) {
+ const oldestSeq = rows.length > 0 ? ((rows[0]?.seq as number) ?? null) : null;
+ return {
+ ok: true,
+ json: () => Promise.resolve({ chunks: rows, total: total ?? rows.length, oldestSeq }),
+ };
+ }
+ function tabsListResponse(id: string) {
+ return {
+ ok: true,
+ json: () => Promise.resolve({ tabs: [{ id, title: id, parentTabId: null }] }),
+ };
+ }
+ function emptyStatuses() {
+ return { ok: true, json: () => Promise.resolve({ statuses: {} }) };
+ }
+ const tick = () => new Promise((r) => setTimeout(r, 0));
+
+ it("evicts sealed chunks to chunkLimit, trimming WITHIN one large turn", async () => {
+ appSettings.chunkLimit = 5;
+ try {
+ // One turn of 10 chunk rows (the pathological single big turn).
+ const rows = Array.from({ length: 10 }, (_, i) =>
+ chunkRow(`c${i}`, "big", i, "turn-1", i === 0 ? "user" : "assistant", "text", {
+ text: `t${i}`,
+ }),
+ );
+ vi.stubGlobal(
+ "fetch",
+ vi.fn((url: string) => {
+ if (url.endsWith("/tabs")) return Promise.resolve(tabsListResponse("big"));
+ if (url.endsWith("/status")) return Promise.resolve(emptyStatuses());
+ if (url.split("?")[0]?.endsWith("/tabs/big/chunks"))
+ return Promise.resolve(chunksResponse(rows, 10));
+ return Promise.reject(new Error(`unexpected ${url}`));
+ }),
+ );
+ const store = createTabStore();
+ await store.hydrateFromBackend();
+ const tab = store.tabs.find((t) => t.id === "big");
+ // Bounded to chunkLimit; the OLDEST rows of the single turn were trimmed.
+ expect(tab?.chunks.length).toBe(5);
+ expect(tab?.chunks.map((c) => c.seq)).toEqual([5, 6, 7, 8, 9]);
+ // Pagination cursor points at a real remaining seq.
+ expect(tab?.oldestLoadedSeq).toBe(5);
+ } finally {
+ appSettings.chunkLimit = 100;
+ }
+ });
+
+ it("loadOlderChunks prepends an older page and dedupes by seq", async () => {
+ appSettings.chunkLimit = 1000; // don't evict during this test
+ try {
+ const newer = [6, 7, 8, 9].map((s) =>
+ chunkRow(`c${s}`, "pg", s, "t", "assistant", "text", { text: `t${s}` }),
+ );
+ // Overlaps the newer window at seq 6 — must dedupe.
+ const older = [2, 3, 4, 5, 6].map((s) =>
+ chunkRow(`c${s}`, "pg", s, "t", "assistant", "text", { text: `t${s}` }),
+ );
+ vi.stubGlobal(
+ "fetch",
+ vi.fn((url: string) => {
+ if (url.endsWith("/tabs")) return Promise.resolve(tabsListResponse("pg"));
+ if (url.endsWith("/status")) return Promise.resolve(emptyStatuses());
+ if (url.split("?")[0]?.endsWith("/tabs/pg/chunks")) {
+ return Promise.resolve(
+ url.includes("before=") ? chunksResponse(older, 8) : chunksResponse(newer, 8),
+ );
+ }
+ return Promise.reject(new Error(`unexpected ${url}`));
+ }),
+ );
+ const store = createTabStore();
+ await store.hydrateFromBackend();
+ let tab = store.tabs.find((t) => t.id === "pg");
+ expect(tab?.chunks.map((c) => c.seq)).toEqual([6, 7, 8, 9]);
+ expect(tab?.oldestLoadedSeq).toBe(6);
+
+ await store.loadOlderChunks("pg");
+ tab = store.tabs.find((t) => t.id === "pg");
+ expect(tab?.chunks.map((c) => c.seq)).toEqual([2, 3, 4, 5, 6, 7, 8, 9]);
+ expect(tab?.oldestLoadedSeq).toBe(2);
+ } finally {
+ appSettings.chunkLimit = 100;
+ }
+ });
+
+ it("turn-sealed folds the live turn into the sealed chunk log with real seqs", async () => {
+ const sealed = [
+ chunkRow("u", "rc", 0, "turn-x", "user", "text", { text: "hi" }),
+ chunkRow("a", "rc", 1, "turn-x", "assistant", "text", { text: "answer" }),
+ ];
+ vi.stubGlobal(
+ "fetch",
+ vi.fn((url: string) => {
+ if (url.split("?")[0]?.endsWith("/tabs/rc/chunks"))
+ return Promise.resolve(chunksResponse(sealed, 2));
+ return Promise.reject(new Error(`unexpected ${url}`));
+ }),
+ );
+ const store = createTabStore();
+ store.handleEvent({
+ type: "tab-created",
+ id: "rc",
+ title: "RC",
+ keyId: null,
+ modelId: null,
+ parentTabId: null,
+ });
+ store.handleEvent({ type: "turn-start", turnId: "turn-x", tabId: "rc" });
+ store.handleEvent({ type: "text-delta", delta: "answer", tabId: "rc" });
+ // While streaming: live tail holds the in-flight assistant; sealed empty.
+ let tab = store.tabs.find((t) => t.id === "rc");
+ expect(tab?.live.length).toBe(1);
+ expect(tab?.chunks.length).toBe(0);
+ // The live assistant is tagged with the turn id (stable render key basis).
+ expect(tab?.live[0]?.turnId).toBe("turn-x");
+
+ store.handleEvent({ type: "turn-sealed", turnId: "turn-x", tabId: "rc" });
+ await tick(); // reconcile refetch is async
+ tab = store.tabs.find((t) => t.id === "rc");
+ expect(tab?.live.length).toBe(0);
+ expect(tab?.chunks.map((c) => c.seq)).toEqual([0, 1]);
+ expect(tab?.renderGroups.map((m) => m.role)).toEqual(["user", "assistant"]);
+ // The sealed messages carry the SAME turn id as the live ones did, so the
+ // turn-scoped render key is stable across reconcile (no remount/flash).
+ expect(tab?.renderGroups.every((m) => m.turnId === "turn-x")).toBe(true);
+ });
+
+ it("defers reconcile while scrolled up, then runs it on return to bottom", async () => {
+ const sealed = [chunkRow("u", "df", 0, "turn-y", "user", "text", { text: "q" })];
+ vi.stubGlobal(
+ "fetch",
+ vi.fn((url: string) => {
+ if (url.split("?")[0]?.endsWith("/tabs/df/chunks"))
+ return Promise.resolve(chunksResponse(sealed, 1));
+ return Promise.reject(new Error(`unexpected ${url}`));
+ }),
+ );
+ const store = createTabStore();
+ store.handleEvent({
+ type: "tab-created",
+ id: "df",
+ title: "DF",
+ keyId: null,
+ modelId: null,
+ parentTabId: null,
+ });
+ store.handleEvent({ type: "turn-start", turnId: "turn-y", tabId: "df" });
+ store.handleEvent({ type: "text-delta", delta: "partial", tabId: "df" });
+ store.setScrolledUp("df", true);
+ store.handleEvent({ type: "turn-sealed", turnId: "turn-y", tabId: "df" });
+ await tick();
+ // Deferred: live tail still present, not yet folded into sealed chunks.
+ expect(store.tabs.find((t) => t.id === "df")?.live.length).toBe(1);
+
+ store.setScrolledUp("df", false);
+ await tick();
+ const tab = store.tabs.find((t) => t.id === "df");
+ expect(tab?.live.length).toBe(0);
+ expect(tab?.chunks.length).toBe(1);
+ });
+
+ it("preserves an optimistic queued user message when an earlier turn reconciles", async () => {
+ const sealed = [
+ chunkRow("u", "q", 0, "turn-a", "user", "text", { text: "first" }),
+ chunkRow("a", "q", 1, "turn-a", "assistant", "text", { text: "answer" }),
+ ];
+ vi.stubGlobal(
+ "fetch",
+ vi.fn((url: string) => {
+ if (url.split("?")[0]?.endsWith("/tabs/q/chunks"))
+ return Promise.resolve(chunksResponse(sealed, 2));
+ return Promise.reject(new Error(`unexpected ${url}`));
+ }),
+ );
+ const store = createTabStore();
+ store.handleEvent({
+ type: "tab-created",
+ id: "q",
+ title: "Q",
+ keyId: null,
+ modelId: null,
+ parentTabId: null,
+ });
+ store.handleEvent({ type: "turn-start", turnId: "turn-a", tabId: "q" });
+ store.handleEvent({ type: "text-delta", delta: "answer", tabId: "q" });
+ // User queues a follow-up WHILE turn-a streams (optimistic, no turn id yet).
+ store.handleEvent({
+ type: "message-queued",
+ tabId: "q",
+ messageId: "q1",
+ message: "do this next",
+ });
+ expect(store.tabs.find((t) => t.id === "q")?.live.some((m) => m.id === "queued-q1")).toBe(true);
+
+ // turn-a seals → reconcile. The queued user bubble must survive.
+ store.handleEvent({ type: "turn-sealed", turnId: "turn-a", tabId: "q" });
+ await tick();
+ const tab = store.tabs.find((t) => t.id === "q");
+ expect(tab?.chunks.map((c) => c.seq)).toEqual([0, 1]);
+ expect(tab?.live.some((m) => m.id === "queued-q1")).toBe(true);
+ // The sealed turn's assistant was folded out of the live tail.
+ expect(tab?.live.some((m) => m.role === "assistant")).toBe(false);
+ });
+
+ it("turn-start backfill skips a pending queued row (race), tags only the initiator", async () => {
+ // Realistic race: the user sends a prompt (plain optimistic row, no turn id
+ // yet) and, before the WS `turn-start` arrives, sends a follow-up that the
+ // backend queues (`queued-` prefix). When `turn-start` finally lands, the
+ // backfill must tag ONLY the plain initiator and SKIP the pending queued
+ // row — otherwise the queued row inherits the sealing turn's id and is
+ // wiped on reconcile (the Pass-2 Blocker).
+ const store = createTabStore();
+ store.handleEvent({
+ type: "tab-created",
+ id: "rq",
+ title: "RQ",
+ keyId: null,
+ modelId: null,
+ parentTabId: null,
+ });
+ store.switchTab("rq"); // sendMessage targets the active tab
+ const sealed = [
+ chunkRow("u", "rq", 0, "turn-a", "user", "text", { text: "first" }),
+ chunkRow("a", "rq", 1, "turn-a", "assistant", "text", { text: "answer" }),
+ ];
+ vi.stubGlobal(
+ "fetch",
+ vi.fn((url: string, opts?: { body?: string }) => {
+ const path = url.split("?")[0] ?? "";
+ if (path.endsWith("/tabs/rq/chunks")) return Promise.resolve(chunksResponse(sealed, 2));
+ if (path.endsWith("/chat")) {
+ // The 2nd send carries a queueId → backend reports it queued.
+ const queued = (opts?.body ?? "").includes("queueId");
+ return Promise.resolve({
+ ok: true,
+ json: () =>
+ Promise.resolve(queued ? { status: "queued", messageId: "srv" } : { status: "ok" }),
+ });
+ }
+ return Promise.resolve({
+ ok: true,
+ json: () => Promise.resolve({}),
+ text: () => Promise.resolve(""),
+ });
+ }),
+ );
+
+ // A freshly created tab defaults to "running"; the agent is idle here.
+ store.handleEvent({ type: "status", status: "idle", tabId: "rq" });
+ await store.sendMessage("first"); // idle → plain optimistic user row
+ store.handleEvent({ type: "status", status: "running", tabId: "rq" });
+ await store.sendMessage("second"); // running → queued (queued- prefix)
+
+ let tab = store.tabs.find((t) => t.id === "rq");
+ const initiatorId = tab?.live.find((m) => m.role === "user" && !m.id.startsWith("queued-"))?.id;
+ const queuedId = tab?.live.find((m) => m.id.startsWith("queued-"))?.id;
+ expect(initiatorId).toBeTruthy();
+ expect(queuedId).toBeTruthy();
+
+ // turn-start arrives LATE: the queued follow-up is already in the live tail.
+ store.handleEvent({ type: "turn-start", turnId: "turn-a", tabId: "rq" });
+ tab = store.tabs.find((t) => t.id === "rq");
+ expect(tab?.live.find((m) => m.id === initiatorId)?.turnId).toBe("turn-a");
+ expect(tab?.live.find((m) => m.id === queuedId)?.turnId).toBeUndefined();
+
+ store.handleEvent({ type: "text-delta", delta: "answer", tabId: "rq" });
+ store.handleEvent({ type: "turn-sealed", turnId: "turn-a", tabId: "rq" });
+ await tick();
+ tab = store.tabs.find((t) => t.id === "rq");
+ // Initiator folded cleanly into its sealed row (no duplicate user bubble);
+ // the queued follow-up survives, still untagged and still queued.
+ expect(tab?.chunks.map((c) => c.seq)).toEqual([0, 1]);
+ expect(tab?.live.find((m) => m.id === queuedId)?.turnId).toBeUndefined();
+ expect(tab?.live.some((m) => m.role === "assistant")).toBe(false);
+ expect(tab?.renderGroups.map((m) => m.role)).toEqual(["user", "assistant", "user"]);
+ });
+
+ it("a consumed interrupt message collapses into the sealed turn (no lingering bubble)", async () => {
+ // During a running turn the user queues a message; the agent CONSUMES it
+ // (interrupt), folding its text into the turn's persisted chunks. The
+ // frontend's consumed user bubble must be BOUND to the in-flight turn so it
+ // is dropped on reconcile — otherwise it lingers untagged forever AND
+ // duplicates the interrupt text now living in the sealed chunk (Pass-3 Block).
+ const sealed = [
+ chunkRow("u", "ic", 0, "turn-a", "user", "text", { text: "do a thing" }),
+ chunkRow("a", "ic", 1, "turn-a", "assistant", "text", { text: "working ...resumed" }),
+ ];
+ vi.stubGlobal(
+ "fetch",
+ vi.fn((url: string) => {
+ if (url.split("?")[0]?.endsWith("/tabs/ic/chunks"))
+ return Promise.resolve(chunksResponse(sealed, 2));
+ return Promise.reject(new Error(`unexpected ${url}`));
+ }),
+ );
+ const store = createTabStore();
+ store.handleEvent({
+ type: "tab-created",
+ id: "ic",
+ title: "IC",
+ keyId: null,
+ modelId: null,
+ parentTabId: null,
+ });
+ store.handleEvent({ type: "turn-start", turnId: "turn-a", tabId: "ic" });
+ store.handleEvent({ type: "text-delta", delta: "working", tabId: "ic" });
+ store.handleEvent({ type: "message-queued", tabId: "ic", messageId: "qx", message: "stop" });
+ // Agent consumes the queued message mid-turn (interrupt injection).
+ store.handleEvent({ type: "message-consumed", tabId: "ic", messageIds: ["qx"] });
+ let tab = store.tabs.find((t) => t.id === "ic");
+ const consumed = tab?.live.find((m) => m.id === "qx");
+ expect(consumed).toBeTruthy();
+ // The fix: the consumed row is bound to the active turn, not left untagged.
+ expect(consumed?.turnId).toBe("turn-a");
+ expect(tab?.queuedMessages.some((m) => m.id === "qx")).toBe(false);
+
+ store.handleEvent({ type: "text-delta", delta: "resumed", tabId: "ic" });
+ store.handleEvent({ type: "turn-sealed", turnId: "turn-a", tabId: "ic" });
+ await tick();
+ tab = store.tabs.find((t) => t.id === "ic");
+ // Collapsed to the persisted shape: the consumed bubble was dropped; only
+ // the sealed chunks remain (no lingering / duplicated interrupt bubble).
+ expect(tab?.live.length).toBe(0);
+ expect(tab?.live.some((m) => m.id === "qx")).toBe(false);
+ expect(tab?.chunks.map((c) => c.seq)).toEqual([0, 1]);
+ });
+
+ it("preserves a concurrent newer turn when an earlier deferred reconcile flushes", async () => {
+ const sealedA = [
+ chunkRow("ua", "c", 0, "turn-a", "user", "text", { text: "A?" }),
+ chunkRow("aa", "c", 1, "turn-a", "assistant", "text", { text: "A!" }),
+ ];
+ vi.stubGlobal(
+ "fetch",
+ vi.fn((url: string) => {
+ if (url.split("?")[0]?.endsWith("/tabs/c/chunks"))
+ return Promise.resolve(chunksResponse(sealedA, 2));
+ return Promise.reject(new Error(`unexpected ${url}`));
+ }),
+ );
+ const store = createTabStore();
+ store.handleEvent({
+ type: "tab-created",
+ id: "c",
+ title: "C",
+ keyId: null,
+ modelId: null,
+ parentTabId: null,
+ });
+ // Turn A streams; user scrolls up; A finishes → reconcile deferred.
+ store.handleEvent({ type: "turn-start", turnId: "turn-a", tabId: "c" });
+ store.handleEvent({ type: "text-delta", delta: "A!", tabId: "c" });
+ store.setScrolledUp("c", true);
+ store.handleEvent({ type: "status", status: "idle", tabId: "c" });
+ store.handleEvent({ type: "turn-sealed", turnId: "turn-a", tabId: "c" });
+ // Turn B (a queued message) starts streaming while still scrolled up.
+ store.handleEvent({ type: "turn-start", turnId: "turn-b", tabId: "c" });
+ store.handleEvent({ type: "status", status: "running", tabId: "c" });
+ store.handleEvent({ type: "text-delta", delta: "B in progress", tabId: "c" });
+ let tab = store.tabs.find((t) => t.id === "c");
+ expect(tab?.liveTurnId).toBe("turn-b");
+ const bId = tab?.currentAssistantId;
+ expect(bId).toBeTruthy();
+
+ // User scrolls down → the deferred reconcile for turn-a flushes.
+ store.setScrolledUp("c", false);
+ await tick();
+ tab = store.tabs.find((t) => t.id === "c");
+ // Turn A folded into the sealed log...
+ expect(tab?.chunks.map((c) => c.seq)).toEqual([0, 1]);
+ // ...but turn B's in-flight state survived intact (no wipe / no remount).
+ expect(tab?.liveTurnId).toBe("turn-b");
+ expect(tab?.currentAssistantId).toBe(bId);
+ expect(tab?.live.some((m) => m.turnId === "turn-b")).toBe(true);
+ expect(tab?.live.some((m) => m.turnId === "turn-a")).toBe(false);
+ });
+});