From 624b808da0f2f8bbad8a4fbbcca3f82f24ecfc47 Mon Sep 17 00:00:00 2001 From: Adam Malczewski Date: Sat, 30 May 2026 23:14:55 +0900 Subject: feat(chunks): chunk-native frontend store with turn-sealed reconcile + per-chunk eviction Replace the stored ChatMessage[] with a chunk-native model: tab.chunks (sealed ChunkRow[]) + tab.live (transient in-flight turn buffer) + derived tab.renderGroups. This enables per-chunk eviction (trimming WITHIN a large turn) and raw-chunk pagination (loadOlderChunks), removing the whole-message eviction limitation. Backend: - Emit turn-start/turn-sealed around each turn; expose currentTurnId in the status snapshot. turn-sealed fires after the durable write (status:idle fires before it). - New GET /tabs/:id/chunks raw paginated endpoint (limit/before). - Wrap appendChunks in a single SQLite transaction. Frontend: - turn-sealed drives a turn-aware reconcile that folds the sealed turn into chunks while preserving a concurrent newer in-flight turn and pending queued messages; deferred while the user is scrolled up. - Stable turn-scoped render keys (${turnId}:${role}:${n}) avoid remount/flash. Reconcile correctness (three review passes): - preserve a concurrent newer turn when an earlier deferred reconcile flushes; - keep optimistic queued user messages (no loss); - turn-start backfill skips pending queued rows and tags only the turn initiator; - bind consumed interrupt messages to the in-flight turn so they collapse on seal (no lingering/duplicated bubble). Tests: chat-store reconcile/eviction/pagination suite; api chunks endpoint + events. --- packages/api/src/agent-manager.ts | 13 + packages/api/src/routes/tabs.ts | 24 + packages/api/tests/agent-manager.test.ts | 32 +- packages/api/tests/routes.test.ts | 13 + packages/core/src/chunks/append.ts | 13 +- packages/core/src/db/chunks.ts | 58 +- packages/core/src/types/index.ts | 26 +- .../frontend/src/lib/components/ChatPanel.svelte | 33 +- packages/frontend/src/lib/tabs.svelte.ts | 811 ++++++++++++--------- packages/frontend/src/lib/types.ts | 20 +- packages/frontend/tests/chat-store.test.ts | 483 +++++++++++- 11 files changed, 1099 insertions(+), 427 deletions(-) diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts index f7975d1..5f2027d 100644 --- a/packages/api/src/agent-manager.ts +++ b/packages/api/src/agent-manager.ts @@ -799,6 +799,9 @@ export class AgentManager { if (tabAgent.currentAssistantId) { snap.currentAssistantId = tabAgent.currentAssistantId; } + if (tabAgent.currentTurnId) { + snap.currentTurnId = tabAgent.currentTurnId; + } } result[tabId] = snap; } @@ -1119,6 +1122,10 @@ export class AgentManager { // chunk rows — shares one `turn_id`. const turnId = crypto.randomUUID(); tabAgent.currentTurnId = turnId; + // Announce the turn so the frontend can tag its live chunks with this + // turn_id (stable render keys → flicker-free reconcile when the turn + // seals). Emitted before any content delta. + this.emit({ type: "turn-start", turnId }, tabId); appendChunks(tabId, explodeUserText(turnId, message)); // Store agent models on the tab if provided (defines fallback order) @@ -1284,6 +1291,12 @@ export class AgentManager { this.emit({ type: "status", status: "error" }, tabId); break; } + // Turn fully settled and its chunks are now persisted (flushAssistant ran + // above). Signal the frontend that the turn's rows — with real seqs — are + // durable so it can fold its live representation into the sealed log. + // Emitted AFTER status:idle/error (which fire before the DB write). + this.emit({ type: "turn-sealed", turnId }, tabId); + // Turn fully settled — clear the shared turn id. tabAgent.currentTurnId = null; diff --git a/packages/api/src/routes/tabs.ts b/packages/api/src/routes/tabs.ts index e9265ec..b1e9659 100644 --- a/packages/api/src/routes/tabs.ts +++ b/packages/api/src/routes/tabs.ts @@ -93,6 +93,30 @@ tabsRoutes.get("/:id/messages", (c) => { return c.json({ messages, total, oldestSeq }); }); +// Raw chunk window for a tab — the chunk-native frontend's load/paginate +// source. Same `limit`/`before` chunk-`seq` windowing as `/messages`, but +// returns the flat `ChunkRow[]` WITHOUT server-side grouping (the frontend +// groups for render and evicts/paginates on the flat list). Dedupe on the +// client by `seq` when overlap-fetching. +tabsRoutes.get("/:id/chunks", (c) => { + const id = c.req.param("id"); + const limitRaw = c.req.query("limit"); + const beforeRaw = c.req.query("before"); + const limit = limitRaw !== undefined ? Number(limitRaw) : undefined; + const before = beforeRaw !== undefined ? Number(beforeRaw) : undefined; + const options = + limit !== undefined || before !== undefined + ? { + ...(limit !== undefined && Number.isFinite(limit) ? { limit } : {}), + ...(before !== undefined && Number.isFinite(before) ? { before } : {}), + } + : undefined; + const chunks = getChunksForTab(id, options); + const oldestSeq = chunks.length > 0 ? (chunks[0]?.seq ?? null) : null; + const total = getTotalChunkCount(id); + return c.json({ chunks, total, oldestSeq }); +}); + tabsRoutes.patch("/:id", async (c) => { const id = c.req.param("id"); const body = await c.req.json<{ diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts index 6b016db..4415bbb 100644 --- a/packages/api/tests/agent-manager.test.ts +++ b/packages/api/tests/agent-manager.test.ts @@ -340,15 +340,43 @@ describe("AgentManager", () => { await manager.processMessage("tab-1", "test"); expect(events.length).toBeGreaterThan(0); - expect(events[0]).toMatchObject({ type: "status", status: "running" }); + // A turn now opens with `turn-start`, immediately followed by the + // agent's `status: running`. + expect(events[0]).toMatchObject({ type: "turn-start" }); + expect(events[1]).toMatchObject({ type: "status", status: "running" }); + // A turn now closes with `turn-sealed` (emitted after the DB write, which + // is after the agent's final `status: idle`). const lastEvent = events[events.length - 1]; - expect(lastEvent).toMatchObject({ type: "status", status: "idle" }); + expect(lastEvent).toMatchObject({ type: "turn-sealed" }); + expect(events.some((e) => e.type === "status" && e.status === "idle")).toBe(true); const doneEvent = events.find((e) => e.type === "done"); expect(doneEvent).toBeDefined(); }); + it("emits a turn-start with a turnId before any content event", async () => { + const manager = new AgentManager(); + const events: AgentEvent[] = []; + manager.onEvent((event) => { + events.push(event); + }); + + await manager.processMessage("tab-turnstart", "go"); + + const turnStartIdx = events.findIndex((e) => e.type === "turn-start"); + expect(turnStartIdx).toBeGreaterThanOrEqual(0); + const turnStart = events[turnStartIdx] as Extract; + expect(typeof turnStart.turnId).toBe("string"); + expect(turnStart.turnId.length).toBeGreaterThan(0); + + // Must precede the first content delta. + const firstContentIdx = events.findIndex( + (e) => e.type === "text-delta" || e.type === "reasoning-delta", + ); + expect(firstContentIdx).toBeGreaterThan(turnStartIdx); + }); + it("emits text-delta events during processMessage", async () => { const manager = new AgentManager(); const events: AgentEvent[] = []; diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts index f4de845..4b8dd40 100644 --- a/packages/api/tests/routes.test.ts +++ b/packages/api/tests/routes.test.ts @@ -335,6 +335,19 @@ describe("POST /chat", () => { }); }); +describe("GET /tabs/:id/chunks", () => { + it("returns the raw chunk window shape { chunks, total, oldestSeq }", async () => { + const res = await app.request("/tabs/tab-x/chunks?limit=50"); + expect(res.status).toBe(200); + const body = await res.json(); + // Mocked getChunksForTab returns [] → empty window, null cursor. + expect(Array.isArray(body.chunks)).toBe(true); + expect(body.chunks).toEqual([]); + expect(body.total).toBe(0); + expect(body.oldestSeq).toBeNull(); + }); +}); + describe("POST /chat/stop", () => { it("returns 200 with success: true for valid tabId", async () => { const res = await app.request("/chat/stop", { diff --git a/packages/core/src/chunks/append.ts b/packages/core/src/chunks/append.ts index baccd10..4bc6b5f 100644 --- a/packages/core/src/chunks/append.ts +++ b/packages/core/src/chunks/append.ts @@ -6,7 +6,7 @@ * backend (agent + agent-manager) and the frontend store call this helper * so the wire format stays in lockstep across the boundary. * - * Open/close rules — see plan-chunk-refactor.md for the full table. + * Open/close rules — see notes/plan-chunk-refactor.md for the full table. * * | Chunk | Opens on | Coalesces | * |---------------|-----------------------------------------------------------------|------------------------------------------------------------| @@ -35,9 +35,10 @@ * (no unsealed thinking chunk) are dropped. * * Ignored events: - * - `status`, `done`, `usage`, `task-list-update`, `tab-created`, - * `message-queued`, `message-consumed`, `message-cancelled` — these are - * control / lifecycle events, not message content. + * - `status`, `turn-start`, `turn-sealed`, `done`, `usage`, + * `task-list-update`, `tab-created`, `message-queued`, `message-consumed`, + * `message-cancelled` — these are control / lifecycle events, not message + * content. */ import type { @@ -200,6 +201,8 @@ export function appendEventToChunks(chunks: Chunk[], event: AgentEvent): void { // Lifecycle / control events — no chunk emitted. case "status": + case "turn-start": + case "turn-sealed": case "done": case "usage": case "task-list-update": @@ -251,7 +254,7 @@ export interface SystemEventLike { * in flight*. (When a turn IS in flight, the caller should instead use * `appendEventToChunks` against the in-flight message's chunks directly.) * - * Routing rules (from plan-chunk-refactor.md): + * Routing rules (from notes/plan-chunk-refactor.md): * * 1. Most recent message is `role: "system"` → append a `system` chunk * to it. (Note: a second consecutive system event creates a second diff --git a/packages/core/src/db/chunks.ts b/packages/core/src/db/chunks.ts index 6841eb5..077259d 100644 --- a/packages/core/src/db/chunks.ts +++ b/packages/core/src/db/chunks.ts @@ -52,32 +52,38 @@ export function appendChunks(tabId: string, drafts: ChunkRowDraft[]): ChunkRow[] VALUES ($id, $tabId, $seq, $turnId, $step, $role, $type, $dataJson, $now)`, ); const out: ChunkRow[] = []; - for (const draft of drafts) { - const id = randomUUID(); - insert.run({ - $id: id, - $tabId: tabId, - $seq: seq, - $turnId: draft.turnId, - $step: draft.step, - $role: draft.role, - $type: draft.type, - $dataJson: JSON.stringify(draft.data), - $now: now, - }); - out.push({ - id, - tabId, - seq, - turnId: draft.turnId, - step: draft.step, - role: draft.role, - type: draft.type, - data: draft.data, - createdAt: now, - }); - seq++; - } + // Wrap the whole batch in one transaction: a turn's chunks are persisted in + // a single `appendChunks` call, so this is one fsync per turn instead of one + // per row — the chosen low-IO write strategy for constrained backends. + const insertAll = db.transaction(() => { + for (const draft of drafts) { + const id = randomUUID(); + insert.run({ + $id: id, + $tabId: tabId, + $seq: seq, + $turnId: draft.turnId, + $step: draft.step, + $role: draft.role, + $type: draft.type, + $dataJson: JSON.stringify(draft.data), + $now: now, + }); + out.push({ + id, + tabId, + seq, + turnId: draft.turnId, + step: draft.step, + role: draft.role, + type: draft.type, + data: draft.data, + createdAt: now, + }); + seq++; + } + }); + insertAll(); return out; } diff --git a/packages/core/src/types/index.ts b/packages/core/src/types/index.ts index 3fcdb40..a4230ca 100644 --- a/packages/core/src/types/index.ts +++ b/packages/core/src/types/index.ts @@ -10,7 +10,7 @@ export type MessageRole = "user" | "assistant" | "system"; * preserves the actual temporal ordering of text, reasoning, tool calls, * system notices, and errors as they arrived from the model. * - * Coalescing rules (see plan-chunk-refactor.md): + * Coalescing rules (see notes/plan-chunk-refactor.md): * - `text` and `thinking` coalesce on consecutive same-type deltas. * - `tool-batch` coalesces on consecutive `tool-call` events * (appends a new entry to `calls`). @@ -199,10 +199,34 @@ export interface TabStatusSnapshot { status: AgentStatus; currentChunks?: Chunk[]; currentAssistantId?: string; + /** + * `turn_id` of the in-flight turn. Present iff `status === "running"`. + * Lets a frontend that reconnects mid-stream key its live chunks the same + * way `turn-start` would, so they reconcile cleanly when the turn seals. + */ + currentTurnId?: string; } export type AgentEvent = | { type: "status"; status: AgentStatus } + /** + * Emitted once at the start of a turn (`processMessage`), before any + * content deltas. Carries the `turn_id` shared by this turn's user message + * and every assistant/tool chunk row. The frontend tags its in-flight + * (live) chunks with this id so they key-match the sealed rows on + * turn-completion reconcile (no remount/flicker). Display/sync only — not + * conversation content. + */ + | { type: "turn-start"; turnId: string } + /** + * Emitted once after a turn has fully settled AND its chunks have been + * persisted (after `flushAssistant`). Signals the frontend that the turn's + * rows — with real `seq`s — are now durable and can be reloaded, so it can + * fold its transient live representation into the sealed chunk log. Emitted + * after `status: idle`/`error` (which fire before the DB write). Display/sync + * only — not conversation content. + */ + | { type: "turn-sealed"; turnId: string } | { type: "text-delta"; delta: string } | { type: "reasoning-delta"; delta: string } /** diff --git a/packages/frontend/src/lib/components/ChatPanel.svelte b/packages/frontend/src/lib/components/ChatPanel.svelte index 8170d43..abdec17 100644 --- a/packages/frontend/src/lib/components/ChatPanel.svelte +++ b/packages/frontend/src/lib/components/ChatPanel.svelte @@ -8,9 +8,26 @@ let userScrolledUp = $state(false); let isAutoScrolling = false; let isLoadingMore = $state(false); -const messages = $derived(tabStore.activeTab?.messages ?? []); +const renderGroups = $derived(tabStore.activeTab?.renderGroups ?? []); const activeTabId = $derived(tabStore.activeTab?.id); +// Stable, turn-scoped render keys. A bubble's identity is `${turnId}:${role}:${n}` +// (n = its index among same-(turn,role) messages) rather than the underlying +// row/client id, so when the live turn reconciles into sealed chunk rows the +// bubble keeps its identity and does NOT remount (no flash). Falls back to the +// message id for anything without a turnId (e.g. optimistic/queued messages +// before turn-start, standalone system notices). +const keyedMessages = $derived.by(() => { + const counts = new Map(); + return renderGroups.map((m) => { + if (!m.turnId) return { m, key: m.id }; + const base = `${m.turnId}:${m.role}`; + const n = counts.get(base) ?? 0; + counts.set(base, n + 1); + return { m, key: `${base}:${n}` }; + }); +}); + function isNearBottom(el: HTMLElement): boolean { return el.scrollHeight - el.scrollTop - el.clientHeight < 64; } @@ -35,7 +52,7 @@ async function onNearTop() { const prevScrollHeight = messagesEl?.scrollHeight ?? 0; const prevScrollTop = messagesEl?.scrollTop ?? 0; try { - await tabStore.loadMoreMessages(tab.id); + await tabStore.loadOlderChunks(tab.id); // Wait for Svelte to flush the prepended messages into the DOM. // Reading `scrollHeight` synchronously after the await would observe // the OLD layout (reactive updates are batched), so the scroll @@ -66,7 +83,7 @@ function handleScroll() { if (activeTabId) tabStore.setScrolledUp(activeTabId, userScrolledUp); // User just scrolled back to the bottom manually — safe to evict now. if (wasScrolledUp && !userScrolledUp && activeTabId) { - tabStore.evictMessages(activeTabId); + tabStore.evictChunks(activeTabId); } // Near the top — pull in older history. if (userScrolledUp && messagesEl.scrollTop < 200) { @@ -79,13 +96,13 @@ function resumeAutoScroll() { isAutoScrolling = true; if (activeTabId) { tabStore.setScrolledUp(activeTabId, false); - tabStore.evictMessages(activeTabId); + tabStore.evictChunks(activeTabId); } scrollToBottom(true); } $effect(() => { - const count = messages.length; + const count = renderGroups.length; void count; if (messagesEl) { untrack(() => { @@ -121,13 +138,13 @@ $effect(() => { {#if isLoadingMore}
Loading earlier messages...
{/if} - {#if messages.length === 0} + {#if renderGroups.length === 0}
Send a message to start a conversation
{/if} - {#each messages as message (message.id)} - + {#each keyedMessages as { m, key } (key)} + {/each} diff --git a/packages/frontend/src/lib/tabs.svelte.ts b/packages/frontend/src/lib/tabs.svelte.ts index c57f800..e303c80 100644 --- a/packages/frontend/src/lib/tabs.svelte.ts +++ b/packages/frontend/src/lib/tabs.svelte.ts @@ -8,6 +8,10 @@ import { type IdentifiedMessage, type SystemEventLike, } from "@dispatch/core/src/chunks/append.js"; +// DB-free; safe in the browser bundle. The flat chunk log is the frontend's +// source of truth for HISTORY; `groupRowsToMessages` derives render bubbles. +import { groupRowsToMessages, type MessageRow } from "@dispatch/core/src/chunks/transform.js"; +import type { ChunkRow } from "@dispatch/core/src/types/index.js"; import { config } from "./config.js"; import { appSettings } from "./settings.svelte.js"; import type { @@ -35,34 +39,118 @@ function generateId(): string { }); } +function makeDebugInfo(overrides: Partial = {}): DebugInfo { + return { + timestamp: new Date().toISOString(), + connectionStatus: wsClient.connectionStatus, + ...overrides, + }; +} + +// ─── Chunk-log → render projection ─────────────────────────────── +// +// History lives as a flat `ChunkRow[]` (sealed, real seq). For rendering we +// group it into bubbles with `groupRowsToMessages` (pairs tool_call+tool_result +// by callId, wraps a turn's assistant chunks) — a pure, ephemeral view, never +// stored as the source of truth. + +/** Map a grouped chunk-row message to a render `ChatMessage`. */ +function rowGroupToMessage(m: MessageRow): ChatMessage { + return { + id: m.id, + role: m.role, + chunks: m.chunks, + isStreaming: false, + seq: m.seq, + turnId: m.turnId, + }; +} + /** - * Extract the smallest `seq` from a list of raw API message rows. The backend - * tags each persisted message with a monotonic `seq`; we track the oldest one - * currently loaded so `loadMoreMessages` can page backwards via `?before=seq`. - * Returns null when no row carries a usable seq. + * The render view for a tab: grouped sealed chunks followed by the transient + * live tail (current unsealed turn). This is what the chat panel renders. */ -function oldestSeqOf(messages: Array<{ seq?: number }>): number | null { +function deriveRenderGroups(chunks: ChunkRow[], live: ChatMessage[]): ChatMessage[] { + const sealed = groupRowsToMessages(chunks).map(rowGroupToMessage); + return live.length > 0 ? [...sealed, ...live] : sealed; +} + +/** Total chunk count of the live tail (for the eviction budget). */ +function countLiveChunks(live: ChatMessage[]): number { + return live.reduce((sum, m) => sum + m.chunks.length, 0); +} + +/** Smallest `seq` among sealed chunk rows, or null when empty. */ +function minSeqOf(chunks: ChunkRow[]): number | null { let min: number | null = null; - for (const m of messages) { - if (typeof m.seq === "number" && (min === null || m.seq < min)) { - min = m.seq; - } + for (const c of chunks) { + if (typeof c.seq === "number" && (min === null || c.seq < min)) min = c.seq; } return min; } -function makeDebugInfo(overrides: Partial = {}): DebugInfo { - return { - timestamp: new Date().toISOString(), - connectionStatus: wsClient.connectionStatus, - ...overrides, - }; +/** Merge older chunk rows into a window, dedupe by `seq`, keep ascending. */ +function mergeChunksBySeq(existing: ChunkRow[], incoming: ChunkRow[]): ChunkRow[] { + const bySeq = new Map(); + for (const c of existing) bySeq.set(c.seq, c); + for (const c of incoming) bySeq.set(c.seq, c); + return [...bySeq.values()].sort((a, b) => a.seq - b.seq); +} + +/** Fetch a raw chunk window from the backend (the chunk-native load source). */ +async function fetchChunkWindow( + tabId: string, + params: { limit?: number; before?: number } = {}, +): Promise<{ ok: boolean; chunks: ChunkRow[]; total: number; oldestSeq: number | null }> { + const qs = new URLSearchParams(); + if (params.limit !== undefined) qs.set("limit", String(params.limit)); + if (params.before !== undefined) qs.set("before", String(params.before)); + const q = qs.toString(); + try { + const res = await fetch(`${config.apiBase}/tabs/${tabId}/chunks${q ? `?${q}` : ""}`); + if (!res.ok) return { ok: false, chunks: [], total: 0, oldestSeq: null }; + const data = (await res.json()) as { + chunks?: ChunkRow[]; + total?: number; + oldestSeq?: number | null; + }; + const chunks = Array.isArray(data.chunks) ? data.chunks : []; + return { + ok: true, + chunks, + total: data.total ?? chunks.length, + oldestSeq: data.oldestSeq ?? minSeqOf(chunks), + }; + } catch { + return { ok: false, chunks: [], total: 0, oldestSeq: null }; + } } export interface Tab { id: string; title: string; - messages: ChatMessage[]; + /** + * SEALED conversation history as a flat chunk log (real per-tab `seq`). + * The source of truth for history and the unit of eviction + pagination. + */ + chunks: ChunkRow[]; + /** + * Transient render buffer for the CURRENT (unsealed) turn only: the + * optimistic user message, the in-flight assistant turn (folded from + * stream deltas), queued/consumed user messages, interrupt splits. Tiny and + * short-lived — cleared and folded into `chunks` (via refetch) the moment + * the turn seals. NOT stored history. + */ + live: ChatMessage[]; + /** + * Materialized render projection = groupRowsToMessages(chunks) ++ live, + * recomputed by `updateTab` after any change to `chunks`/`live`. A derived + * cache for the view layer — NOT the source of truth, never the + * eviction/pagination unit. + */ + renderGroups: ChatMessage[]; + /** turn_id of the in-flight turn (stable render keys + reconcile). */ + liveTurnId: string | null; agentStatus: "idle" | "running" | "error"; keyId: string | null; modelId: string | null; @@ -70,26 +158,18 @@ export interface Tab { currentAssistantId: string | null; tasks: TaskItem[]; injectedSkills: string[]; - /** null = user-owned tab, string = spawned by that tab */ parentTabId: string | null; - /** Persistent tabs stay until manually closed. Temp tabs disappear when agent finishes. */ persistent: boolean; - /** Slug of the selected agent, or null for manual mode */ agentSlug: string | null; - /** Scope of the selected agent */ agentScope: string | null; - /** Ordered key+model fallback hierarchy from the selected agent */ agentModels: Array<{ key_id: string; model_id: string }> | null; - /** Custom working directory override for this tab */ workingDirectory: string | null; - /** Messages queued to be sent once the agent finishes its current run */ queuedMessages: QueuedMessage[]; - /** Max chunks to keep in memory before evicting oldest messages */ chunkLimit: number; - /** Seq of the oldest message currently loaded, or null if unknown/none */ + /** Smallest `seq` currently in `chunks` — the backward-pagination cursor. */ oldestLoadedSeq: number | null; - /** Total number of messages for this tab on the backend */ - totalMessages: number; + /** Total chunk count for this tab on the backend (drives "more to load?"). */ + totalChunks: number; /** * Cumulative prompt-cache token telemetry for this tab since the page * loaded (in-memory only — resets on reload). Undefined until the first @@ -126,6 +206,12 @@ export function createTabStore() { // `setScrolledUp`. A `force` eviction ignores this set entirely. const scrolledUpTabs = new Set(); + // tabId → the turn_id whose reconcile was deferred because the user was + // scrolled up. A Map (not a Set) so the deferred flush knows which turn + // sealed and can preserve a newer turn that started streaming meanwhile. + // Flushed when they return to the bottom so we don't yank their viewport. + const pendingReconcileTabs = new Map(); + // Clear any stale listeners from HMR reloads, then register wsClient.clearCallbacks(); wsClient.onEvent((event) => { @@ -164,7 +250,10 @@ export function createTabStore() { const tab: Tab = { id, title, - messages: [], + chunks: [], + live: [], + renderGroups: [], + liveTurnId: null, agentStatus: "idle", keyId: null, modelId: null, @@ -181,7 +270,7 @@ export function createTabStore() { queuedMessages: [], chunkLimit: appSettings.chunkLimit, oldestLoadedSeq: null, - totalMessages: 0, + totalChunks: 0, }; tabs = [...tabs, tab]; activeTabId = id; @@ -229,35 +318,17 @@ export function createTabStore() { parentTabId?: string | null; }; - const messagesRes = await fetch(`${config.apiBase}/tabs/${agentId}/messages?limit=100`); - // `GET /messages` windows the flat chunk log (last N chunks) and - // groups the rows into render messages (`groupRowsToMessages` in - // packages/core/src/chunks/transform.ts), serving them as `chunks` - // per message over the wire — NOT a raw JSON string. - const messagesData = messagesRes.ok - ? ((await messagesRes.json()) as { - messages: Array<{ - id?: string; - role: string; - chunks?: Chunk[]; - seq?: number; - }>; - total?: number; - }) - : { messages: [], total: 0 }; - - const chatMessages: ChatMessage[] = messagesData.messages.map((m) => ({ - id: m.id ?? generateId(), - role: m.role as ChatMessage["role"], - chunks: Array.isArray(m.chunks) ? m.chunks : [], - isStreaming: false, - seq: m.seq, - })); + // Load the tail of the flat chunk log (raw rows — the frontend groups + // for render and evicts/paginates on the flat list). + const win = await fetchChunkWindow(agentId, { limit: 100 }); const newTab: Tab = { id: agentId, title: tabData.title, - messages: chatMessages, + chunks: win.chunks, + live: [], + renderGroups: deriveRenderGroups(win.chunks, []), + liveTurnId: null, agentStatus: "idle", keyId: tabData.keyId ?? null, modelId: tabData.modelId ?? null, @@ -273,12 +344,12 @@ export function createTabStore() { workingDirectory: null, queuedMessages: [], chunkLimit: appSettings.chunkLimit, - oldestLoadedSeq: oldestSeqOf(messagesData.messages), - totalMessages: messagesData.total ?? messagesData.messages.length, + oldestLoadedSeq: win.oldestSeq, + totalChunks: win.total, }; tabs = [...tabs, newTab]; activeTabId = agentId; - evictMessages(agentId); + evictChunks(agentId); } catch (err) { console.error("openAgentTab failed:", err); } @@ -312,7 +383,17 @@ export function createTabStore() { } function updateTab(id: string, patch: Partial): void { - tabs = tabs.map((t) => (t.id === id ? { ...t, ...patch } : t)); + tabs = tabs.map((t) => { + if (t.id !== id) return t; + const next = { ...t, ...patch }; + // `renderGroups` is a derived cache: recompute it whenever its inputs + // (`chunks` / `live`) change so the view layer never reads a stale + // projection. Callers only ever mutate `chunks`/`live`. + if ("chunks" in patch || "live" in patch) { + next.renderGroups = deriveRenderGroups(next.chunks, next.live); + } + return next; + }); } /** @@ -321,26 +402,56 @@ export function createTabStore() { * messages — we don't want to delete what they're currently looking at. */ function setScrolledUp(tabId: string, scrolledUp: boolean): void { - if (scrolledUp) scrolledUpTabs.add(tabId); - else scrolledUpTabs.delete(tabId); + if (scrolledUp) { + scrolledUpTabs.add(tabId); + } else { + scrolledUpTabs.delete(tabId); + // Returned to the bottom — run any reconcile we deferred while reading. + const deferredTurnId = pendingReconcileTabs.get(tabId); + if (deferredTurnId !== undefined) { + pendingReconcileTabs.delete(tabId); + reconcileSealedTurn(tabId, deferredTurnId); + } + } } /** - * Trim a tab's in-memory message history down to its `chunkLimit`. - * - * Counts the total number of chunks across all messages and removes the - * oldest messages (from the front of the array) until the count is at or - * below the limit — but never drops below a coherent conversation bottom: - * - the in-flight streaming assistant message is always pinned; - * - the most recent user+assistant pair (last 2 messages) is always pinned. - * - * Evicted messages are fully removed from `tab.messages` — there is no - * stub or hidden cache; scrolling up re-fetches them via `loadMoreMessages`. - * - * Eviction is suppressed while the user is scrolled up (reading history), - * unless `force` is true. + * Drop up to `n` of the oldest chunks from the live tail (front-to-back + * across its messages), never removing the chunk currently being streamed + * (the last chunk of the in-flight assistant message). Emptied messages are + * dropped. Used only when a single in-flight turn alone exceeds the budget. + */ + function trimLiveChunks( + live: ChatMessage[], + n: number, + streamingId: string | null, + ): ChatMessage[] { + let remaining = n; + const out = live.map((m) => ({ ...m, chunks: [...m.chunks] })); + for (const m of out) { + if (remaining <= 0) break; + const isStreamingMsg = m.id === streamingId || m.isStreaming === true; + while (m.chunks.length > 0 && remaining > 0) { + // Keep the last (open) chunk of the actively streaming message. + if (isStreamingMsg && m.chunks.length === 1) break; + m.chunks.shift(); + remaining--; + } + } + return out.filter((m) => m.chunks.length > 0); + } + + /** + * Bound a tab's in-memory footprint to `chunkLimit` by rolling eviction of + * the OLDEST chunks. Sealed history (`tab.chunks`) is trimmed from the front + * first; if a single in-flight turn alone still exceeds the budget, the + * oldest chunks of the live tail are trimmed too (never the chunk currently + * being streamed). Evicted sealed chunks are re-fetched on scroll-up via + * `loadOlderChunks`; live chunks that haven't sealed yet are recovered by + * the turn-completion reconcile once their write lands. Suppressed while + * scrolled up unless `force` is set. */ - function evictMessages(tabId: string, force = false): void { + function evictChunks(tabId: string, force = false): void { const tab = getTabById(tabId); if (!tab) return; if (!force && scrolledUpTabs.has(tabId)) return; @@ -348,132 +459,61 @@ export function createTabStore() { const limit = appSettings.chunkLimit; if (!Number.isFinite(limit) || limit <= 0) return; - const countChunks = (msgs: ChatMessage[]) => msgs.reduce((sum, m) => sum + m.chunks.length, 0); - - // Work on a shallow copy we can shift from. - const working = [...tab.messages]; - let total = countChunks(working); - let removed = false; - - // `chunkLimit` is a SOFT target for trimming OLD history, not a hard - // cap. We always keep at least the latest user+assistant pair (the - // `working.length > 2` floor) so the view is never blank and an answer - // always has its preceding question on screen. Consequently a single - // very long turn (one message holding e.g. 150 chunks) can briefly push - // the in-memory total above `limit` — that is intentional and accepted. - // We never trim chunks from WITHIN a message: messages are the - // persistence/pagination unit (the backend stores whole rows keyed by - // `seq` and `loadMoreMessages` pages by whole messages via `?before=`), - // so a partially-trimmed message would just be re-fetched from the DB - // and bounce back, and each message's `role` would be lost in a flat - // chunk array. Whole-message eviction from the front is the correct - // granularity. - while (total > limit && working.length > 2) { - const candidate = working[0]; - if (!candidate) break; - // Never evict the in-flight streaming assistant message. - if (candidate.isStreaming || candidate.id === tab.currentAssistantId) break; - working.shift(); - total -= candidate.chunks.length; - removed = true; + let sealed = tab.chunks; + let live = tab.live; + let total = sealed.length + countLiveChunks(live); + if (total <= limit) return; + + // 1. Drop oldest sealed chunk rows from the front. + if (sealed.length > 0) { + let dropTo = 0; + while (total > limit && dropTo < sealed.length) { + dropTo++; + total--; + } + if (dropTo > 0) sealed = sealed.slice(dropTo); + } + + // 2. Still over budget → one live turn exceeds the limit on its own. + if (total > limit && live.length > 0) { + live = trimLiveChunks(live, total - limit, tab.currentAssistantId); } - if (!removed) return; - // Recalculate oldestLoadedSeq from remaining messages after eviction so - // the `?before=` pagination cursor doesn't point at an evicted seq. - const remainingSeq = (working as Array).reduce( - (min, m) => (typeof m.seq === "number" && (min === null || m.seq < min) ? m.seq : min), - null, - ); updateTab(tabId, { - messages: working, - oldestLoadedSeq: remainingSeq ?? tab.oldestLoadedSeq, + chunks: sealed, + live, + oldestLoadedSeq: minSeqOf(sealed) ?? tab.oldestLoadedSeq, }); } /** - * Fetch and prepend the next page of older messages for a tab. Called when - * the user scrolls toward the top. Pages backwards using the oldest loaded - * `seq` (`?before=`). Does NOT trigger eviction — the user is reading - * history, so we keep everything they've pulled in. + * Fetch and prepend the next older page of CHUNKS (raw rows). Called when + * the user scrolls toward the top. Pages backward by the oldest loaded + * `seq` (`?before=`), dedupes by `seq`, and keeps the window seq-sorted — + * so a turn split across the window boundary regroups into one bubble with + * no special-casing. Does NOT evict (the user is reading history). */ - async function loadMoreMessages(tabId: string): Promise { + async function loadOlderChunks(tabId: string): Promise { const tab = getTabById(tabId); if (!tab) return; - - const beforeParam = tab.oldestLoadedSeq !== null ? `&before=${tab.oldestLoadedSeq}` : ""; - try { - const res = await fetch(`${config.apiBase}/tabs/${tabId}/messages?limit=50${beforeParam}`); - if (!res.ok) return; - const data = (await res.json()) as { - messages?: Array<{ - id?: string; - role: string; - chunks?: Chunk[]; - seq?: number; - turnId?: string; - }>; - total?: number; - oldestSeq?: number | null; - }; - const rawMessages = data.messages ?? []; - if (rawMessages.length === 0) { - // Nothing older to load; record the total if provided. - if (typeof data.total === "number") { - updateTab(tabId, { totalMessages: data.total }); - } - return; - } - - const older: ChatMessage[] = rawMessages.map((m) => ({ - id: m.id ?? generateId(), - role: m.role as ChatMessage["role"], - chunks: Array.isArray(m.chunks) ? m.chunks : [], - isStreaming: false, - seq: m.seq, - ...(m.turnId !== undefined ? { turnId: m.turnId } : {}), - })); - - const current = getTabById(tabId); - if (!current) return; - - // Chunk-granular pagination can split ONE turn across the window - // boundary: the oldest message already loaded and the newest message - // in this older page may share a turn_id. Merge them (older chunks - // first) so the turn renders as one bubble instead of duplicating. - const merged = [...current.messages]; - const lastOlder = older[older.length - 1]; - const firstCurrent = merged[0]; - if ( - lastOlder && - firstCurrent && - lastOlder.turnId !== undefined && - lastOlder.turnId === firstCurrent.turnId && - lastOlder.role === firstCurrent.role - ) { - older.pop(); - merged[0] = { - ...firstCurrent, - id: lastOlder.id, - seq: lastOlder.seq, - turnId: lastOlder.turnId, - chunks: [...lastOlder.chunks, ...firstCurrent.chunks], - }; - } - - // Avoid duplicating messages we already have loaded. - const existingIds = new Set(merged.map((m) => m.id)); - const toPrepend = older.filter((m) => !existingIds.has(m.id)); - - const newOldestSeq = data.oldestSeq ?? oldestSeqOf(rawMessages); - updateTab(tabId, { - messages: [...toPrepend, ...merged], - oldestLoadedSeq: newOldestSeq ?? current.oldestLoadedSeq, - totalMessages: data.total ?? current.totalMessages, - }); - } catch (err) { - console.warn("[loadMoreMessages] failed:", err); + const before = tab.oldestLoadedSeq; + const win = await fetchChunkWindow(tabId, { + limit: 50, + ...(before !== null ? { before } : {}), + }); + const current = getTabById(tabId); + if (!current) return; + if (win.chunks.length === 0) { + // Nothing older; refresh the total if the backend reported a real one. + if (win.total > 0) updateTab(tabId, { totalChunks: win.total }); + return; } + const merged = mergeChunksBySeq(current.chunks, win.chunks); + updateTab(tabId, { + chunks: merged, + oldestLoadedSeq: minSeqOf(merged), + totalChunks: win.total, + }); } function ensureAssistantMessage(tabId: string): ChatMessage | null { @@ -481,7 +521,7 @@ export function createTabStore() { if (!tab) return null; if (tab.currentAssistantId) { - const existing = tab.messages.find((m) => m.id === tab.currentAssistantId); + const existing = tab.live.find((m) => m.id === tab.currentAssistantId); if (existing) return existing; } @@ -491,19 +531,24 @@ export function createTabStore() { role: "assistant", chunks: [], isStreaming: true, + ...(tab.liveTurnId !== null ? { turnId: tab.liveTurnId } : {}), }; updateTab(tabId, { currentAssistantId: id, - messages: [...tab.messages, newMsg], + live: [...tab.live, newMsg], }); - evictMessages(tabId); + evictChunks(tabId); return newMsg; } - function updateMessages(tabId: string, updater: (msgs: ChatMessage[]) => ChatMessage[]): void { + /** + * Update the live tail (the current unsealed turn). All streaming handlers + * operate here; sealed history (`tab.chunks`) is never touched by streaming. + */ + function updateLive(tabId: string, updater: (live: ChatMessage[]) => ChatMessage[]): void { const tab = getTabById(tabId); if (!tab) return; - updateTab(tabId, { messages: updater(tab.messages) }); + updateTab(tabId, { live: updater(tab.live) }); } /** @@ -516,7 +561,7 @@ export function createTabStore() { * `$state.snapshot` (Svelte's own safe clone — strips reactive proxies and * falls back gracefully where native `structuredClone` would throw * `DataCloneError` on a `$state` proxy), mutate the snapshot, then write - * it back through `updateMessages`. The previous use of `structuredClone` + * it back through `updateLive`. The previous use of `structuredClone` * here threw silently and was swallowed by the WS try/catch — left chunks * empty for every streaming turn. */ @@ -526,7 +571,7 @@ export function createTabStore() { if (!tab) return; const currentId = tab.currentAssistantId; if (!currentId) return; - updateMessages(tabId, (msgs) => + updateLive(tabId, (msgs) => msgs.map((m) => { if (m.id !== currentId) return m; const cloned = $state.snapshot(m.chunks) as Chunk[]; @@ -538,6 +583,8 @@ export function createTabStore() { return { ...m, chunks: cloned, isStreaming: true }; }), ); + // A chunk may have just completed — keep the in-memory footprint bounded. + evictChunks(tabId); } /** @@ -548,20 +595,22 @@ export function createTabStore() { function routeSystemEvent(tabId: string, sysEvent: SystemEventLike): void { const tab = getTabById(tabId); if (!tab) return; - // We need to mutate the messages array (applySystemEvent does in-place - // push). Build a shallow-cloned IdentifiedMessage[] view via - // `$state.snapshot` (safe against Svelte 5 reactive proxies; native - // `structuredClone` would throw), run the helper, then write it back. - const view: IdentifiedMessage[] = tab.messages.map((m) => ({ + // Operate on the live tail (applySystemEvent appends a system chunk to + // the trailing system message or creates one). Build a shallow-cloned + // IdentifiedMessage[] view via `$state.snapshot` (safe against Svelte 5 + // reactive proxies; native `structuredClone` would throw), run the + // helper, then write it back. The backend persists this system row too, + // so it reconciles into `chunks` on the next turn/load. + const view: IdentifiedMessage[] = tab.live.map((m) => ({ id: m.id, role: m.role, chunks: $state.snapshot(m.chunks) as Chunk[], })); applySystemEvent(view, sysEvent, generateId); - // Reconcile: rebuild the ChatMessage array from the view, preserving - // existing message metadata (isStreaming, debugInfo) where IDs match. - const byId = new Map(tab.messages.map((m) => [m.id, m])); + // Reconcile: rebuild the live array from the view, preserving existing + // message metadata (isStreaming, debugInfo) where IDs match. + const byId = new Map(tab.live.map((m) => [m.id, m])); const rebuilt: ChatMessage[] = view.map((v) => { const existing = byId.get(v.id); if (existing) { @@ -574,40 +623,74 @@ export function createTabStore() { isStreaming: false, }; }); - updateTab(tabId, { messages: rebuilt }); + updateTab(tabId, { live: rebuilt }); } /** - * Reload a tab's messages from the API. Used after a WS reconnect when - * we detect the backend finished work while we were disconnected — the - * persisted chunks are the source of truth; in-memory state may be - * missing events. + * Reload a tab's chunk window from the API and fold the sealed turn out of + * the live tail. The persisted chunk log is the source of truth. Two modes: + * - turn-completion reconcile (`preserveActiveTurn=true`, `sealedTurnId` + * set): the just-sealed turn's rows now carry real seqs. Drop that turn + * from `live`, but PRESERVE (a) a newer turn that began streaming while a + * reconcile was deferred — the queued-message race — and (b) optimistic + * user messages not yet bound to a turn, so neither is wiped. + * - WS-reconnect desync (`preserveActiveTurn=false`): the backend has moved + * on and is idle, so trust the DB fully and clear the live tail. + * A failed fetch is a no-op (never wipes a populated tab). */ - async function reloadTabMessagesFromApi(tabId: string): Promise { - try { - const res = await fetch(`${config.apiBase}/tabs/${tabId}/messages?limit=100`); - if (!res.ok) return; - const data = (await res.json()) as { - messages: Array<{ id?: string; role: string; chunks?: Chunk[]; seq?: number }>; - total?: number; - }; - const reloaded: ChatMessage[] = data.messages.map((m) => ({ - id: m.id ?? generateId(), - role: m.role as ChatMessage["role"], - chunks: Array.isArray(m.chunks) ? m.chunks : [], - isStreaming: false, - seq: m.seq, - })); - updateTab(tabId, { - messages: reloaded, - currentAssistantId: null, - oldestLoadedSeq: oldestSeqOf(data.messages), - totalMessages: data.total ?? reloaded.length, - }); - evictMessages(tabId); - } catch (err) { - console.warn("[reloadTabMessagesFromApi] failed:", err); + async function reloadChunksFromApi( + tabId: string, + preserveActiveTurn = false, + sealedTurnId?: string, + ): Promise { + const win = await fetchChunkWindow(tabId, { limit: 100 }); + if (!win.ok) return; + const current = getTabById(tabId); + if (!current) return; + // A turn that started streaming AFTER the one being reconciled must not be + // wiped — only the sealed turn folds into `chunks`. + const preserveTurnId = + preserveActiveTurn && current.liveTurnId !== null && current.liveTurnId !== sealedTurnId + ? current.liveTurnId + : null; + const keptLive = preserveActiveTurn + ? current.live.filter( + (m) => + (preserveTurnId !== null && m.turnId === preserveTurnId) || + // Optimistic / queued user messages not yet bound to a turn. + (m.turnId === undefined && m.role === "user"), + ) + : []; + const stillActive = preserveTurnId !== null; + updateTab(tabId, { + chunks: win.chunks, + live: keptLive, + liveTurnId: stillActive ? current.liveTurnId : null, + currentAssistantId: stillActive ? current.currentAssistantId : null, + oldestLoadedSeq: win.oldestSeq, + totalChunks: win.total, + }); + evictChunks(tabId); + } + + /** + * Turn-completion reconcile. On `turn-sealed`, fold the just-finished turn + * (`sealedTurnId`) into the sealed log by reloading the chunk window (real + * seqs) and dropping that turn from the live tail — while preserving any + * newer in-flight turn and not-yet-sealed optimistic user messages. Deferred + * while the user is scrolled up so the viewport isn't disturbed; re-attempted + * (with the same `sealedTurnId`) when they return to the bottom. + */ + function reconcileSealedTurn(tabId: string, sealedTurnId: string): void { + const tab = getTabById(tabId); + if (!tab) return; + if (tab.live.length === 0 && tab.liveTurnId === null) return; + if (scrolledUpTabs.has(tabId)) { + pendingReconcileTabs.set(tabId, sealedTurnId); + return; } + pendingReconcileTabs.delete(tabId); + void reloadChunksFromApi(tabId, true, sealedTurnId); } /** @@ -675,91 +758,51 @@ export function createTabStore() { // Non-fatal: tabs still restore with idle status. } - // 3. For each tab, fetch its persisted messages in parallel. - const messageFetches = tabRows.map(async (row) => { - try { - const res = await fetch(`${config.apiBase}/tabs/${row.id}/messages?limit=100`); - if (!res.ok) - return { id: row.id, messages: [] as ChatMessage[], total: 0, oldestSeq: null }; - const data = (await res.json()) as { - messages?: Array<{ id?: string; role: string; chunks?: Chunk[]; seq?: number }>; - total?: number; - }; - const rawMessages = data.messages ?? []; - const messages: ChatMessage[] = rawMessages.map((m) => ({ - id: m.id ?? generateId(), - role: m.role as ChatMessage["role"], - chunks: Array.isArray(m.chunks) ? m.chunks : [], - isStreaming: false, - seq: m.seq, - })); - return { - id: row.id, - messages, - total: data.total ?? messages.length, - oldestSeq: oldestSeqOf(rawMessages), - }; - } catch { - return { id: row.id, messages: [] as ChatMessage[], total: 0, oldestSeq: null }; - } - }); - - const messagesByTab = new Map(); - const totalByTab = new Map(); - const oldestSeqByTab = new Map(); - for (const result of await Promise.all(messageFetches)) { - messagesByTab.set(result.id, result.messages); - totalByTab.set(result.id, result.total); - oldestSeqByTab.set(result.id, result.oldestSeq); + // 3. For each tab, fetch its chunk window (raw rows) in parallel. + type Win = { ok: boolean; chunks: ChunkRow[]; total: number; oldestSeq: number | null }; + const winByTab = new Map(); + for (const { id, win } of await Promise.all( + tabRows.map(async (row) => ({ + id: row.id, + win: await fetchChunkWindow(row.id, { limit: 100 }), + })), + )) { + winByTab.set(id, win); } - // 4. Build the Tab objects, splicing in the in-flight snapshot for - // running tabs. + // 4. Build the Tab objects, seeding the in-flight live turn for running + // tabs from the status snapshot (the unsealed turn isn't in the DB + // yet; it reconciles into `chunks` when `turn-sealed` arrives). const restored: Tab[] = tabRows.map((row) => { const snap = statusMap[row.id]; - const messages = messagesByTab.get(row.id) ?? []; + const win: Win = winByTab.get(row.id) ?? { ok: true, chunks: [], total: 0, oldestSeq: null }; const agentStatus: Tab["agentStatus"] = snap?.status ?? "idle"; let currentAssistantId: string | null = null; - let finalMessages = messages; + let liveTurnId: string | null = null; + let live: ChatMessage[] = []; if (agentStatus === "running" && snap?.currentAssistantId) { currentAssistantId = snap.currentAssistantId; - // Find or create the in-flight assistant message. If the DB - // already has a row with this id (the backend appended on - // first flush and we picked it up via /tabs/:id/messages), - // merge the snapshot chunks on top — the snapshot is the - // live source of truth and may have chunks the DB doesn't. - // If there's no matching row, append a new in-flight - // assistant message holding only the snapshot chunks. - const existingIdx = finalMessages.findIndex((m) => m.id === snap.currentAssistantId); - if (existingIdx >= 0) { - finalMessages = finalMessages.map((m, i) => - i === existingIdx - ? { - ...m, - chunks: snap.currentChunks ? [...snap.currentChunks] : m.chunks, - isStreaming: true, - } - : m, - ); - } else { - finalMessages = [ - ...finalMessages, - { - id: snap.currentAssistantId, - role: "assistant", - chunks: snap.currentChunks ? [...snap.currentChunks] : [], - isStreaming: true, - }, - ]; - } + liveTurnId = snap.currentTurnId ?? null; + live = [ + { + id: snap.currentAssistantId, + role: "assistant", + chunks: snap.currentChunks ? [...snap.currentChunks] : [], + isStreaming: true, + ...(liveTurnId !== null ? { turnId: liveTurnId } : {}), + }, + ]; } return { id: row.id, title: row.title, - messages: finalMessages, + chunks: win.chunks, + live, + renderGroups: deriveRenderGroups(win.chunks, live), + liveTurnId, agentStatus, keyId: row.keyId ?? null, modelId: row.modelId ?? null, @@ -775,15 +818,15 @@ export function createTabStore() { workingDirectory: null, queuedMessages: [], chunkLimit: appSettings.chunkLimit, - oldestLoadedSeq: oldestSeqByTab.get(row.id) ?? null, - totalMessages: totalByTab.get(row.id) ?? finalMessages.length, + oldestLoadedSeq: win.oldestSeq, + totalChunks: win.total, }; }); tabs = restored; // Trim each restored tab down to the chunk limit (user starts at bottom). for (const t of restored) { - evictMessages(t.id); + evictChunks(t.id); } // Activate the first restored tab (the list is already ordered by // `position` from the backend). @@ -796,18 +839,73 @@ export function createTabStore() { switch (event.type) { case "status": { - if (tabId) { - updateTab(tabId, { agentStatus: event.status }); - if (event.status === "idle" || event.status === "error") { - updateTab(tabId, { currentAssistantId: null }); - const tab = getTabById(tabId); - if (tab && !tab.persistent && tabId !== activeTabId) { - tabs = tabs.filter((t) => t.id !== tabId); - } + if (!tabId) break; + updateTab(tabId, { agentStatus: event.status }); + if (event.status === "idle" || event.status === "error") { + // Stop the streaming cursor immediately; the fold of the live + // tail into the sealed chunk log happens on `turn-sealed` + // (after the DB write lands — status fires before it). + updateLive(tabId, (msgs) => + msgs.map((m) => (m.isStreaming ? { ...m, isStreaming: false } : m)), + ); + updateTab(tabId, { currentAssistantId: null }); + const tab = getTabById(tabId); + if (tab && !tab.persistent && tabId !== activeTabId) { + tabs = tabs.filter((t) => t.id !== tabId); } } break; } + case "turn-start": { + if (!tabId) break; + const tsTab = getTabById(tabId); + // Tag the in-flight turn. Also backfill the turn_id onto THIS + // turn's initiating optimistic user message — it was created on + // send before the turn_id was known — so it key-matches the sealed + // user row after reconcile (flicker-free; no remount). + // + // A turn-start corresponds to exactly one persisted user row + // (processMessage → explodeUserText), and a queued message never + // gets its own turn-start (it is drained into a running turn via + // message-consumed). So the initiator is the single most-recent + // NON-queued untagged user row. We must NOT tag pending `queued-` + // rows: they belong to future turns, and tagging them here would + // wipe them from the UI when THIS turn seals (reconcile drops live + // rows bound to the sealed turn). + const taggedLive = tsTab + ? (() => { + const live = [...tsTab.live]; + for (let i = live.length - 1; i >= 0; i--) { + const m = live[i]; + // Stop at the first non-user row (assistant/system + // boundary): earlier user rows belong to prior turns. + if (!m || m.role !== "user") break; + // Skip past pending queued messages (future turns). + if (m.id.startsWith("queued-")) continue; + // Most-recent non-queued user row = this turn's + // initiator. Tag it once (if untagged), then stop. + if (m.turnId === undefined) { + live[i] = { ...m, turnId: event.turnId }; + } + break; + } + return live; + })() + : undefined; + updateTab(tabId, { + liveTurnId: event.turnId, + ...(taggedLive ? { live: taggedLive } : {}), + }); + break; + } + case "turn-sealed": { + if (!tabId) break; + // The turn's rows are now durable — fold THIS turn out of the live + // tail into the sealed chunk log (refetch real seqs), preserving any + // newer in-flight turn. Deferred while scrolled up. + reconcileSealedTurn(tabId, event.turnId); + break; + } case "statuses": { // WS (re)connect snapshot. The shape was widened to // TabStatusSnapshot (status + optional currentChunks + @@ -819,10 +917,10 @@ export function createTabStore() { const backendStatus = snap?.status ?? "idle"; // Desync case: frontend thought it was streaming, backend - // has already moved on. Pull the persisted chunks so the - // final answer shows up. + // has already moved on. The turn is persisted now — reload + // the chunk window so the final answer shows up. if (t.agentStatus === "running" && backendStatus !== "running") { - void reloadTabMessagesFromApi(t.id); + void reloadChunksFromApi(t.id); } // Status alignment. @@ -838,8 +936,11 @@ export function createTabStore() { // in-memory currentChunks. if (snap?.currentAssistantId) { const targetId = snap.currentAssistantId; - updateTab(t.id, { currentAssistantId: targetId }); - updateMessages(t.id, (msgs) => { + updateTab(t.id, { + currentAssistantId: targetId, + ...(snap.currentTurnId ? { liveTurnId: snap.currentTurnId } : {}), + }); + updateLive(t.id, (msgs) => { const idx = msgs.findIndex((m) => m.id === targetId); if (idx >= 0) { return msgs.map((m, i) => @@ -859,13 +960,14 @@ export function createTabStore() { role: "assistant", chunks: snap.currentChunks ? [...snap.currentChunks] : [], isStreaming: true, + ...(snap.currentTurnId ? { turnId: snap.currentTurnId } : {}), }, ]; }); } } else if (t.currentAssistantId) { // Not running: clear streaming flags. - updateMessages(t.id, (msgs) => + updateLive(t.id, (msgs) => msgs.map((m) => (m.id === t.currentAssistantId ? { ...m, isStreaming: false } : m)), ); updateTab(t.id, { currentAssistantId: null }); @@ -910,7 +1012,7 @@ export function createTabStore() { if (!tabId) break; const tab5 = getTabById(tabId); if (!tab5) break; - updateMessages(tabId, (msgs) => + updateLive(tabId, (msgs) => msgs.map((m) => (m.id === tab5.currentAssistantId ? { ...m, isStreaming: false } : m)), ); updateTab(tabId, { currentAssistantId: null }); @@ -925,7 +1027,7 @@ export function createTabStore() { // assistant message via the shared helper. Mark debug info // on the message for parity with the previous behavior. applyChunkEvent(tabId, event); - updateMessages(tabId, (msgs) => + updateLive(tabId, (msgs) => msgs.map((m) => m.id === errTab.currentAssistantId ? { @@ -946,7 +1048,7 @@ export function createTabStore() { const afterTab = getTabById(tabId); if (afterTab?.currentAssistantId) { const newId = afterTab.currentAssistantId; - updateMessages(tabId, (msgs) => + updateLive(tabId, (msgs) => msgs.map((m) => m.id === newId ? { @@ -1036,7 +1138,10 @@ export function createTabStore() { const tab: Tab = { id: newTabEvent.id, title: newTabEvent.title, - messages: [], + chunks: [], + live: [], + renderGroups: [], + liveTurnId: null, agentStatus: "running", keyId: newTabEvent.keyId ?? null, modelId: newTabEvent.modelId ?? null, @@ -1053,7 +1158,7 @@ export function createTabStore() { queuedMessages: [], chunkLimit: appSettings.chunkLimit, oldestLoadedSeq: null, - totalMessages: 0, + totalChunks: 0, }; tabs = [...tabs, tab]; } @@ -1075,9 +1180,9 @@ export function createTabStore() { timestamp: Date.now(), }; updateTab(tabId, { queuedMessages: [...mqTab.queuedMessages, qm] }); - // Also add as a user chat message if not already present + // Also add as a user message in the live tail if not present. const tabAfterQm = getTabById(tabId); - const existingMsg = tabAfterQm?.messages.find( + const existingMsg = tabAfterQm?.live.find( (m) => m.id === `queued-${mqEvent.messageId}` || m.id === mqEvent.messageId, ); if (!existingMsg) { @@ -1086,7 +1191,7 @@ export function createTabStore() { role: "user", chunks: [{ type: "text", text: mqEvent.message }], }; - updateTab(tabId, { messages: [...(tabAfterQm?.messages ?? []), userMsg] }); + updateTab(tabId, { live: [...(tabAfterQm?.live ?? []), userMsg] }); } } // If alreadyQueued, the optimistic update already put everything in place with the @@ -1110,7 +1215,7 @@ export function createTabStore() { // the consumed user messages after it. Subsequent streaming events // will create a NEW assistant message block below. const currentAssistantId = mcTab.currentAssistantId; - updateMessages(tabId, (msgs) => { + updateLive(tabId, (msgs) => { // Extract consumed messages const consumed: ChatMessage[] = []; const rest: ChatMessage[] = []; @@ -1118,7 +1223,20 @@ export function createTabStore() { if (m.id.startsWith("queued-")) { const queuedId = m.id.slice(7); if (mcEvent.messageIds.includes(queuedId)) { - consumed.push({ ...m, id: queuedId }); + // Bind the consumed message to the in-flight turn that is + // consuming it. Stripping the `queued-` prefix alone leaves + // it an UNTAGGED user row, which reconcileSealedTurn KEEPS — + // so the interrupt bubble would linger in the live tail + // forever AND duplicate the `[USER INTERRUPT]` text the + // backend folds into the sealed tool-result chunk. Tagging + // it lets reconcile drop it on seal, collapsing to the + // persisted shape. (liveTurnId is set for the duration of a + // running turn, which is the only time a consume happens.) + consumed.push({ + ...m, + id: queuedId, + ...(mcTab.liveTurnId !== null ? { turnId: mcTab.liveTurnId } : {}), + }); continue; } } @@ -1154,7 +1272,7 @@ export function createTabStore() { if (!cancelTab) break; updateTab(tabId, { queuedMessages: cancelTab.queuedMessages.filter((m) => m.id !== cancelEvent.messageId), - messages: cancelTab.messages.filter( + live: cancelTab.live.filter( (m) => !(m.role === "user" && m.id === `queued-${cancelEvent.messageId}`), ), }); @@ -1377,8 +1495,11 @@ export function createTabStore() { ]; } - updateTab(tab.id, { messages: [...tab.messages, userMsg] }); // Generate title from first user message - if (tab.messages.length === 0 || (tab.messages.length === 1 && tab.title === "New Tab")) { + // Optimistically show the user's message in the live tail. + updateTab(tab.id, { live: [...tab.live, userMsg] }); + // Generate a title from the first user message of an empty tab. + const isFirstMessage = tab.chunks.length === 0 && tab.live.length === 0; + if (isFirstMessage || tab.title === "New Tab") { const titleText = text.length > 50 ? `${text.slice(0, 47)}...` : text; updateTab(tab.id, { title: titleText }); fetch(`${config.apiBase}/tabs/${tab.id}`, { @@ -1445,7 +1566,7 @@ export function createTabStore() { queuedMessages: currentTab.queuedMessages.filter((m) => m.id !== queueId), }); } - updateMessages(tab.id, (msgs) => + updateLive(tab.id, (msgs) => msgs.map((m) => (m.id === `queued-${queueId}` ? { ...m, id: generateId() } : m)), ); } @@ -1466,7 +1587,7 @@ export function createTabStore() { httpBody: body, }), }; - updateTab(tab.id, { messages: [...(getTabById(tab.id)?.messages ?? []), errMsg] }); + updateTab(tab.id, { live: [...(getTabById(tab.id)?.live ?? []), errMsg] }); } else { const responseData = (await res.json()) as { status: string; messageId?: string }; if (responseData.status === "queued" && responseData.messageId) { @@ -1489,7 +1610,7 @@ export function createTabStore() { }); } // Restore the message to a normal (non-queued) ID - updateMessages(tab.id, (msgs) => + updateLive(tab.id, (msgs) => msgs.map((m) => (m.id === `queued-${queueId}` ? { ...m, id: generateId() } : m)), ); } @@ -1504,7 +1625,7 @@ export function createTabStore() { queuedMessages: currentTab.queuedMessages.filter((m) => m.id !== queueId), }); } - updateMessages(tab.id, (msgs) => + updateLive(tab.id, (msgs) => msgs.map((m) => (m.id === `queued-${queueId}` ? { ...m, id: generateId() } : m)), ); } @@ -1515,7 +1636,7 @@ export function createTabStore() { isStreaming: false, debugInfo: makeDebugInfo({ error: err instanceof Error ? err.message : String(err) }), }; - updateTab(tab.id, { messages: [...(getTabById(tab.id)?.messages ?? []), errMsg] }); + updateTab(tab.id, { live: [...(getTabById(tab.id)?.live ?? []), errMsg] }); } } @@ -1641,9 +1762,7 @@ export function createTabStore() { if (tab) { updateTab(tabId, { queuedMessages: tab.queuedMessages.filter((m) => m.id !== messageId), - messages: tab.messages.filter( - (m) => !(m.role === "user" && m.id === `queued-${messageId}`), - ), + live: tab.live.filter((m) => !(m.role === "user" && m.id === `queued-${messageId}`)), }); } try { @@ -1684,7 +1803,7 @@ export function createTabStore() { // for it — which is the canonical symptom of a wire-format / load // failure. Always include this so bug reports are diagnosable from // the paste alone, without DB access. - const summarizeChunks = (chunks: (typeof tab.messages)[number]["chunks"]) => { + const summarizeChunks = (chunks: (typeof tab.renderGroups)[number]["chunks"]) => { if (chunks.length === 0) return "chunks=0"; const parts = chunks.map((c) => { if (c.type === "tool-batch") return `tool-batch[${c.calls.length}]`; @@ -1713,7 +1832,7 @@ export function createTabStore() { `Connected to backend: ${isConnected}`, `Tab agentStatus: ${tab.agentStatus}`, `Tab currentAssistantId: ${tab.currentAssistantId ?? "null"}`, - `Messages in store: ${tab.messages.length}`, + `Render groups in store: ${tab.renderGroups.length}`, `Queued messages: ${tab.queuedMessages.length}`, `Persistent: ${tab.persistent}`, `Working directory: ${tab.workingDirectory ?? "default"}`, @@ -1723,7 +1842,7 @@ export function createTabStore() { ]; const TOOL_RESULT_MAX = 300; - for (const msg of tab.messages) { + for (const msg of tab.renderGroups) { const role = msg.role === "user" ? "User" : msg.role === "system" ? "System" : "Assistant"; const streamingFlag = msg.isStreaming ? ", streaming=true" : ""; // Inline message diagnostics — id, streaming, chunk summary — @@ -1826,8 +1945,8 @@ export function createTabStore() { // components — they should rely on the WS subscription instead. handleEvent, hydrateFromBackend, - loadMoreMessages, - evictMessages, + loadOlderChunks, + evictChunks, setScrolledUp, }; } diff --git a/packages/frontend/src/lib/types.ts b/packages/frontend/src/lib/types.ts index 6e87aec..bede2cc 100644 --- a/packages/frontend/src/lib/types.ts +++ b/packages/frontend/src/lib/types.ts @@ -89,6 +89,11 @@ export interface SystemChunk { text: string; } +/** + * A render bubble. NOT stored state — it's a derived projection of the flat + * chunk log (`groupRowsToMessages(tab.chunks)`) concatenated with the transient + * live tail. The store's source of truth for history is `tab.chunks: ChunkRow[]`. + */ export interface ChatMessage { id: string; role: "user" | "assistant" | "system"; @@ -97,10 +102,9 @@ export interface ChatMessage { debugInfo?: DebugInfo; seq?: number; /** - * turn_id of the chunk rows this message was grouped from (history loaded - * from the backend). Used by `loadMoreMessages` to merge a turn that was - * split across the chunk-pagination window boundary. Absent for live - * (streaming) messages built client-side. + * turn_id of the chunk rows this message was grouped from (or the in-flight + * turn for a live message). Gives a stable, turn-scoped render key so the + * bubble doesn't remount when the live turn reconciles into sealed chunks. */ turnId?: string; } @@ -125,10 +129,18 @@ export interface TabStatusSnapshot { status: "idle" | "running" | "error"; currentChunks?: Chunk[]; currentAssistantId?: string; + /** turn_id of the in-flight turn; present iff status === "running". */ + currentTurnId?: string; } export type AgentEvent = | { type: "status"; status: "idle" | "running" | "error" } + // Opens a turn before any content delta; carries the turn_id used to tag + // the live chunks so they reconcile cleanly when the turn seals. + | { type: "turn-start"; turnId: string } + // Fires after the turn settled AND its chunks were persisted (after the DB + // write, post status:idle). Triggers the frontend's reconcile-from-DB. + | { type: "turn-sealed"; turnId: string } // Sent on every WS (re)connect: a snapshot of every tab the backend is // currently tracking and its live status. The frontend uses this to // detect desync after a reconnect (e.g. bun --watch restart killed the diff --git a/packages/frontend/tests/chat-store.test.ts b/packages/frontend/tests/chat-store.test.ts index c485fc7..b9d37f2 100644 --- a/packages/frontend/tests/chat-store.test.ts +++ b/packages/frontend/tests/chat-store.test.ts @@ -71,6 +71,7 @@ beforeEach(() => { ); }); +import { appSettings } from "../src/lib/settings.svelte.js"; import { createTabStore } from "../src/lib/tabs.svelte.js"; import type { Chunk, PermissionPrompt } from "../src/lib/types.js"; import { wsClient } from "../src/lib/ws.svelte.js"; @@ -92,17 +93,34 @@ async function setupStoreWithTab() { */ function getAssistantChunks(store: ReturnType): Chunk[] | undefined { const tab = store.tabs[0]; - const assistant = tab?.messages.find((m) => m.role === "assistant"); + const assistant = tab?.renderGroups.find((m) => m.role === "assistant"); return assistant?.chunks; } +/** + * Build a raw `ChunkRow` as the `GET /tabs/:id/chunks` endpoint returns them. + * The chunk-native store loads/paginates raw rows and groups them for render. + */ +function chunkRow( + id: string, + tabId: string, + seq: number, + turnId: string, + role: "user" | "assistant" | "tool" | "system", + type: "text" | "thinking" | "tool_call" | "tool_result" | "error" | "system", + data: unknown, + step = 0, +): Record { + return { id, tabId, seq, turnId, step, role, type, data, createdAt: seq }; +} + describe("tabStore — streaming chunk flow (real $state)", () => { it("text-delta creates a streaming assistant message and appends deltas", async () => { const { store, tabId } = await setupStoreWithTab(); store.handleEvent({ type: "text-delta", delta: "Hello", tabId }); - const assistant = store.tabs[0]?.messages.find((m) => m.role === "assistant"); + const assistant = store.tabs[0]?.renderGroups.find((m) => m.role === "assistant"); expect(assistant).toBeDefined(); expect(assistant?.isStreaming).toBe(true); expect(assistant?.chunks).toEqual([{ type: "text", text: "Hello" }]); @@ -204,7 +222,7 @@ describe("tabStore — streaming chunk flow (real $state)", () => { message: { role: "assistant", chunks: [] }, tabId, } as Parameters[0]); - const assistant = store.tabs[0]?.messages.find((m) => m.role === "assistant"); + const assistant = store.tabs[0]?.renderGroups.find((m) => m.role === "assistant"); expect(assistant?.chunks).toEqual([{ type: "text", text: "partial" }]); expect(assistant?.isStreaming).toBe(false); expect(store.tabs[0]?.currentAssistantId).toBeNull(); @@ -315,7 +333,8 @@ describe("tabStore — streaming chunk flow (real $state)", () => { it("error event with no in-flight turn opens a fresh assistant message", async () => { const { store, tabId } = await setupStoreWithTab(); store.handleEvent({ type: "error", error: "boom", tabId }); - const assistantMessages = store.tabs[0]?.messages.filter((m) => m.role === "assistant") ?? []; + const assistantMessages = + store.tabs[0]?.renderGroups.filter((m) => m.role === "assistant") ?? []; expect(assistantMessages.length).toBeGreaterThanOrEqual(1); const errChunks = assistantMessages[assistantMessages.length - 1]?.chunks; expect(errChunks).toHaveLength(1); @@ -342,7 +361,7 @@ describe("tabStore — streaming chunk flow (real $state)", () => { it("notice with no turn in flight creates a role:system message", async () => { const { store, tabId } = await setupStoreWithTab(); store.handleEvent({ type: "notice", message: "standalone", tabId }); - const systemMsg = store.tabs[0]?.messages.find((m) => m.role === "system"); + const systemMsg = store.tabs[0]?.renderGroups.find((m) => m.role === "system"); expect(systemMsg).toBeDefined(); const chunks = systemMsg?.chunks; expect(chunks).toHaveLength(1); @@ -355,7 +374,7 @@ describe("tabStore — streaming chunk flow (real $state)", () => { const { store, tabId } = await setupStoreWithTab(); store.handleEvent({ type: "notice", message: "first", tabId }); store.handleEvent({ type: "notice", message: "second", tabId }); - const sysMsgs = store.tabs[0]?.messages.filter((m) => m.role === "system") ?? []; + const sysMsgs = store.tabs[0]?.renderGroups.filter((m) => m.role === "system") ?? []; expect(sysMsgs).toHaveLength(1); expect(sysMsgs[0]?.chunks).toHaveLength(2); }); @@ -455,7 +474,7 @@ describe("tabStore — reactivity contract", () => { expect(store.tabs[0]?.agentStatus).toBe("idle"); expect(store.tabs[0]?.currentAssistantId).toBeNull(); // The streaming flag on the in-flight message should be cleared. - const assistant = store.tabs[0]?.messages.find((m) => m.role === "assistant"); + const assistant = store.tabs[0]?.renderGroups.find((m) => m.role === "assistant"); expect(assistant?.isStreaming).toBe(false); }); }); @@ -702,26 +721,24 @@ describe("hydrateFromBackend", () => { json: () => Promise.resolve({ statuses: {} }), }); } - if (url.split("?")[0]?.endsWith("/tabs/t1/messages")) { + if (url.split("?")[0]?.endsWith("/tabs/t1/chunks")) { return Promise.resolve({ ok: true, json: () => Promise.resolve({ - messages: [ - { id: "m1", role: "user", chunks: [{ type: "text", text: "hello" }] }, - { - id: "m2", - role: "assistant", - chunks: [{ type: "text", text: "hi back" }], - }, + chunks: [ + chunkRow("m1", "t1", 0, "u1", "user", "text", { text: "hello" }), + chunkRow("m2", "t1", 1, "u1", "assistant", "text", { text: "hi back" }), ], + total: 2, + oldestSeq: 0, }), }); } - if (url.split("?")[0]?.endsWith("/tabs/t2/messages")) { + if (url.split("?")[0]?.endsWith("/tabs/t2/chunks")) { return Promise.resolve({ ok: true, - json: () => Promise.resolve({ messages: [] }), + json: () => Promise.resolve({ chunks: [], total: 0, oldestSeq: null }), }); } return Promise.reject(new Error(`unexpected fetch ${url}`)); @@ -733,9 +750,9 @@ describe("hydrateFromBackend", () => { expect(n).toBe(2); expect(store.tabs.length).toBe(2); expect(store.tabs[0]?.id).toBe("t1"); - expect(store.tabs[0]?.messages.length).toBe(2); + expect(store.tabs[0]?.renderGroups.length).toBe(2); expect(store.tabs[1]?.id).toBe("t2"); - expect(store.tabs[1]?.messages.length).toBe(0); + expect(store.tabs[1]?.renderGroups.length).toBe(0); expect(store.activeTabId).toBe("t1"); }); @@ -772,12 +789,14 @@ describe("hydrateFromBackend", () => { }), }); } - if (url.split("?")[0]?.endsWith("/tabs/tr/messages")) { + if (url.split("?")[0]?.endsWith("/tabs/tr/chunks")) { return Promise.resolve({ ok: true, json: () => Promise.resolve({ - messages: [{ id: "u1", role: "user", chunks: [{ type: "text", text: "go" }] }], + chunks: [chunkRow("u1", "tr", 0, "turn-r", "user", "text", { text: "go" })], + total: 1, + oldestSeq: 0, }), }); } @@ -792,8 +811,8 @@ describe("hydrateFromBackend", () => { expect(tab?.agentStatus).toBe("running"); expect(tab?.currentAssistantId).toBe("live-msg-id"); // Two messages: the user message + the seeded in-flight assistant. - expect(tab?.messages.length).toBe(2); - const inflight = tab?.messages.find((m) => m.id === "live-msg-id"); + expect(tab?.renderGroups.length).toBe(2); + const inflight = tab?.renderGroups.find((m) => m.id === "live-msg-id"); expect(inflight).toBeDefined(); expect(inflight?.isStreaming).toBe(true); expect(inflight?.chunks).toEqual([ @@ -882,8 +901,11 @@ describe("hydrateFromBackend", () => { if (url.endsWith("/status")) { return Promise.resolve({ ok: true, json: () => Promise.resolve({ statuses: {} }) }); } - if (url.split("?")[0]?.endsWith("/tabs/ti/messages")) { - return Promise.resolve({ ok: true, json: () => Promise.resolve({ messages: [] }) }); + if (url.split("?")[0]?.endsWith("/tabs/ti/chunks")) { + return Promise.resolve({ + ok: true, + json: () => Promise.resolve({ chunks: [], total: 0, oldestSeq: null }), + }); } return Promise.reject(new Error(`unexpected fetch ${url}`)); }), @@ -939,20 +961,22 @@ describe("hydrateFromBackend", () => { if (url.endsWith("/status")) { return Promise.resolve({ ok: true, json: () => Promise.resolve({ statuses: {} }) }); } - if (url.split("?")[0]?.endsWith("/tabs/tA/messages")) { + if (url.split("?")[0]?.endsWith("/tabs/tA/chunks")) { return Promise.resolve({ ok: true, json: () => Promise.resolve({ - messages: [{ id: "msg-a", role: "user", chunks: [{ type: "text", text: "ok" }] }], + chunks: [chunkRow("msg-a", "tA", 0, "turn-a", "user", "text", { text: "ok" })], + total: 1, + oldestSeq: 0, }), }); } - if (url.split("?")[0]?.endsWith("/tabs/tB/messages")) { + if (url.split("?")[0]?.endsWith("/tabs/tB/chunks")) { // HTTP error path: response is not ok. return Promise.resolve({ ok: false, json: () => Promise.resolve({}) }); } - if (url.split("?")[0]?.endsWith("/tabs/tC/messages")) { + if (url.split("?")[0]?.endsWith("/tabs/tC/chunks")) { // Network error path: the fetch itself rejects. return Promise.reject(new Error("simulated network failure")); } @@ -966,19 +990,19 @@ describe("hydrateFromBackend", () => { // Healthy tab restored with its message. const tA = store.tabs.find((t) => t.id === "tA"); - expect(tA?.messages.length).toBe(1); - expect(tA?.messages[0]?.chunks).toEqual([{ type: "text", text: "ok" }]); + expect(tA?.renderGroups.length).toBe(1); + expect(tA?.renderGroups[0]?.chunks).toEqual([{ type: "text", text: "ok" }]); // Both broken tabs restored with empty message lists — neither // crashed the hydration nor leaked an error chunk into the UI. const tB = store.tabs.find((t) => t.id === "tB"); expect(tB).toBeDefined(); - expect(tB?.messages.length).toBe(0); + expect(tB?.renderGroups.length).toBe(0); expect(tB?.agentStatus).toBe("idle"); const tC = store.tabs.find((t) => t.id === "tC"); expect(tC).toBeDefined(); - expect(tC?.messages.length).toBe(0); + expect(tC?.renderGroups.length).toBe(0); expect(tC?.agentStatus).toBe("idle"); }); }); @@ -1013,7 +1037,7 @@ describe("handleEvent statuses with TabStatusSnapshot", () => { const tab = store.tabs.find((t) => t.id === tabId); expect(tab?.agentStatus).toBe("running"); expect(tab?.currentAssistantId).toBe("live-x"); - const inflight = tab?.messages.find((m) => m.id === "live-x"); + const inflight = tab?.renderGroups.find((m) => m.id === "live-x"); expect(inflight).toBeDefined(); expect(inflight?.chunks).toEqual([{ type: "text", text: "live data" }]); expect(inflight?.isStreaming).toBe(true); @@ -1046,7 +1070,7 @@ describe("handleEvent statuses with TabStatusSnapshot", () => { const tab = store.tabs.find((t) => t.id === tabId); expect(tab?.agentStatus).toBe("idle"); expect(tab?.currentAssistantId).toBeNull(); - const msgA = tab?.messages.find((m) => m.id === "msg-a"); + const msgA = tab?.renderGroups.find((m) => m.id === "msg-a"); expect(msgA?.isStreaming).toBe(false); }); }); @@ -1097,3 +1121,392 @@ describe("tabStore — cache rate (usage events)", () => { expect(store.tabs[0]?.cacheStats).toBeUndefined(); }); }); + +// ─── chunk-native store: eviction, pagination, reconcile ──────── +// +// The store's source of truth for HISTORY is a flat ChunkRow[] (`tab.chunks`, +// real seqs); the live turn is a transient tail (`tab.live`) folded from +// stream deltas and reconciled into `chunks` on `turn-sealed`. `tab.renderGroups` +// is a derived render projection. These tests drive the real store. + +describe("tabStore — chunk-native eviction / pagination / reconcile", () => { + function chunksResponse(rows: Array>, total?: number) { + const oldestSeq = rows.length > 0 ? ((rows[0]?.seq as number) ?? null) : null; + return { + ok: true, + json: () => Promise.resolve({ chunks: rows, total: total ?? rows.length, oldestSeq }), + }; + } + function tabsListResponse(id: string) { + return { + ok: true, + json: () => Promise.resolve({ tabs: [{ id, title: id, parentTabId: null }] }), + }; + } + function emptyStatuses() { + return { ok: true, json: () => Promise.resolve({ statuses: {} }) }; + } + const tick = () => new Promise((r) => setTimeout(r, 0)); + + it("evicts sealed chunks to chunkLimit, trimming WITHIN one large turn", async () => { + appSettings.chunkLimit = 5; + try { + // One turn of 10 chunk rows (the pathological single big turn). + const rows = Array.from({ length: 10 }, (_, i) => + chunkRow(`c${i}`, "big", i, "turn-1", i === 0 ? "user" : "assistant", "text", { + text: `t${i}`, + }), + ); + vi.stubGlobal( + "fetch", + vi.fn((url: string) => { + if (url.endsWith("/tabs")) return Promise.resolve(tabsListResponse("big")); + if (url.endsWith("/status")) return Promise.resolve(emptyStatuses()); + if (url.split("?")[0]?.endsWith("/tabs/big/chunks")) + return Promise.resolve(chunksResponse(rows, 10)); + return Promise.reject(new Error(`unexpected ${url}`)); + }), + ); + const store = createTabStore(); + await store.hydrateFromBackend(); + const tab = store.tabs.find((t) => t.id === "big"); + // Bounded to chunkLimit; the OLDEST rows of the single turn were trimmed. + expect(tab?.chunks.length).toBe(5); + expect(tab?.chunks.map((c) => c.seq)).toEqual([5, 6, 7, 8, 9]); + // Pagination cursor points at a real remaining seq. + expect(tab?.oldestLoadedSeq).toBe(5); + } finally { + appSettings.chunkLimit = 100; + } + }); + + it("loadOlderChunks prepends an older page and dedupes by seq", async () => { + appSettings.chunkLimit = 1000; // don't evict during this test + try { + const newer = [6, 7, 8, 9].map((s) => + chunkRow(`c${s}`, "pg", s, "t", "assistant", "text", { text: `t${s}` }), + ); + // Overlaps the newer window at seq 6 — must dedupe. + const older = [2, 3, 4, 5, 6].map((s) => + chunkRow(`c${s}`, "pg", s, "t", "assistant", "text", { text: `t${s}` }), + ); + vi.stubGlobal( + "fetch", + vi.fn((url: string) => { + if (url.endsWith("/tabs")) return Promise.resolve(tabsListResponse("pg")); + if (url.endsWith("/status")) return Promise.resolve(emptyStatuses()); + if (url.split("?")[0]?.endsWith("/tabs/pg/chunks")) { + return Promise.resolve( + url.includes("before=") ? chunksResponse(older, 8) : chunksResponse(newer, 8), + ); + } + return Promise.reject(new Error(`unexpected ${url}`)); + }), + ); + const store = createTabStore(); + await store.hydrateFromBackend(); + let tab = store.tabs.find((t) => t.id === "pg"); + expect(tab?.chunks.map((c) => c.seq)).toEqual([6, 7, 8, 9]); + expect(tab?.oldestLoadedSeq).toBe(6); + + await store.loadOlderChunks("pg"); + tab = store.tabs.find((t) => t.id === "pg"); + expect(tab?.chunks.map((c) => c.seq)).toEqual([2, 3, 4, 5, 6, 7, 8, 9]); + expect(tab?.oldestLoadedSeq).toBe(2); + } finally { + appSettings.chunkLimit = 100; + } + }); + + it("turn-sealed folds the live turn into the sealed chunk log with real seqs", async () => { + const sealed = [ + chunkRow("u", "rc", 0, "turn-x", "user", "text", { text: "hi" }), + chunkRow("a", "rc", 1, "turn-x", "assistant", "text", { text: "answer" }), + ]; + vi.stubGlobal( + "fetch", + vi.fn((url: string) => { + if (url.split("?")[0]?.endsWith("/tabs/rc/chunks")) + return Promise.resolve(chunksResponse(sealed, 2)); + return Promise.reject(new Error(`unexpected ${url}`)); + }), + ); + const store = createTabStore(); + store.handleEvent({ + type: "tab-created", + id: "rc", + title: "RC", + keyId: null, + modelId: null, + parentTabId: null, + }); + store.handleEvent({ type: "turn-start", turnId: "turn-x", tabId: "rc" }); + store.handleEvent({ type: "text-delta", delta: "answer", tabId: "rc" }); + // While streaming: live tail holds the in-flight assistant; sealed empty. + let tab = store.tabs.find((t) => t.id === "rc"); + expect(tab?.live.length).toBe(1); + expect(tab?.chunks.length).toBe(0); + // The live assistant is tagged with the turn id (stable render key basis). + expect(tab?.live[0]?.turnId).toBe("turn-x"); + + store.handleEvent({ type: "turn-sealed", turnId: "turn-x", tabId: "rc" }); + await tick(); // reconcile refetch is async + tab = store.tabs.find((t) => t.id === "rc"); + expect(tab?.live.length).toBe(0); + expect(tab?.chunks.map((c) => c.seq)).toEqual([0, 1]); + expect(tab?.renderGroups.map((m) => m.role)).toEqual(["user", "assistant"]); + // The sealed messages carry the SAME turn id as the live ones did, so the + // turn-scoped render key is stable across reconcile (no remount/flash). + expect(tab?.renderGroups.every((m) => m.turnId === "turn-x")).toBe(true); + }); + + it("defers reconcile while scrolled up, then runs it on return to bottom", async () => { + const sealed = [chunkRow("u", "df", 0, "turn-y", "user", "text", { text: "q" })]; + vi.stubGlobal( + "fetch", + vi.fn((url: string) => { + if (url.split("?")[0]?.endsWith("/tabs/df/chunks")) + return Promise.resolve(chunksResponse(sealed, 1)); + return Promise.reject(new Error(`unexpected ${url}`)); + }), + ); + const store = createTabStore(); + store.handleEvent({ + type: "tab-created", + id: "df", + title: "DF", + keyId: null, + modelId: null, + parentTabId: null, + }); + store.handleEvent({ type: "turn-start", turnId: "turn-y", tabId: "df" }); + store.handleEvent({ type: "text-delta", delta: "partial", tabId: "df" }); + store.setScrolledUp("df", true); + store.handleEvent({ type: "turn-sealed", turnId: "turn-y", tabId: "df" }); + await tick(); + // Deferred: live tail still present, not yet folded into sealed chunks. + expect(store.tabs.find((t) => t.id === "df")?.live.length).toBe(1); + + store.setScrolledUp("df", false); + await tick(); + const tab = store.tabs.find((t) => t.id === "df"); + expect(tab?.live.length).toBe(0); + expect(tab?.chunks.length).toBe(1); + }); + + it("preserves an optimistic queued user message when an earlier turn reconciles", async () => { + const sealed = [ + chunkRow("u", "q", 0, "turn-a", "user", "text", { text: "first" }), + chunkRow("a", "q", 1, "turn-a", "assistant", "text", { text: "answer" }), + ]; + vi.stubGlobal( + "fetch", + vi.fn((url: string) => { + if (url.split("?")[0]?.endsWith("/tabs/q/chunks")) + return Promise.resolve(chunksResponse(sealed, 2)); + return Promise.reject(new Error(`unexpected ${url}`)); + }), + ); + const store = createTabStore(); + store.handleEvent({ + type: "tab-created", + id: "q", + title: "Q", + keyId: null, + modelId: null, + parentTabId: null, + }); + store.handleEvent({ type: "turn-start", turnId: "turn-a", tabId: "q" }); + store.handleEvent({ type: "text-delta", delta: "answer", tabId: "q" }); + // User queues a follow-up WHILE turn-a streams (optimistic, no turn id yet). + store.handleEvent({ + type: "message-queued", + tabId: "q", + messageId: "q1", + message: "do this next", + }); + expect(store.tabs.find((t) => t.id === "q")?.live.some((m) => m.id === "queued-q1")).toBe(true); + + // turn-a seals → reconcile. The queued user bubble must survive. + store.handleEvent({ type: "turn-sealed", turnId: "turn-a", tabId: "q" }); + await tick(); + const tab = store.tabs.find((t) => t.id === "q"); + expect(tab?.chunks.map((c) => c.seq)).toEqual([0, 1]); + expect(tab?.live.some((m) => m.id === "queued-q1")).toBe(true); + // The sealed turn's assistant was folded out of the live tail. + expect(tab?.live.some((m) => m.role === "assistant")).toBe(false); + }); + + it("turn-start backfill skips a pending queued row (race), tags only the initiator", async () => { + // Realistic race: the user sends a prompt (plain optimistic row, no turn id + // yet) and, before the WS `turn-start` arrives, sends a follow-up that the + // backend queues (`queued-` prefix). When `turn-start` finally lands, the + // backfill must tag ONLY the plain initiator and SKIP the pending queued + // row — otherwise the queued row inherits the sealing turn's id and is + // wiped on reconcile (the Pass-2 Blocker). + const store = createTabStore(); + store.handleEvent({ + type: "tab-created", + id: "rq", + title: "RQ", + keyId: null, + modelId: null, + parentTabId: null, + }); + store.switchTab("rq"); // sendMessage targets the active tab + const sealed = [ + chunkRow("u", "rq", 0, "turn-a", "user", "text", { text: "first" }), + chunkRow("a", "rq", 1, "turn-a", "assistant", "text", { text: "answer" }), + ]; + vi.stubGlobal( + "fetch", + vi.fn((url: string, opts?: { body?: string }) => { + const path = url.split("?")[0] ?? ""; + if (path.endsWith("/tabs/rq/chunks")) return Promise.resolve(chunksResponse(sealed, 2)); + if (path.endsWith("/chat")) { + // The 2nd send carries a queueId → backend reports it queued. + const queued = (opts?.body ?? "").includes("queueId"); + return Promise.resolve({ + ok: true, + json: () => + Promise.resolve(queued ? { status: "queued", messageId: "srv" } : { status: "ok" }), + }); + } + return Promise.resolve({ + ok: true, + json: () => Promise.resolve({}), + text: () => Promise.resolve(""), + }); + }), + ); + + // A freshly created tab defaults to "running"; the agent is idle here. + store.handleEvent({ type: "status", status: "idle", tabId: "rq" }); + await store.sendMessage("first"); // idle → plain optimistic user row + store.handleEvent({ type: "status", status: "running", tabId: "rq" }); + await store.sendMessage("second"); // running → queued (queued- prefix) + + let tab = store.tabs.find((t) => t.id === "rq"); + const initiatorId = tab?.live.find((m) => m.role === "user" && !m.id.startsWith("queued-"))?.id; + const queuedId = tab?.live.find((m) => m.id.startsWith("queued-"))?.id; + expect(initiatorId).toBeTruthy(); + expect(queuedId).toBeTruthy(); + + // turn-start arrives LATE: the queued follow-up is already in the live tail. + store.handleEvent({ type: "turn-start", turnId: "turn-a", tabId: "rq" }); + tab = store.tabs.find((t) => t.id === "rq"); + expect(tab?.live.find((m) => m.id === initiatorId)?.turnId).toBe("turn-a"); + expect(tab?.live.find((m) => m.id === queuedId)?.turnId).toBeUndefined(); + + store.handleEvent({ type: "text-delta", delta: "answer", tabId: "rq" }); + store.handleEvent({ type: "turn-sealed", turnId: "turn-a", tabId: "rq" }); + await tick(); + tab = store.tabs.find((t) => t.id === "rq"); + // Initiator folded cleanly into its sealed row (no duplicate user bubble); + // the queued follow-up survives, still untagged and still queued. + expect(tab?.chunks.map((c) => c.seq)).toEqual([0, 1]); + expect(tab?.live.find((m) => m.id === queuedId)?.turnId).toBeUndefined(); + expect(tab?.live.some((m) => m.role === "assistant")).toBe(false); + expect(tab?.renderGroups.map((m) => m.role)).toEqual(["user", "assistant", "user"]); + }); + + it("a consumed interrupt message collapses into the sealed turn (no lingering bubble)", async () => { + // During a running turn the user queues a message; the agent CONSUMES it + // (interrupt), folding its text into the turn's persisted chunks. The + // frontend's consumed user bubble must be BOUND to the in-flight turn so it + // is dropped on reconcile — otherwise it lingers untagged forever AND + // duplicates the interrupt text now living in the sealed chunk (Pass-3 Block). + const sealed = [ + chunkRow("u", "ic", 0, "turn-a", "user", "text", { text: "do a thing" }), + chunkRow("a", "ic", 1, "turn-a", "assistant", "text", { text: "working ...resumed" }), + ]; + vi.stubGlobal( + "fetch", + vi.fn((url: string) => { + if (url.split("?")[0]?.endsWith("/tabs/ic/chunks")) + return Promise.resolve(chunksResponse(sealed, 2)); + return Promise.reject(new Error(`unexpected ${url}`)); + }), + ); + const store = createTabStore(); + store.handleEvent({ + type: "tab-created", + id: "ic", + title: "IC", + keyId: null, + modelId: null, + parentTabId: null, + }); + store.handleEvent({ type: "turn-start", turnId: "turn-a", tabId: "ic" }); + store.handleEvent({ type: "text-delta", delta: "working", tabId: "ic" }); + store.handleEvent({ type: "message-queued", tabId: "ic", messageId: "qx", message: "stop" }); + // Agent consumes the queued message mid-turn (interrupt injection). + store.handleEvent({ type: "message-consumed", tabId: "ic", messageIds: ["qx"] }); + let tab = store.tabs.find((t) => t.id === "ic"); + const consumed = tab?.live.find((m) => m.id === "qx"); + expect(consumed).toBeTruthy(); + // The fix: the consumed row is bound to the active turn, not left untagged. + expect(consumed?.turnId).toBe("turn-a"); + expect(tab?.queuedMessages.some((m) => m.id === "qx")).toBe(false); + + store.handleEvent({ type: "text-delta", delta: "resumed", tabId: "ic" }); + store.handleEvent({ type: "turn-sealed", turnId: "turn-a", tabId: "ic" }); + await tick(); + tab = store.tabs.find((t) => t.id === "ic"); + // Collapsed to the persisted shape: the consumed bubble was dropped; only + // the sealed chunks remain (no lingering / duplicated interrupt bubble). + expect(tab?.live.length).toBe(0); + expect(tab?.live.some((m) => m.id === "qx")).toBe(false); + expect(tab?.chunks.map((c) => c.seq)).toEqual([0, 1]); + }); + + it("preserves a concurrent newer turn when an earlier deferred reconcile flushes", async () => { + const sealedA = [ + chunkRow("ua", "c", 0, "turn-a", "user", "text", { text: "A?" }), + chunkRow("aa", "c", 1, "turn-a", "assistant", "text", { text: "A!" }), + ]; + vi.stubGlobal( + "fetch", + vi.fn((url: string) => { + if (url.split("?")[0]?.endsWith("/tabs/c/chunks")) + return Promise.resolve(chunksResponse(sealedA, 2)); + return Promise.reject(new Error(`unexpected ${url}`)); + }), + ); + const store = createTabStore(); + store.handleEvent({ + type: "tab-created", + id: "c", + title: "C", + keyId: null, + modelId: null, + parentTabId: null, + }); + // Turn A streams; user scrolls up; A finishes → reconcile deferred. + store.handleEvent({ type: "turn-start", turnId: "turn-a", tabId: "c" }); + store.handleEvent({ type: "text-delta", delta: "A!", tabId: "c" }); + store.setScrolledUp("c", true); + store.handleEvent({ type: "status", status: "idle", tabId: "c" }); + store.handleEvent({ type: "turn-sealed", turnId: "turn-a", tabId: "c" }); + // Turn B (a queued message) starts streaming while still scrolled up. + store.handleEvent({ type: "turn-start", turnId: "turn-b", tabId: "c" }); + store.handleEvent({ type: "status", status: "running", tabId: "c" }); + store.handleEvent({ type: "text-delta", delta: "B in progress", tabId: "c" }); + let tab = store.tabs.find((t) => t.id === "c"); + expect(tab?.liveTurnId).toBe("turn-b"); + const bId = tab?.currentAssistantId; + expect(bId).toBeTruthy(); + + // User scrolls down → the deferred reconcile for turn-a flushes. + store.setScrolledUp("c", false); + await tick(); + tab = store.tabs.find((t) => t.id === "c"); + // Turn A folded into the sealed log... + expect(tab?.chunks.map((c) => c.seq)).toEqual([0, 1]); + // ...but turn B's in-flight state survived intact (no wipe / no remount). + expect(tab?.liveTurnId).toBe("turn-b"); + expect(tab?.currentAssistantId).toBe(bId); + expect(tab?.live.some((m) => m.turnId === "turn-b")).toBe(true); + expect(tab?.live.some((m) => m.turnId === "turn-a")).toBe(false); + }); +}); -- cgit v1.2.3