diff options
| author | Adam Malczewski <[email protected]> | 2026-05-30 23:14:55 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-05-30 23:14:55 +0900 |
| commit | 624b808da0f2f8bbad8a4fbbcca3f82f24ecfc47 (patch) | |
| tree | 869d34092345344ff13953398f876c8b38c8116a /packages/api | |
| parent | b19f1aafc43141a865ecd40a813ed3212e77d95e (diff) | |
| download | dispatch-624b808da0f2f8bbad8a4fbbcca3f82f24ecfc47.tar.gz dispatch-624b808da0f2f8bbad8a4fbbcca3f82f24ecfc47.zip | |
feat(chunks): chunk-native frontend store with turn-sealed reconcile + per-chunk eviction
Replace the stored ChatMessage[] with a chunk-native model: tab.chunks (sealed
ChunkRow[]) + tab.live (transient in-flight turn buffer) + derived tab.renderGroups.
This enables per-chunk eviction (trimming WITHIN a large turn) and raw-chunk
pagination (loadOlderChunks), removing the whole-message eviction limitation.
Backend:
- Emit turn-start/turn-sealed around each turn; expose currentTurnId in the status
snapshot. turn-sealed fires after the durable write (status:idle fires before it).
- New GET /tabs/:id/chunks raw paginated endpoint (limit/before).
- Wrap appendChunks in a single SQLite transaction.
Frontend:
- turn-sealed drives a turn-aware reconcile that folds the sealed turn into chunks
while preserving a concurrent newer in-flight turn and pending queued messages;
deferred while the user is scrolled up.
- Stable turn-scoped render keys (${turnId}:${role}:${n}) avoid remount/flash.
Reconcile correctness (three review passes):
- preserve a concurrent newer turn when an earlier deferred reconcile flushes;
- keep optimistic queued user messages (no loss);
- turn-start backfill skips pending queued rows and tags only the turn initiator;
- bind consumed interrupt messages to the in-flight turn so they collapse on seal
(no lingering/duplicated bubble).
Tests: chat-store reconcile/eviction/pagination suite; api chunks endpoint + events.
Diffstat (limited to 'packages/api')
| -rw-r--r-- | packages/api/src/agent-manager.ts | 13 | ||||
| -rw-r--r-- | packages/api/src/routes/tabs.ts | 24 | ||||
| -rw-r--r-- | packages/api/tests/agent-manager.test.ts | 32 | ||||
| -rw-r--r-- | packages/api/tests/routes.test.ts | 13 |
4 files changed, 80 insertions, 2 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts index f7975d1..5f2027d 100644 --- a/packages/api/src/agent-manager.ts +++ b/packages/api/src/agent-manager.ts @@ -799,6 +799,9 @@ export class AgentManager { if (tabAgent.currentAssistantId) { snap.currentAssistantId = tabAgent.currentAssistantId; } + if (tabAgent.currentTurnId) { + snap.currentTurnId = tabAgent.currentTurnId; + } } result[tabId] = snap; } @@ -1119,6 +1122,10 @@ export class AgentManager { // chunk rows — shares one `turn_id`. const turnId = crypto.randomUUID(); tabAgent.currentTurnId = turnId; + // Announce the turn so the frontend can tag its live chunks with this + // turn_id (stable render keys → flicker-free reconcile when the turn + // seals). Emitted before any content delta. + this.emit({ type: "turn-start", turnId }, tabId); appendChunks(tabId, explodeUserText(turnId, message)); // Store agent models on the tab if provided (defines fallback order) @@ -1284,6 +1291,12 @@ export class AgentManager { this.emit({ type: "status", status: "error" }, tabId); break; } + // Turn fully settled and its chunks are now persisted (flushAssistant ran + // above). Signal the frontend that the turn's rows — with real seqs — are + // durable so it can fold its live representation into the sealed log. + // Emitted AFTER status:idle/error (which fire before the DB write). + this.emit({ type: "turn-sealed", turnId }, tabId); + // Turn fully settled — clear the shared turn id. tabAgent.currentTurnId = null; diff --git a/packages/api/src/routes/tabs.ts b/packages/api/src/routes/tabs.ts index e9265ec..b1e9659 100644 --- a/packages/api/src/routes/tabs.ts +++ b/packages/api/src/routes/tabs.ts @@ -93,6 +93,30 @@ tabsRoutes.get("/:id/messages", (c) => { return c.json({ messages, total, oldestSeq }); }); +// Raw chunk window for a tab — the chunk-native frontend's load/paginate +// source. Same `limit`/`before` chunk-`seq` windowing as `/messages`, but +// returns the flat `ChunkRow[]` WITHOUT server-side grouping (the frontend +// groups for render and evicts/paginates on the flat list). Dedupe on the +// client by `seq` when overlap-fetching. +tabsRoutes.get("/:id/chunks", (c) => { + const id = c.req.param("id"); + const limitRaw = c.req.query("limit"); + const beforeRaw = c.req.query("before"); + const limit = limitRaw !== undefined ? Number(limitRaw) : undefined; + const before = beforeRaw !== undefined ? Number(beforeRaw) : undefined; + const options = + limit !== undefined || before !== undefined + ? { + ...(limit !== undefined && Number.isFinite(limit) ? { limit } : {}), + ...(before !== undefined && Number.isFinite(before) ? { before } : {}), + } + : undefined; + const chunks = getChunksForTab(id, options); + const oldestSeq = chunks.length > 0 ? (chunks[0]?.seq ?? null) : null; + const total = getTotalChunkCount(id); + return c.json({ chunks, total, oldestSeq }); +}); + tabsRoutes.patch("/:id", async (c) => { const id = c.req.param("id"); const body = await c.req.json<{ diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts index 6b016db..4415bbb 100644 --- a/packages/api/tests/agent-manager.test.ts +++ b/packages/api/tests/agent-manager.test.ts @@ -340,15 +340,43 @@ describe("AgentManager", () => { await manager.processMessage("tab-1", "test"); expect(events.length).toBeGreaterThan(0); - expect(events[0]).toMatchObject({ type: "status", status: "running" }); + // A turn now opens with `turn-start`, immediately followed by the + // agent's `status: running`. + expect(events[0]).toMatchObject({ type: "turn-start" }); + expect(events[1]).toMatchObject({ type: "status", status: "running" }); + // A turn now closes with `turn-sealed` (emitted after the DB write, which + // is after the agent's final `status: idle`). const lastEvent = events[events.length - 1]; - expect(lastEvent).toMatchObject({ type: "status", status: "idle" }); + expect(lastEvent).toMatchObject({ type: "turn-sealed" }); + expect(events.some((e) => e.type === "status" && e.status === "idle")).toBe(true); const doneEvent = events.find((e) => e.type === "done"); expect(doneEvent).toBeDefined(); }); + it("emits a turn-start with a turnId before any content event", async () => { + const manager = new AgentManager(); + const events: AgentEvent[] = []; + manager.onEvent((event) => { + events.push(event); + }); + + await manager.processMessage("tab-turnstart", "go"); + + const turnStartIdx = events.findIndex((e) => e.type === "turn-start"); + expect(turnStartIdx).toBeGreaterThanOrEqual(0); + const turnStart = events[turnStartIdx] as Extract<AgentEvent, { type: "turn-start" }>; + expect(typeof turnStart.turnId).toBe("string"); + expect(turnStart.turnId.length).toBeGreaterThan(0); + + // Must precede the first content delta. + const firstContentIdx = events.findIndex( + (e) => e.type === "text-delta" || e.type === "reasoning-delta", + ); + expect(firstContentIdx).toBeGreaterThan(turnStartIdx); + }); + it("emits text-delta events during processMessage", async () => { const manager = new AgentManager(); const events: AgentEvent[] = []; diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts index f4de845..4b8dd40 100644 --- a/packages/api/tests/routes.test.ts +++ b/packages/api/tests/routes.test.ts @@ -335,6 +335,19 @@ describe("POST /chat", () => { }); }); +describe("GET /tabs/:id/chunks", () => { + it("returns the raw chunk window shape { chunks, total, oldestSeq }", async () => { + const res = await app.request("/tabs/tab-x/chunks?limit=50"); + expect(res.status).toBe(200); + const body = await res.json(); + // Mocked getChunksForTab returns [] → empty window, null cursor. + expect(Array.isArray(body.chunks)).toBe(true); + expect(body.chunks).toEqual([]); + expect(body.total).toBe(0); + expect(body.oldestSeq).toBeNull(); + }); +}); + describe("POST /chat/stop", () => { it("returns 200 with success: true for valid tabId", async () => { const res = await app.request("/chat/stop", { |
