diff options
| author | Adam Malczewski <[email protected]> | 2026-06-12 19:00:29 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-12 19:00:29 +0900 |
| commit | d66585333ee5764700c67a81eaec015b0026f8f1 (patch) | |
| tree | 6e1ac455c2ecbf3c442fce9f73fdaed8fb71fade | |
| parent | 1764e3e5dff836255d121a933dd92542368346f9 (diff) | |
| download | dispatch-web-d66585333ee5764700c67a81eaec015b0026f8f1.tar.gz dispatch-web-d66585333ee5764700c67a81eaec015b0026f8f1.zip | |
feat(chat): consume CR-5 history windowing — server-windowed cold loads + show-earlier backfill
Re-pinned [email protected]>0.10.0 + [email protected]>0.6.1 (reply
frontend-history-windowing-handoff.md); re-mirrored both .dispatch references.
- HistorySync port gains optional { limit?, beforeSeq? } (CR-5 params); the
app's createHistorySync appends them to GET /conversations/:id.
- COLD-cache fresh load now fetches ?sinceSeq=0&limit=<floor(0.75xL)> — a huge
conversation no longer ships whole to show 192 chunks. A warm-cache tail sync
stays unwindowed (windowing a tail that outgrew the limit would leave a
silent seq gap behind the cache).
- hasEarlier now derives from the [email protected] CONTRACT (1-based gap-free seqs):
loaded window starting above seq 1 => older history exists — covering both
locally-trimmed AND server-windowed transcripts (the watermark stays as the
merge floor only).
- showEarlier(): local cache first; when the cache doesn't reach far enough
back, backfills the missing older run via ?beforeSeq=<oldestKnown>&limit=
and persists it (next page-in is local). latestSeq windowed-read caveat is
satisfied structurally (tail cursor derives from the cache's max seq).
- live-probe: +6 CR-5 checks (seq origin, newest-k ascending, short-chat
exactness, beforeSeq paging, 400 validation x2). NOT yet run live — backend
was down at commit time; run pending.
- backend-handoff.md: CR-5 RESOLVED, pins/mirrors current. 602 tests green x2.
| -rw-r--r-- | .dispatch/transport-contract.reference.md | 71 | ||||
| -rw-r--r-- | .dispatch/wire.reference.md | 26 | ||||
| -rw-r--r-- | AGENTS.md | 3 | ||||
| -rw-r--r-- | GLOSSARY.md | 2 | ||||
| -rw-r--r-- | backend-handoff.md | 63 | ||||
| -rw-r--r-- | scripts/live-probe.ts | 57 | ||||
| -rw-r--r-- | src/app/store.svelte.ts | 15 | ||||
| -rw-r--r-- | src/core/chunks/trim.test.ts | 30 | ||||
| -rw-r--r-- | src/core/chunks/trim.ts | 52 | ||||
| -rw-r--r-- | src/features/chat/index.ts | 2 | ||||
| -rw-r--r-- | src/features/chat/ports.ts | 21 | ||||
| -rw-r--r-- | src/features/chat/store.svelte.ts | 59 | ||||
| -rw-r--r-- | src/features/chat/store.test.ts | 75 | ||||
| -rw-r--r-- | src/features/chat/test-helpers.ts | 23 |
14 files changed, 384 insertions, 115 deletions
diff --git a/.dispatch/transport-contract.reference.md b/.dispatch/transport-contract.reference.md index e6ab799..774cfb0 100644 --- a/.dispatch/transport-contract.reference.md +++ b/.dispatch/transport-contract.reference.md @@ -5,10 +5,23 @@ > hangs on a permission prompt). Your CODE still imports `@dispatch/transport-contract` normally — > this file is for READING only. > -> **Orchestrator:** SNAPSHOT of `[email protected]` (CR-4 cache-warming lifecycle shipped). -> Depends on `@dispatch/[email protected]` (see `wire.reference.md`) + `@dispatch/[email protected]` (see +> **Orchestrator:** SNAPSHOT of `[email protected]` (CR-5 history windowing shipped). +> Depends on `@dispatch/[email protected]` (see `wire.reference.md`) + `@dispatch/[email protected]` (see > `ui-contract.reference.md`). > +> **2026-06-12 delta (CR-5 history windowing — package bumped `0.9.0` → `0.10.0`):** NO type-shape +> change — `GET /conversations/:id` gains two OPTIONAL query params alongside `sinceSeq`: +> **`limit=<k>`** (the NEWEST `k` chunks of the selection, still ASCENDING; a selection with ≤ `k` +> chunks is returned whole; omitted = full selection, byte-identical to the old behavior) and +> **`beforeSeq=<s>`** (exclusive upper bound `seq < s`; combined: `sinceSeq < seq < beforeSeq`). +> `limit`/`beforeSeq` must be POSITIVE integers (`sinceSeq` may still be 0); malformed/zero/negative +> → HTTP 400 `{ error }` naming the param. Seq numbering is now a WRITTEN CONTRACT: 1-based, +> monotonic, gap-free (see `[email protected]` `StoredChunk`), so `hasOlder = oldestLoaded.seq > 1` — there +> is deliberately NO `earliestSeq`/`hasOlder` field. CAVEAT: on a windowed read, `latestSeq` +> describes the returned WINDOW; never regress a tail cursor from a `beforeSeq` backfill page. +> Intended flows: fresh load `?sinceSeq=0&limit=<k>` · tail sync `?sinceSeq=<cursor>` (no limit) · +> page older in `?beforeSeq=<oldestLoadedSeq>&limit=<k>`. +> > **2026-06-12 delta (CR-4 cache-warming lifecycle — package bumped `0.8.0` → `0.9.0`):** adds > `POST /conversations/:id/close` (`CloseConversationResponse`) — the EXPLICIT "user closed this > conversation's tab" affordance, distinct from a socket disconnect / `chat.unsubscribe` (which @@ -92,9 +105,11 @@ - `POST /chat` — body `ChatRequest` (JSON); response NDJSON stream, one `AgentEvent` per line; resolved id also in `X-Conversation-Id` header. - `GET /models` — `ModelsResponse`. -- `GET /conversations/:id?sinceSeq=<n>` — `ConversationHistoryResponse`: RAW, append-order, - seq-ordered slice with `seq > n` (NOT reconciled — dangling tool-calls returned as-is). - `latestSeq` = last chunk's `seq`, or the requested `sinceSeq` when caught up (empty `chunks`). +- `GET /conversations/:id?sinceSeq=<n>&beforeSeq=<s>&limit=<k>` — `ConversationHistoryResponse`: + RAW, append-order, seq-ordered slice with `n < seq < s`, windowed to the NEWEST `k` (all params + optional; NOT reconciled — dangling tool-calls returned as-is). `latestSeq` = last chunk's `seq`, + or the requested `sinceSeq` when caught up (empty `chunks`) — a TAIL cursor only; do not regress + a cursor from a windowed/backfill read. `limit`/`beforeSeq` must be positive ints → else 400. - `GET /conversations/:id/metrics` — `ConversationMetricsResponse`: every SEALED turn's `TurnMetrics` in turn order (per-turn token + timing; NOT seq-filtered). IMPLEMENTED + LIVE-VERIFIED (probe 17/17). - `POST /chat/warm` — body `WarmRequest` (JSON) → `200 WarmResponse` (cache-warm usage incl. @@ -182,23 +197,49 @@ export interface ModelsResponse { } /** - * Response body for `GET /conversations/:id?sinceSeq=<n>` — the incremental - * read-side history endpoint a long-lived client uses to (re)hydrate a - * conversation cheaply. + * Response body for + * `GET /conversations/:id?sinceSeq=<n>&beforeSeq=<s>&limit=<k>` — the + * incremental read-side history endpoint a long-lived client uses to + * (re)hydrate a conversation cheaply. All three query params are OPTIONAL and + * combine as one SELECTION + one WINDOW: + * + * - **Selection** — `sinceSeq` (exclusive lower bound, `seq > n`; omitted/0 = + * from the start) and `beforeSeq` (exclusive upper bound, `seq < s`; omitted + * = to the end). Together: `n < seq < s`. + * - **Window** — `limit=<k>` returns only the NEWEST `k` chunks of the + * selection (the response stays ASCENDING by seq). A selection with ≤ `k` + * chunks is returned whole. `limit` omitted = the full selection — exactly + * the pre-windowing behavior, so existing clients are unchanged. + * - `limit` and `beforeSeq` must be POSITIVE integers (`sinceSeq` may be 0); + * malformed, zero, or negative values → HTTP 400 `{ error }`. + * + * Intended client flows: fresh load = `?sinceSeq=0&limit=<k>` (newest window); + * tail sync = `?sinceSeq=<cursor>` (no limit); page older history in = + * `?beforeSeq=<oldestLoadedSeq>&limit=<k>`. + * + * Seq numbering is **1-based and gap-free** (a CONTRACTUAL GUARANTEE — see + * `StoredChunk` in `@dispatch/wire`): a client can derive "older chunks exist" + * purely from `oldestLoaded.seq > 1`; there is deliberately no + * `earliestSeq`/`hasOlder` response field. * * `chunks` is the RAW, append-order, seq-ordered slice of the conversation log - * with `seq > sinceSeq` (or the whole log when `sinceSeq` is omitted/0). It is - * NOT reconciled: a dangling tool-call is returned as-is (rendered as an - * interrupted call). Reconciliation is a turn-path concern — the server repairs - * history only when it feeds a provider, never on this read path — which is what - * preserves the per-chunk `seq` cursor invariant (a synthesized repair chunk - * would have no seq). + * selected + windowed as above. It is NOT reconciled: a dangling tool-call is + * returned as-is (rendered as an interrupted call). Reconciliation is a + * turn-path concern — the server repairs history only when it feeds a provider, + * never on this read path — which is what preserves the per-chunk `seq` cursor + * invariant (a synthesized repair chunk would have no seq). * * `latestSeq` is the `seq` of the LAST chunk in this response, or — when the * slice is empty (the client is already caught up) — the requested `sinceSeq` * (0 for a full read of an empty conversation). So after applying the response a * client's new cursor is always `latestSeq`, and an empty `chunks` means - * "nothing new past your cursor". + * "nothing new past your cursor". CAVEAT (windowed reads): `latestSeq` is a + * TAIL-sync cursor — on a `beforeSeq` backfill page (or any `limit`ed read that + * did not reach the log's true tail) it describes the returned window, NOT the + * conversation's high-water mark, so a client must not regress its sync cursor + * from a backfill response. (A true server-side high-water mark independent of + * the filter is deferred until a consumer needs it — it would require widening + * the store contract.) */ export interface ConversationHistoryResponse { readonly chunks: readonly StoredChunk[]; diff --git a/.dispatch/wire.reference.md b/.dispatch/wire.reference.md index 40f94cf..1d761bf 100644 --- a/.dispatch/wire.reference.md +++ b/.dispatch/wire.reference.md @@ -4,8 +4,14 @@ > types WITHOUT following the `file:` dep symlink out of this repo (which hangs on a permission > prompt). Your CODE still imports `@dispatch/wire` normally — this file is for READING only. > -> **Orchestrator:** SNAPSHOT of `[email protected]` (the metrics types below + the `user-message` turn event -> shipped + version-bumped). Regenerate whenever `@dispatch/wire` changes. +> **Orchestrator:** SNAPSHOT of `[email protected]` (doc-only bump: the 1-based gap-free seq guarantee +> codified on `StoredChunk`). Regenerate whenever `@dispatch/wire` changes. +> +> **2026-06-12 delta (CR-5 history windowing — package bumped `0.6.0` → `0.6.1`, DOC-ONLY):** the +> per-conversation `seq` numbering is now a WRITTEN CONTRACTUAL GUARANTEE on `StoredChunk`: +> **1-based, monotonic, gap-free** — a conversation's first chunk is always `seq === 1` and +> numbering never skips. A client holding only a windowed suffix of the log derives "older chunks +> exist server-side" purely from `oldestLoaded.seq > 1` (no `earliestSeq`/`hasOlder` field exists). > > **2026-06-12 delta (CR-3 user-message handoff — package bumped `0.5.0` → `0.6.0`, ADDITIVE):** adds a > new `AgentEvent` union member `TurnInputEvent` (`{ type: "user-message"; conversationId; turnId; text }`) @@ -168,11 +174,17 @@ export interface ChatMessage { /** * A persisted chunk plus its sync metadata. The append-only conversation log - * stamps every chunk with a monotonic, gap-free, per-conversation `seq` (the - * sync cursor, assigned in append order) and records the `role` of the message - * it belongs to. This makes a flat seq-ordered stream both incrementally - * syncable ("give me chunks after seq N") and regroupable into messages by the - * client. `chunk` is the content unit — `Chunk` carries no storage/sync cursor + * stamps every chunk with a **1-based**, monotonic, gap-free, per-conversation + * `seq` (the sync cursor, assigned in append order) and records the `role` of + * the message it belongs to. This makes a flat seq-ordered stream both + * incrementally syncable ("give me chunks after seq N") and regroupable into + * messages by the client. + * + * The 1-based start is a CONTRACTUAL GUARANTEE (not an implementation detail): + * a conversation's first chunk is always `seq === 1` and numbering never skips, + * so a client holding only a windowed suffix of the log can derive "older + * chunks exist server-side" purely from `oldestLoaded.seq > 1` — no separate + * has-older flag is needed (or provided). `chunk` is the content unit — `Chunk` carries no storage/sync cursor * (`seq` lives here on the envelope, not on the chunk, since it is assigned by * the store and the provider has no use for it). A chunk MAY still carry * generation provenance assigned at production time (e.g. a tool chunk's @@ -139,7 +139,8 @@ streaming; tabs + model selector + DaisyUI/dracula), plus per-conversation cwd + context size, cache-warming (+ retention/timer), markdown, smart auto-scroll, multi-client live view (subscribe/reconnect + the user prompt on the event stream), and the chat limit (bulk quarter-unload past `dispatch.chatLimit`, 75% fresh-load window, show-earlier page-in; -`core/chunks/trim.ts`; backend CR-5 open for `?limit=`/`?beforeSeq=`). Plan in +`core/chunks/trim.ts`; CR-5 `?limit=`/`?beforeSeq=` CONSUMED — server-windowed cold loads + +show-earlier server backfill; `hasOlder` from the 1-based gap-free seq contract). Plan in `../arch-rewrite/notes/frontend-design.md` §10. ## Reports diff --git a/GLOSSARY.md b/GLOSSARY.md index 2f4f199..a9c7017 100644 --- a/GLOSSARY.md +++ b/GLOSSARY.md @@ -39,4 +39,4 @@ | **TPS** (tokens per second) | A FE-DERIVED decode rate: `outputTokens / (decodeMs / 1000)` (per step; per turn over Σ `decodeMs`), falling back to `genTotalMs` when `decodeMs` is absent. The backend-recommended basis (excludes first-token latency). Not carried on the wire; omitted when timing is absent. | throughput | | **chat limit** | The max LOADED chunks per conversation (default 256; localStorage `dispatch.chatLimit`, no UI yet) before the oldest quarter is unloaded. Counts **chunks** (committed + provisional + accumulating). Policy in `core/chunks/trim.ts`. | chunk limit, message limit, history limit | | **unload** | Drop the oldest COMMITTED chunks from the in-memory transcript (and DOM) past the **chat limit** — in BULK (`ceil(limit/4)` per pass, deferred while the reader is scrolled up), never one-per-delta (old Dispatch's scroll-jump bug). Purely local: the IndexedDB cache and the server keep everything; `TranscriptState.hiddenBeforeSeq` is the watermark. Distinct from the conversation-cache's cross-conversation **eviction**. | evict (reserved for the cross-conversation cache), prune, drop | -| **show earlier** | The affordance at the top of a transcript with unloaded history ("Show earlier messages"): pages one unload-unit back in from the local cache (later: the server via CR-5 `?beforeSeq=`), preserving the reader's scroll position. | load more, pagination | +| **show earlier** | The affordance at the top of a transcript with unloaded history ("Show earlier messages"): pages one unload-unit back in — local cache first, then the server (CR-5 `?beforeSeq=&limit=`) when the cache doesn't reach far enough back — preserving the reader's scroll position. Offered whenever the loaded window starts above seq 1 (the [email protected] 1-based gap-free seq contract). | load more, pagination | diff --git a/backend-handoff.md b/backend-handoff.md index 4410b44..5193d8f 100644 --- a/backend-handoff.md +++ b/backend-handoff.md @@ -5,15 +5,13 @@ > **From:** dispatch-web orchestrator · **To:** arch-rewrite orchestrator · **Courier:** the user. > `lsp` does NOT span the repos (AGENTS.md § Backend seam) — every cross-repo ask flows through here. -_Last updated: 2026-06-12 (CR-5 opened: chat-limit history windowing). **FE is current on -consumed: surfaces + WS, conversation transcript/metrics, tabs + model selector, cache-warming -(incl. authoritative timer + retention + cache-rate fix + the CR-4 lifecycle below), -**per-conversation cwd + LSP status**, **context size**, and **turn continuity + multi-client -live view**. -**Open asks: ONE — CR-5** (`?limit=`/`?beforeSeq=` on `GET /conversations/:id`; NOT a blocker, -courier doc `backend-handoff-chat-limit.md`). CR-1/CR-2/CR-4 all RESOLVED ✅ (see §2); §3 lists -likely next asks. +_Last updated: 2026-06-12 (CR-5 consumed). **FE is current on `[email protected]` / +`[email protected]` / `[email protected]`.** All handoffs to date are consumed: surfaces + WS, +conversation transcript/metrics, tabs + model selector, cache-warming (incl. authoritative timer ++ retention + cache-rate fix + the CR-4 lifecycle below), **per-conversation cwd + LSP status**, +**context size**, **turn continuity + multi-client live view**, and the **chat limit + CR-5 +history windowing** (below). +**Open asks: NONE.** CR-1/CR-2/CR-4/CR-5 all RESOLVED ✅ (see §2); §3 lists likely next asks. **CR-3 (watcher couldn't see the USER prompt until seal) → RESOLVED ✅** — backend shipped the `user-message` turn event; FE re-pinned + consumption live. The cwd/LSP draft-path verification (`backend-handoff-cwd-lsp.md`) came back **all ✅ confirmed**._ @@ -63,7 +61,7 @@ backend ask — but the max-limit denominator is now a live FE need; see §3. ## 1. Pinned backend contracts (consumed by the FE) | Package | Used for | |---|---| @@ -72,7 +70,8 @@ Pinned as `file:` deps: **`[email protected]`; `[email protected]`; `transport-contract | `@dispatch/transport-contract` | `ChatRequest`/`ModelsResponse`/`ConversationHistoryResponse`/`ConversationMetricsResponse` + `WarmRequest`/`WarmResponse` + `CwdResponse`/`SetCwdRequest` + LSP (`LspStatusResponse`/`LspServerInfo`/`LspServerState`) + WS chat ops + `WsClientMessage`/`WsServerMessage` | Endpoints in use (HTTP **24203**, WS **24205**, CORS `*` incl. `PUT`): -`POST /chat` (NDJSON) · `GET /models` · `GET /conversations/:id?sinceSeq=<n>` · +`POST /chat` (NDJSON) · `GET /models` · +`GET /conversations/:id?sinceSeq=<n>&beforeSeq=<s>&limit=<k>` (CR-5 windowing) · `GET /conversations/:id/metrics` · `GET`/`PUT /conversations/:id/cwd` · `GET /conversations/:id/lsp` · `POST /chat/warm` · `POST /conversations/:id/close` (explicit tab-close: abort turn + stop/disable warming) · WS `chat.send`→`chat.delta` · @@ -80,28 +79,32 @@ WS `chat.subscribe`/`chat.unsubscribe` (watch a conversation's turns without sen Mirrored in-repo for headless agents: `.dispatch/{ui-contract,wire,transport-contract}.reference.md` (regenerate on any contract bump; all current as of `[email protected]` / -`[email protected]` / `[email protected]`). +`[email protected]` / `[email protected]`). ## 2. Open asks FOR THE BACKEND -**One open: CR-5.** Resolved history below it. - -### CR-5 — history windowing for the FE chat limit → **OPEN (not a blocker)** (courier `backend-handoff-chat-limit.md`) - -The FE is shipping a **chat limit** (default 256 chunks, localStorage-configurable): past the -limit it bulk-unloads the oldest `ceil(L/4)` chunks (scroll-jump-free, unlike old Dispatch's -one-per-delta eviction), and a fresh page load shows only the newest `floor(0.75×L)`. Works -today by fetching the full history and windowing in memory — the ask makes the FRESH-BROWSER -load cheap (today `?sinceSeq=0` returns the whole conversation; a 10k-chunk chat downloads -megabytes to show 192 chunks). Additive `transport-contract` asks: -- **`?limit=<n>`** on `GET /conversations/:id` — newest `n` of the selection, still ascending; - **≤ `n` chunks exist ⇒ return everything** (the FE always sends it; short chats must stay - exact). Absent ⇒ today's behavior. -- **`?beforeSeq=<s>`** — selection `seq < s` (with `limit`: newest `n` below `s`) — the - "Show earlier messages" page-in path once the FE's local cache is exhausted. -- **`earliestSeq?`/`hasOlder?` response field** — OR simply confirm in writing that seqs start - at 1 gap-free, and the FE derives `hasOlder` from `chunks[0].seq > 1` (cheapest, preferred). -Full consumption plan + sequencing in the courier doc. +**None open.** Resolved history below. + +### CR-5 — history windowing for the FE chat limit → **RESOLVED ✅** (courier `backend-handoff-chat-limit.md`; reply `frontend-history-windowing-handoff.md`; consumed) + +Backend shipped everything asked (`[email protected]`, `[email protected]` doc-only): +`?limit=<k>` (newest-k of the selection, ascending; ≤ k ⇒ whole selection, exact) + +`?beforeSeq=<s>` (exclusive `seq < s`; combined `sinceSeq < seq < s`) on +`GET /conversations/:id`; ask #3 answered via our preferred cheapest option — a WRITTEN +contractual guarantee that per-conversation seqs are **1-based, monotonic, gap-free** (codified +on `StoredChunk`), so the FE derives `hasOlder = oldestLoaded.seq > 1` (no new response field). +Validation: `limit`/`beforeSeq` must be positive ints, else 400 (FE never sends `beforeSeq=0` — +`oldestLoaded.seq === 1` already means "nothing older"). **FE consumed:** re-pinned + re-mirrored +both `.dispatch/*.reference.md`; `HistorySync` port gained an optional `{ limit?, beforeSeq? }` +window; a COLD-cache fresh load now fetches `?sinceSeq=0&limit=<floor(0.75×L)>` (a warm-cache +tail sync stays unwindowed — windowing a tail that outgrew the limit would leave a silent seq +gap behind the cache); `hasEarlier` is seq-derived per the new contract; "Show earlier messages" +pages from the local cache first and BACKFILLS the missing older run via +`?beforeSeq=<oldestKnown>&limit=` (persisted to cache, so the next page-in is local). The +`latestSeq` windowed-read caveat is satisfied structurally: the FE's tail cursor derives from +the cache's max seq, never from a response's `latestSeq`. Chat-limit recap (FE-side, shipped +with CR-5 still open): default 256 (`localStorage["dispatch.chatLimit"]`), bulk quarter-unload +past the limit gated on reader-at-bottom, 75% fresh-load window. ### CR-1 — Loaded Extensions as a true table → **RESOLVED ✅** (shipped + consumed) diff --git a/scripts/live-probe.ts b/scripts/live-probe.ts index 7099b44..f44a136 100644 --- a/scripts/live-probe.ts +++ b/scripts/live-probe.ts @@ -75,13 +75,27 @@ function fail(msg: string): never { process.exit(1); } -async function historySync(id: string, sinceSeq: number): Promise<ConversationHistoryResponse> { - const url = `${HTTP_BASE}/conversations/${encodeURIComponent(id)}?sinceSeq=${sinceSeq}`; +async function historySync( + id: string, + sinceSeq: number, + window?: { limit?: number; beforeSeq?: number }, +): Promise<ConversationHistoryResponse> { + let url = `${HTTP_BASE}/conversations/${encodeURIComponent(id)}?sinceSeq=${sinceSeq}`; + if (window?.limit !== undefined) url += `&limit=${window.limit}`; + if (window?.beforeSeq !== undefined) url += `&beforeSeq=${window.beforeSeq}`; const res = await fetch(url, { headers: { Origin: "http://localhost:24204" } }); if (!res.ok) fail(`history fetch ${res.status} for ${url}`); return (await res.json()) as ConversationHistoryResponse; } +/** Raw history GET that returns the status (for the CR-5 validation checks). */ +async function historyStatus(id: string, query: string): Promise<number> { + const url = `${HTTP_BASE}/conversations/${encodeURIComponent(id)}?${query}`; + const res = await fetch(url, { headers: { Origin: "http://localhost:24204" } }); + await res.arrayBuffer(); // drain + return res.status; +} + /** Durable metrics fetch — returns the response, or the HTTP status when not OK * (the endpoint is being implemented backend-side; the FE tolerates a 404). */ async function metricsSync(id: string): Promise<ConversationMetricsResponse | { status: number }> { @@ -203,6 +217,45 @@ async function main() { .join(""); record("turn 1 committed transcript has assistant text", committedText.length > 0); + // ─── CR-5: history windowing (?limit= / ?beforeSeq=, [email protected]) ─────── + const logLen = hist.chunks.length; + record( + "CR-5 seq origin: first chunk is seq 1 (1-based gap-free contract)", + hist.chunks[0]?.seq === 1, + `first seq=${hist.chunks[0]?.seq}`, + ); + const win = await historySync(textConv, 0, { limit: 2 }); + record( + "CR-5 ?limit=2 returns the NEWEST 2, ascending, latestSeq = window tail", + win.chunks.length === Math.min(2, logLen) && + win.chunks[0]?.seq === Math.max(1, logLen - 1) && + win.chunks[win.chunks.length - 1]?.seq === logLen && + win.latestSeq === logLen, + `seqs=[${win.chunks.map((c) => c.seq).join(",")}] latestSeq=${win.latestSeq}`, + ); + const whole = await historySync(textConv, 0, { limit: 200 }); + record( + "CR-5 ?limit= larger than the log returns everything (short-chat flow exact)", + whole.chunks.length === logLen, + `${whole.chunks.length}/${logLen} chunks`, + ); + const oldestLoaded = win.chunks[0]?.seq ?? 0; + if (oldestLoaded > 1) { + const back = await historySync(textConv, 0, { beforeSeq: oldestLoaded, limit: 50 }); + record( + "CR-5 ?beforeSeq= pages the older run (seq < bound, ascending from 1)", + back.chunks.length === oldestLoaded - 1 && + back.chunks[0]?.seq === 1 && + back.chunks.every((c) => c.seq < oldestLoaded), + `seqs=[${back.chunks.map((c) => c.seq).join(",")}]`, + ); + } + record("CR-5 limit=0 rejected with 400", (await historyStatus(textConv, "limit=0")) === 400); + record( + "CR-5 beforeSeq=-1 rejected with 400", + (await historyStatus(textConv, "beforeSeq=-1")) === 400, + ); + // ─── Metrics: LIVE token + timing ([email protected] usage/step-complete/done) ────── // (TurnMetricsEntry is `{ turnId, steps, total }` — the turn aggregate lives on // `total`, present once the live `done` folded.) diff --git a/src/app/store.svelte.ts b/src/app/store.svelte.ts index 379805f..999f2be 100644 --- a/src/app/store.svelte.ts +++ b/src/app/store.svelte.ts @@ -25,7 +25,7 @@ import { subscribe as protocolSubscribe, unsubscribe as protocolUnsubscribe, } from "../core/protocol"; -import type { ChatStore, MetricsSync } from "../features/chat"; +import type { ChatStore, HistorySync, MetricsSync } from "../features/chat"; import { createChatStore } from "../features/chat"; import type { ConversationCache } from "../features/conversation-cache"; import { createConversationCache } from "../features/conversation-cache"; @@ -111,12 +111,13 @@ export interface CreateAppStoreOptions { localStorage?: Storage; } -function createHistorySync( - httpBase: string, - fetchImpl: typeof fetch, -): (conversationId: string, sinceSeq: number) => Promise<ConversationHistoryResponse> { - return async (conversationId: string, sinceSeq: number) => { - const url = `${httpBase}/conversations/${encodeURIComponent(conversationId)}?sinceSeq=${sinceSeq}`; +function createHistorySync(httpBase: string, fetchImpl: typeof fetch): HistorySync { + return async (conversationId, sinceSeq, window) => { + let url = `${httpBase}/conversations/${encodeURIComponent(conversationId)}?sinceSeq=${sinceSeq}`; + // CR-5 windowing ([email protected]): both must be positive + // integers when present (the server 400s otherwise; callers guarantee it). + if (window?.limit !== undefined) url += `&limit=${window.limit}`; + if (window?.beforeSeq !== undefined) url += `&beforeSeq=${window.beforeSeq}`; const res = await fetchImpl(url); if (!res.ok) { throw new Error(`History sync failed: ${res.status}`); diff --git a/src/core/chunks/trim.test.ts b/src/core/chunks/trim.test.ts index 091b646..7914f35 100644 --- a/src/core/chunks/trim.test.ts +++ b/src/core/chunks/trim.test.ts @@ -177,28 +177,43 @@ describe("restoreEarlier", () => { expect(selectHasEarlier(restored)).toBe(true); }); - it("clears the watermark when the restore exhausts known earlier history", () => { + it("restoring down to seq 1 reaches the contractual origin (hasEarlier clears)", () => { const windowed = windowTranscript(stateWith(chunks(1, 100)), 75); // hidden: 1..25 const restored = restoreEarlier(windowed, chunks(1, 100), 64); expect(restored.committed).toHaveLength(100); expect(restored.committed[0]?.seq).toBe(1); - expect(restored.hiddenBeforeSeq).toBe(0); + expect(restored.hiddenBeforeSeq).toBe(1); // floor at the origin — inert expect(restored.hiddenThinkingCount).toBe(0); expect(selectHasEarlier(restored)).toBe(false); }); - it("clears the watermark when nothing is actually below it", () => { + it("is the identity when nothing older is known locally (server may still hold more)", () => { const windowed = windowTranscript(stateWith(chunks(50, 200)), 75); const restored = restoreEarlier(windowed, [], 64); - expect(restored.hiddenBeforeSeq).toBe(0); - expect(restored.committed).toEqual(windowed.committed); + expect(restored).toBe(windowed); + // seqs are 1-based gap-free: window starts at 126 ⇒ older chunks DO exist. + expect(selectHasEarlier(restored)).toBe(true); }); - it("is the identity when nothing is hidden", () => { + it("is the identity when the window already starts at seq 1", () => { const state = stateWith(chunks(1, 10)); expect(restoreEarlier(state, chunks(1, 10), 5)).toBe(state); }); + it("works on a server-windowed transcript (no local watermark)", () => { + // A cold-cache fresh load with `?limit=` commits a suffix (seq 809..1000) + // with hiddenBeforeSeq still 0 — hasEarlier derives from seq > 1, and a + // backfilled run merges below it. + const state = stateWith(chunks(809, 1000)); + expect(state.hiddenBeforeSeq).toBe(0); + expect(selectHasEarlier(state)).toBe(true); + const restored = restoreEarlier(state, chunks(745, 808), 64); + expect(restored.committed[0]?.seq).toBe(745); + expect(restored.committed).toHaveLength(192 + 64); + expect(restored.hiddenBeforeSeq).toBe(745); + expect(selectHasEarlier(restored)).toBe(true); + }); + it("decrements the hidden thinking count by the restored thinking chunks", () => { const committed = [chunk(1, "thinking"), chunk(2), chunk(3, "thinking"), ...chunks(4, 12)]; const trimmed = trimTranscript(stateWith(committed), 10); // drops 3: seqs 1..3 (2 thinking) @@ -213,6 +228,7 @@ describe("restoreEarlier", () => { const trimmed = trimTranscript(stateWith(original), 100); const restored = restoreEarlier(trimmed, original, 1000); expect(restored.committed).toEqual(original); - expect(restored.hiddenBeforeSeq).toBe(0); + expect(restored.hiddenBeforeSeq).toBe(1); + expect(selectHasEarlier(restored)).toBe(false); }); }); diff --git a/src/core/chunks/trim.ts b/src/core/chunks/trim.ts index 1733027..94065b3 100644 --- a/src/core/chunks/trim.ts +++ b/src/core/chunks/trim.ts @@ -110,40 +110,52 @@ export function windowTranscript(state: TranscriptState, maxCommitted: number): } /** - * Page earlier (unloaded) history back in — the "Show earlier messages" action. + * The oldest LOADED seq — the start of the transcript's loaded window. Usually + * `committed[0].seq`; falls back to the watermark when a trim emptied the + * committed list (all-provisional overflow). 0 = window start unknown/origin. + */ +function oldestLoadedSeq(state: TranscriptState): number { + return state.committed[0]?.seq ?? state.hiddenBeforeSeq; +} + +/** + * Page earlier history back in — the "Show earlier messages" action. * - * `earlier` must be ALL locally-known chunks below the watermark (typically the - * full cached conversation; chunks at/above the watermark are ignored). The - * newest `count` of them are merged back in front of `committed` and the - * watermark lowers to the new oldest loaded seq — or clears to 0 when this - * restore exhausts the known earlier history (nothing left to offer). + * `earlier` is every locally-known chunk older than the loaded window + * (typically the full cached conversation, possibly extended by a CR-5 + * `?beforeSeq=` backfill; chunks at/inside the window are ignored). The newest + * `count` of them are merged back in front of `committed`, and the watermark + * follows the new window start so history merges still can't resurrect what + * remains unloaded. Identity when the window already starts at seq 1 (the + * contractual origin) or nothing older is known locally. */ export function restoreEarlier( state: TranscriptState, earlier: readonly StoredChunk[], count: number, ): TranscriptState { - if (state.hiddenBeforeSeq <= 0) return state; - const below = earlier.filter((c) => c.seq < state.hiddenBeforeSeq).sort((a, b) => a.seq - b.seq); - if (below.length === 0) { - // Nothing is actually hidden below the watermark: clear it so the - // "Show earlier" affordance disappears. - return { ...state, hiddenBeforeSeq: 0, hiddenThinkingCount: 0 }; - } + const oldest = oldestLoadedSeq(state); + if (oldest <= 1) return state; + const below = earlier.filter((c) => c.seq < oldest).sort((a, b) => a.seq - b.seq); + if (below.length === 0) return state; const keep = below.slice(-Math.max(1, count)); - const exhausted = keep.length === below.length; const firstKept = keep[0]; return { ...state, committed: [...keep, ...state.committed], - hiddenBeforeSeq: exhausted || firstKept === undefined ? 0 : firstKept.seq, - hiddenThinkingCount: exhausted - ? 0 - : Math.max(0, state.hiddenThinkingCount - countThinking(keep)), + hiddenBeforeSeq: firstKept?.seq ?? state.hiddenBeforeSeq, + hiddenThinkingCount: Math.max(0, state.hiddenThinkingCount - countThinking(keep)), }; } -/** Whether unloaded earlier history exists to offer ("Show earlier messages"). */ +/** + * Whether earlier history exists below the loaded window — drives the + * "Show earlier messages" affordance. Derived from the [email protected] CONTRACT + * that per-conversation seqs are 1-based and gap-free: a loaded window that + * starts above seq 1 means older chunks exist (locally cached or server-side), + * whether the window came from a local trim or a server-windowed (`?limit=`) + * fresh load. + */ export function selectHasEarlier(state: TranscriptState): boolean { - return state.hiddenBeforeSeq > 0; + return oldestLoadedSeq(state) > 1; } diff --git a/src/features/chat/index.ts b/src/features/chat/index.ts index 18ed693..139a64f 100644 --- a/src/features/chat/index.ts +++ b/src/features/chat/index.ts @@ -1,7 +1,7 @@ export type { RenderedChunk, RenderGroup, ToolBatchEntry } from "../../core/chunks"; export { groupRenderedChunks } from "../../core/chunks"; export type { TurnMetricsEntry } from "../../core/metrics"; -export type { ChatTransport, HistorySync, MetricsSync } from "./ports"; +export type { ChatTransport, HistorySync, HistoryWindow, MetricsSync } from "./ports"; export type { ChatStore, ChatStoreDependencies } from "./store.svelte"; export { createChatStore } from "./store.svelte"; export { default as ChatView } from "./ui/ChatView.svelte"; diff --git a/src/features/chat/ports.ts b/src/features/chat/ports.ts index e28ebf6..f8c665f 100644 --- a/src/features/chat/ports.ts +++ b/src/features/chat/ports.ts @@ -9,10 +9,29 @@ export interface ChatTransport { send(msg: ChatSendMessage): void; } -/** Injected history-sync port — fetches incremental history from the server. */ +/** + * Optional windowing for a history fetch ([email protected], CR-5). + * Both must be POSITIVE integers when present (the server 400s otherwise). + */ +export interface HistoryWindow { + /** Return only the NEWEST `limit` chunks of the selection (still ascending). */ + readonly limit?: number; + /** Exclusive upper bound: only chunks with `seq < beforeSeq` (backfill paging). */ + readonly beforeSeq?: number; +} + +/** + * Injected history-sync port — fetches incremental history from the server + * (`GET /conversations/:id?sinceSeq=&beforeSeq=&limit=`). NOTE the contract + * caveat: on a windowed/backfill read the response's `latestSeq` describes the + * returned window, not the conversation's high-water mark — never regress a + * tail cursor from it (the FE's cursor comes from the cache's max seq, which + * satisfies this naturally). + */ export type HistorySync = ( conversationId: string, sinceSeq: number, + window?: HistoryWindow, ) => Promise<ConversationHistoryResponse>; /** Injected metrics-sync port — fetches persisted per-turn metrics from the server. */ diff --git a/src/features/chat/store.svelte.ts b/src/features/chat/store.svelte.ts index 5ca28af..e74980d 100644 --- a/src/features/chat/store.svelte.ts +++ b/src/features/chat/store.svelte.ts @@ -92,10 +92,10 @@ export interface ChatStore { setModel(model: string): void; load(): Promise<void>; /** - * Page one unload-unit (`ceil(limit/4)`) of earlier history back in from the - * local cache — the "Show earlier messages" action. (When the backend ships - * CR-5 `?beforeSeq=`, this can fall through to the server once the cache is - * exhausted.) + * Page one unload-unit (`ceil(limit/4)`) of earlier history back in — the + * "Show earlier messages" action. Local cache first; when the cache doesn't + * reach far enough back (a server-windowed fresh load), the missing older + * run is fetched via CR-5 `?beforeSeq=&limit=` and persisted to the cache. */ showEarlier(): Promise<void>; /** @@ -129,12 +129,21 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore { transcript = trimTranscript(transcript, chatLimit); } - async function syncTail(): Promise<void> { + /** + * Pull `seq > cache-cursor` from the server and fold it in. `coldLimit`, when + * given AND the cache is empty (a truly fresh browser), windows the fetch to + * the newest N chunks (CR-5 `?limit=`) so a huge conversation doesn't ship + * whole. It is deliberately NOT applied to a warm-cache tail: windowing a + * tail that grew past N while we were away would leave a silent seq GAP + * between the cache and the fetched window. + */ + async function syncTail(coldLimit?: number): Promise<void> { if (disposed || _pendingSync) return; _pendingSync = true; try { const since = await deps.cache.sinceSeq(deps.conversationId); - const res = await deps.historySync(deps.conversationId, since); + const window = since === 0 && coldLimit !== undefined ? { limit: coldLimit } : undefined; + const res = await deps.historySync(deps.conversationId, since, window); const merged = await deps.cache.commit(deps.conversationId, res.chunks); transcript = applyHistory(transcript, merged); maybeTrim(); @@ -227,24 +236,48 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore { async load(): Promise<void> { // Fresh load shows only the newest 75% of the limit — headroom before the - // first trim. Window the cached slice SYNCHRONOUSLY with its apply (no - // render in between), and again after the tail sync (a cold cache means - // syncTail pulled the whole history in one response). + // first trim. A warm cache is windowed locally (synchronously with its + // apply — no render in between); a COLD cache passes the window to the + // server instead (CR-5 `?limit=`), so a huge conversation never ships + // whole. The post-sync window re-asserts the cap either way. const windowSize = initialWindowSize(chatLimit); const cached = await deps.cache.load(deps.conversationId); if (cached.length > 0) { transcript = windowTranscript(applyHistory(transcript, cached), windowSize); } - await syncTail(); + await syncTail(windowSize); transcript = windowTranscript(transcript, windowSize); await syncMetrics(); }, async showEarlier(): Promise<void> { if (disposed) return; - if (!selectHasEarlier(transcript)) return; - const cached = await deps.cache.load(deps.conversationId); - transcript = restoreEarlier(transcript, cached, unloadCount(chatLimit)); + const oldest = transcript.committed[0]?.seq ?? transcript.hiddenBeforeSeq; + if (oldest <= 1) return; + const want = unloadCount(chatLimit); + try { + let earlier = (await deps.cache.load(deps.conversationId)).filter((c) => c.seq < oldest); + // The local cache may not reach far enough back (a server-windowed + // fresh load cached only the window): page the missing OLDER run in + // from the server (CR-5 `?beforeSeq=&limit=`) and persist it, so the + // next page-in is local. Seqs are gap-free, so the fetched run is + // contiguous with what we hold. NOTE: the backfill response's + // `latestSeq` is a window cursor — never fed to the tail cursor + // (ours derives from the cache's max seq). + const oldestKnown = earlier[0]?.seq ?? oldest; + if (earlier.length < want && oldestKnown > 1) { + const res = await deps.historySync(deps.conversationId, 0, { + beforeSeq: oldestKnown, + limit: want - earlier.length, + }); + const merged = await deps.cache.commit(deps.conversationId, res.chunks); + earlier = merged.filter((c) => c.seq < oldest); + } + transcript = restoreEarlier(transcript, earlier, want); + _error = null; + } catch (err) { + _error = err instanceof Error ? err.message : String(err); + } }, resync(): void { diff --git a/src/features/chat/store.test.ts b/src/features/chat/store.test.ts index 5c798d6..3232009 100644 --- a/src/features/chat/store.test.ts +++ b/src/features/chat/store.test.ts @@ -1054,12 +1054,12 @@ describe("createChatStore", () => { store.dispose(); }); - it("chat limit: a cold cache (fresh browser) windows the full server history to 75%", async () => { + it("chat limit: a cold cache (fresh browser) asks the SERVER for the 75% window (CR-5 ?limit=)", async () => { const transport = createFakeTransport(); const historySync = createFakeHistorySync(); const metricsSync = createFakeMetricsSync(); const cache = createFakeCache(); - // Backend has no limit param yet (CR-5): sinceSeq=0 returns EVERYTHING. + // The server holds 500 chunks; the windowed fetch returns the newest 75. historySync.returnChunks = Array.from({ length: 500 }, (_, i) => makeStoredChunk(i + 1)); const store = createChatStore({ @@ -1073,12 +1073,77 @@ describe("createChatStore", () => { await store.load(); + // The cold-cache initial sync carried the window (`?sinceSeq=0&limit=75`). + expect(historySync.calls[0]?.sinceSeq).toBe(0); + expect(historySync.calls[0]?.window).toEqual({ limit: 75 }); + expect(store.chunks).toHaveLength(75); expect(store.chunks[0]?.seq).toBe(426); + // hasEarlier derives from the 1-based gap-free seq contract (426 > 1) — + // no local watermark was ever set. + expect(store.hasEarlier).toBe(true); + // Only the window was shipped + cached (the point of CR-5). + const cached = await cache.impl.load(CONV_ID); + expect(cached).toHaveLength(75); + + store.dispose(); + }); + + it("chat limit: a warm cache syncs the tail UNWINDOWED (no seq gap behind the cache)", async () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + await cache.impl.commit(CONV_ID, [makeStoredChunk(1), makeStoredChunk(2)]); + historySync.returnChunks = [makeStoredChunk(3)]; + + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + chatLimit: 100, + }); + + await store.load(); + + expect(historySync.calls[0]?.sinceSeq).toBe(2); + expect(historySync.calls[0]?.window).toBeUndefined(); + + store.dispose(); + }); + + it("chat limit: showEarlier backfills from the server when the cache is too shallow (CR-5 ?beforeSeq=)", async () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + historySync.returnChunks = Array.from({ length: 500 }, (_, i) => makeStoredChunk(i + 1)); + + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + chatLimit: 100, + }); + + await store.load(); // server-windowed: loaded + cached = 426..500 + expect(store.chunks[0]?.seq).toBe(426); + + await store.showEarlier(); + + // Nothing below 426 was cached → fetched the missing run from the server. + const backfill = historySync.calls[1]; + expect(backfill?.window).toEqual({ beforeSeq: 426, limit: 25 }); + expect(store.chunks).toHaveLength(100); + expect(store.chunks[0]?.seq).toBe(401); expect(store.hasEarlier).toBe(true); - // The full history is still CACHED locally (show-earlier pages from it). + // The backfilled run is persisted: the NEXT page-in is cache-local. const cached = await cache.impl.load(CONV_ID); - expect(cached).toHaveLength(500); + expect(cached).toHaveLength(100); store.dispose(); }); @@ -1109,6 +1174,8 @@ describe("createChatStore", () => { expect(store.chunks).toHaveLength(100); expect(store.chunks[0]?.seq).toBe(401); expect(store.hasEarlier).toBe(true); + // The cache reached deep enough — no server backfill was needed. + expect(historySync.calls).toHaveLength(1); store.dispose(); }); diff --git a/src/features/chat/test-helpers.ts b/src/features/chat/test-helpers.ts index 07dad26..6bb98a1 100644 --- a/src/features/chat/test-helpers.ts +++ b/src/features/chat/test-helpers.ts @@ -1,6 +1,6 @@ import type { StoredChunk } from "@dispatch/wire"; import type { ConversationCache } from "../conversation-cache"; -import type { ChatTransport, HistorySync, MetricsSync } from "./ports"; +import type { ChatTransport, HistorySync, HistoryWindow, MetricsSync } from "./ports"; export interface FakeTransport { readonly sent: import("@dispatch/transport-contract").ChatSendMessage[]; @@ -20,14 +20,14 @@ export function createFakeTransport(): FakeTransport { } export interface FakeHistorySync { - readonly calls: Array<{ conversationId: string; sinceSeq: number }>; + readonly calls: Array<{ conversationId: string; sinceSeq: number; window?: HistoryWindow }>; /** Set the chunks to return on the next call. */ returnChunks: readonly StoredChunk[]; readonly impl: HistorySync; } export function createFakeHistorySync(): FakeHistorySync { - const calls: Array<{ conversationId: string; sinceSeq: number }> = []; + const calls: Array<{ conversationId: string; sinceSeq: number; window?: HistoryWindow }> = []; let returnChunks: readonly StoredChunk[] = []; return { calls, @@ -37,9 +37,20 @@ export function createFakeHistorySync(): FakeHistorySync { set returnChunks(v: readonly StoredChunk[]) { returnChunks = v; }, - impl: async (conversationId, sinceSeq) => { - calls.push({ conversationId, sinceSeq }); - const chunks = returnChunks; + impl: async (conversationId, sinceSeq, window) => { + calls.push({ conversationId, sinceSeq, ...(window !== undefined ? { window } : {}) }); + // Apply the CR-5 WINDOW semantics (`beforeSeq` bound, then newest-`limit`) + // so store tests exercise the real windowed flows. `sinceSeq` filtering is + // deliberately NOT applied — tests set `returnChunks` to the slice they + // mean the server to hold past the cursor. + let chunks = returnChunks; + const before = window?.beforeSeq; + if (before !== undefined) { + chunks = chunks.filter((c) => c.seq < before); + } + if (window?.limit !== undefined && chunks.length > window.limit) { + chunks = chunks.slice(-window.limit); + } const latestSeq = chunks.length > 0 ? Math.max(...chunks.map((c) => c.seq)) : sinceSeq; return { chunks, latestSeq }; }, |
