| author | Adam Malczewski <[email protected]> | 2026-06-12 18:37:09 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-12 18:37:09 +0900 |
| commit | dbf77ba78ff840e0ed5f6294030523fe3ab121fa | |
| tree | e768aef3edd0c126212058c3d1433355594c49be | |
| parent | 6689eb51b467d8e370f31495840d88661f978168 | |
feat(history): CR-5 windowed reads — ?limit= / ?beforeSeq= on GET /conversations/:id
Selection sinceSeq < seq < beforeSeq; newest-limit window, ascending; positive-
integer validation (400, store never sees an invalid window); 1-based gap-free
seq codified as the contractual has-older mechanism (no earliestSeq field).
transport-contract 0.9.0->0.10.0, wire 0.6.0->0.6.1 (doc-only).
conversation-store +8 tests, transport-http +20; 935 vitest + 112 bun green.
Live-verified: 6/6 probe checks OK. FE courier: frontend-history-windowing-handoff.md
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | frontend-history-windowing-handoff.md | 70 |
| -rw-r--r-- | packages/conversation-store/src/store.test.ts | 88 |
| -rw-r--r-- | packages/conversation-store/src/store.ts | 45 |
| -rw-r--r-- | packages/transport-contract/package.json | 2 |
| -rw-r--r-- | packages/transport-contract/src/index.ts | 48 |
| -rw-r--r-- | packages/transport-http/src/app.test.ts | 169 |
| -rw-r--r-- | packages/transport-http/src/app.ts | 39 |
| -rw-r--r-- | packages/transport-http/src/index.ts | 3 |
| -rw-r--r-- | packages/transport-http/src/logic.test.ts | 54 |
| -rw-r--r-- | packages/transport-http/src/logic.ts | 27 |
| -rw-r--r-- | packages/wire/package.json | 2 |
| -rw-r--r-- | packages/wire/src/index.ts | 16 |
| -rw-r--r-- | tasks.md | 80 |
13 files changed, 614 insertions, 29 deletions
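The selection and window semantics summarized in the commit message can be sketched as a pure function over an in-memory log. This is a minimal illustration, not the store's implementation: `SeqChunk`, `HistoryWindow`, `selectWindow`, and `latestSeqOf` are hypothetical names, the log is assumed to be ascending by `seq` already, and the window is assumed to have been validated upstream.

```typescript
// Hypothetical stand-in for the stored chunk envelope; only `seq` matters here.
interface SeqChunk {
  readonly seq: number;
}

// Hypothetical window shape mirroring the optional `beforeSeq` / `limit` params.
interface HistoryWindow {
  readonly beforeSeq?: number;
  readonly limit?: number;
}

// Selection: sinceSeq < seq < beforeSeq. Window: newest `limit`, still ascending.
// Assumes `log` is ascending by seq and the window holds positive integers only.
function selectWindow<T extends SeqChunk>(
  log: readonly T[],
  sinceSeq: number,
  window: HistoryWindow = {},
): readonly T[] {
  const { beforeSeq, limit } = window;
  const selected = log.filter(
    (c) => c.seq > sinceSeq && (beforeSeq === undefined || c.seq < beforeSeq),
  );
  if (limit !== undefined && selected.length > limit) {
    // Taking the tail of an ascending array keeps the result ascending.
    return selected.slice(selected.length - limit);
  }
  return selected;
}

// latestSeq: seq of the last returned chunk, else the requested sinceSeq.
function latestSeqOf(chunks: readonly SeqChunk[], sinceSeq: number): number {
  const last = chunks[chunks.length - 1];
  return last !== undefined ? last.seq : sinceSeq;
}

const log: SeqChunk[] = [1, 2, 3, 4, 5, 6].map((seq) => ({ seq }));
const seqsOf = (cs: readonly SeqChunk[]): number[] => cs.map((c) => c.seq);

console.log(seqsOf(selectWindow(log, 0, { limit: 2 }))); // [5, 6]
console.log(seqsOf(selectWindow(log, 0, { beforeSeq: 5, limit: 2 }))); // [3, 4]
console.log(seqsOf(selectWindow(log, 2, { beforeSeq: 5 }))); // [3, 4]
console.log(latestSeqOf(selectWindow(log, 6), 6)); // 6 (empty slice, cursor unchanged)
```

Filtering first and slicing second is what makes `limit` apply to the selection rather than to the whole log, which is why `beforeSeq=5&limit=2` yields `[3, 4]` and not `[5, 6]`.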
diff --git a/frontend-history-windowing-handoff.md b/frontend-history-windowing-handoff.md
new file mode 100644
index 0000000..f9244b1
--- /dev/null
+++ b/frontend-history-windowing-handoff.md
@@ -0,0 +1,70 @@
+# Backend → frontend handoff — CR-5: history windowing (`limit` / `beforeSeq`)
+
+> **From:** arch-rewrite · **To:** dispatch-web · **Courier:** the user.
+> Reply to `backend-handoff-chat-limit.md` (CR-5). 2026-06-12. SHIPPED.
+
+## What shipped
+
+`GET /conversations/:id` now takes two OPTIONAL query params on top of
+`sinceSeq` (all combinable; authoritative spec = the
+`ConversationHistoryResponse` JSDoc in `@dispatch/transport-contract`):
+
+1. **`limit=<k>`** — returns only the NEWEST `k` chunks of the selection,
+   still ASCENDING by seq. A selection with ≤ `k` chunks is returned whole
+   (your `limit=192` against a short conversation gets the normal full
+   response, exact). `limit` absent → exactly the previous behavior.
+2. **`beforeSeq=<s>`** — restricts the selection to `seq < s` (exclusive).
+   Combined semantics: `sinceSeq < seq < beforeSeq`; with `limit`: the newest
+   `k` chunks below `s`, ascending — your "Show earlier messages" page-in path.
+
+Your three flows, verbatim from your handoff, all work as written:
+
+- Fresh load: `?sinceSeq=0&limit=192`
+- Tail sync: `?sinceSeq=<maxCachedSeq>` (unchanged)
+- Page older in: `?beforeSeq=<oldestLoadedSeq>&limit=<ceil(L/4)>`
+
+## Ask #3 — our pick: the seq invariant, no new field (your "cheapest option")
+
+We CONFIRM IN WRITING, as a contractual guarantee (now codified in the
+`StoredChunk` doc in `@dispatch/wire` and referenced from the history-response
+doc): **per-conversation `seq` is 1-based, monotonic, and gap-free** — a
+conversation's first chunk is always `seq === 1` and numbering never skips.
+
+So derive it exactly as you proposed: `hasOlder = oldestLoaded.seq > 1`.
+There is deliberately NO `earliestSeq`/`hasOlder` response field.
+
+## Validation (new, only for the new params)
+
+`limit` and `beforeSeq` must be **positive integers** when present
+(`sinceSeq` keeps its existing semantics — `0` = from the start). Malformed,
+zero, or negative values → **HTTP 400 `{ error }`** (the error message names
+the offending param). Don't send `beforeSeq=0` — and you never need to:
+`oldestLoaded.seq === 1` already means there is nothing older.
+
+## `latestSeq` caveat (important for your cursor logic)
+
+`latestSeq` semantics are UNCHANGED (seq of the last returned chunk; the
+requested `sinceSeq` when the slice is empty) — but on a **windowed** read it
+describes the returned window, NOT the conversation's high-water mark:
+
+- A fresh `?sinceSeq=0&limit=192` load DID reach the true tail → `latestSeq`
+  is a valid sync cursor.
+- A `?beforeSeq=...` backfill page did NOT → do not regress your tail cursor
+  from a backfill response. (Your seq-keyed dedup cache makes this natural —
+  just don't feed backfill `latestSeq` into the tail cursor.)
+
+## Versions (re-pin + re-mirror)
+
+- `@dispatch/transport-contract` **`0.9.0 → 0.10.0`** — the param/validation/
+  caveat docs above (response TYPE shape unchanged; no new fields).
+- `@dispatch/wire` **`0.6.0 → 0.6.1`** — doc-only: the 1-based seq guarantee
+  codified on `StoredChunk`.
+
+## Test coverage (backend, for your confidence)
+
+- conversation-store: +8 windowing tests (newest-N ascending, bounds,
+  combined bounds, page-in, empty selection, garbage-in, no-window regression
+  guard; the "gap-free 1-based seq" test now backs a written contract).
+- transport-http: +20 route/param tests incl. all five 400 validation cases
+  and a no-params byte-identical regression guard.
+- Full suite: typecheck clean · biome clean · 935 vitest + 112 bun tests green.
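The client-side rules in the handoff above (derive `hasOlder` from the oldest loaded seq, and never feed a backfill `latestSeq` into the tail cursor) might look roughly like the sketch below. Everything here is an assumption for illustration: `ClientHistory`, `ReadKind`, `applyResponse`, and the query builders are hypothetical names and do not come from dispatch-web or the backend packages.

```typescript
// Hypothetical response shape: only the fields this sketch needs.
interface HistoryResponse {
  readonly chunks: readonly { readonly seq: number }[];
  readonly latestSeq: number;
}

// Hypothetical client cache state.
interface ClientHistory {
  readonly seqs: readonly number[]; // loaded seqs, ascending, deduplicated
  readonly tailCursor: number; // sinceSeq to send on the next tail sync
}

// "tail" covers fresh loads and tail syncs; "backfill" covers beforeSeq pages.
type ReadKind = "tail" | "backfill";

function hasOlder(state: ClientHistory): boolean {
  const oldest = state.seqs[0];
  // seq is 1-based and gap-free, so anything above 1 implies older chunks exist.
  return oldest !== undefined && oldest > 1;
}

function tailSyncQuery(state: ClientHistory): string {
  return `?sinceSeq=${state.tailCursor}`;
}

function pageOlderQuery(state: ClientHistory, limit: number): string | undefined {
  const oldest = state.seqs[0];
  // Never send beforeSeq for an empty cache or when seq 1 is already loaded.
  if (oldest === undefined || oldest <= 1) return undefined;
  return `?beforeSeq=${oldest}&limit=${limit}`;
}

// Merge a response into the cache. Only tail reads may advance the cursor,
// and the cursor never moves backwards.
function applyResponse(
  state: ClientHistory,
  res: HistoryResponse,
  kind: ReadKind,
): ClientHistory {
  const incoming = res.chunks
    .map((c) => c.seq)
    .filter((s) => state.seqs.indexOf(s) === -1);
  const seqs = [...state.seqs, ...incoming].sort((a, b) => a - b);
  const tailCursor =
    kind === "tail" ? Math.max(state.tailCursor, res.latestSeq) : state.tailCursor;
  return { seqs, tailCursor };
}

let state: ClientHistory = { seqs: [], tailCursor: 0 };
// Fresh load with limit=2 against a six-chunk conversation.
state = applyResponse(state, { chunks: [{ seq: 5 }, { seq: 6 }], latestSeq: 6 }, "tail");
// Backfill page: ?beforeSeq=5&limit=2.
state = applyResponse(state, { chunks: [{ seq: 3 }, { seq: 4 }], latestSeq: 4 }, "backfill");

console.log(state.tailCursor); // 6 (not regressed to 4)
console.log(hasOlder(state)); // true (oldest loaded seq is 3)
console.log(pageOlderQuery(state, 2)); // ?beforeSeq=3&limit=2
```

Tagging each read with its kind at the call site keeps the cursor rule in one place; a caller that forgets the distinction would otherwise re-fetch already cached chunks on the next tail sync.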
diff --git a/packages/conversation-store/src/store.test.ts b/packages/conversation-store/src/store.test.ts index 49b0e58..5b07eca 100644 --- a/packages/conversation-store/src/store.test.ts +++ b/packages/conversation-store/src/store.test.ts @@ -499,6 +499,94 @@ describe("ConversationStore", () => { }); }); +describe("ConversationStore loadSince windowing", () => { + let storage: StorageNamespace; + + beforeEach(() => { + storage = createMemoryStorage(); + }); + + // Append `count` single-chunk user messages so seq runs 1..count, gap-free. + async function seed(store: ReturnType<typeof createConversationStore>, count: number) { + const messages: ChatMessage[] = []; + for (let i = 1; i <= count; i++) { + messages.push({ role: "user", chunks: [{ type: "text", text: `m${i}` }] }); + } + await store.append("conv1", messages); + } + + it("limit returns the newest N of the selection, ascending by seq", async () => { + const store = createConversationStore(storage); + await seed(store, 5); + const chunks = await store.loadSince("conv1", 0, { limit: 2 }); + expect(chunks.map((c) => c.seq)).toEqual([4, 5]); + }); + + it("limit >= selection size returns the whole selection (exact, not truncated)", async () => { + const store = createConversationStore(storage); + await seed(store, 3); + const exactlyAll = await store.loadSince("conv1", 0, { limit: 3 }); + expect(exactlyAll.map((c) => c.seq)).toEqual([1, 2, 3]); + const overAll = await store.loadSince("conv1", 0, { limit: 99 }); + expect(overAll.map((c) => c.seq)).toEqual([1, 2, 3]); + }); + + it("beforeSeq bounds the selection exclusively (seq < beforeSeq)", async () => { + const store = createConversationStore(storage); + await seed(store, 5); + const chunks = await store.loadSince("conv1", 0, { beforeSeq: 3 }); + expect(chunks.map((c) => c.seq)).toEqual([1, 2]); + }); + + it("sinceSeq + beforeSeq combine to sinceSeq < seq < beforeSeq", async () => { + const store = createConversationStore(storage); + await seed(store, 6); + 
const chunks = await store.loadSince("conv1", 2, { beforeSeq: 5 }); + expect(chunks.map((c) => c.seq)).toEqual([3, 4]); + }); + + it("beforeSeq + limit: newest N below the bound, ascending (page older history in)", async () => { + const store = createConversationStore(storage); + await seed(store, 8); + const chunks = await store.loadSince("conv1", 0, { beforeSeq: 6, limit: 2 }); + expect(chunks.map((c) => c.seq)).toEqual([4, 5]); + }); + + it("empty selection returns [] (beforeSeq=1, and sinceSeq past the tail)", async () => { + const store = createConversationStore(storage); + await seed(store, 4); + expect(await store.loadSince("conv1", 0, { beforeSeq: 1 })).toEqual([]); + expect(await store.loadSince("conv1", 4, { limit: 3 })).toEqual([]); + }); + + it("non-positive / non-integer limit and beforeSeq are treated as absent", async () => { + const store = createConversationStore(storage); + await seed(store, 4); + const all = [1, 2, 3, 4]; + expect((await store.loadSince("conv1", 0, { limit: 0 })).map((c) => c.seq)).toEqual(all); + expect((await store.loadSince("conv1", 0, { limit: -2 })).map((c) => c.seq)).toEqual(all); + expect((await store.loadSince("conv1", 0, { limit: 1.5 })).map((c) => c.seq)).toEqual(all); + expect((await store.loadSince("conv1", 0, { beforeSeq: 0 })).map((c) => c.seq)).toEqual(all); + expect((await store.loadSince("conv1", 0, { beforeSeq: -3 })).map((c) => c.seq)).toEqual(all); + expect((await store.loadSince("conv1", 0, { beforeSeq: 2.7 })).map((c) => c.seq)).toEqual(all); + }); + + it("window omitted is identical to today's behavior (regression guard)", async () => { + const store = createConversationStore(storage); + await seed(store, 5); + const base = await store.loadSince("conv1", 1); + const withEmptyWindow = await store.loadSince("conv1", 1, {}); + // A caller whose window fields happen to be undefined (e.g. 
unset query + // params) — modelled as an optional-field record, not explicit `undefined` + // literals (which exactOptionalPropertyTypes rejects on the contract). + const undefinedFieldsWindow: { beforeSeq?: number; limit?: number } = {}; + const withUndefinedFields = await store.loadSince("conv1", 1, undefinedFieldsWindow); + expect(base.map((c) => c.seq)).toEqual([2, 3, 4, 5]); + expect(withEmptyWindow).toEqual(base); + expect(withUndefinedFields).toEqual(base); + }); +}); + describe("ConversationStore metrics", () => { let storage: StorageNamespace; diff --git a/packages/conversation-store/src/store.ts b/packages/conversation-store/src/store.ts index 0948f64..0a42917 100644 --- a/packages/conversation-store/src/store.ts +++ b/packages/conversation-store/src/store.ts @@ -23,9 +23,33 @@ import { reconcileWithReport } from "./reconcile.js"; export interface ConversationStore { readonly append: (conversationId: string, messages: readonly ChatMessage[]) => Promise<void>; readonly load: (conversationId: string) => Promise<ChatMessage[]>; + /** + * Read the conversation's persisted chunks as a SELECTION + optional WINDOW, + * ascending by seq. The raw append-order log; NOT reconciled (a dangling + * tool-call is returned as-is — repair is a turn-path concern). + * + * - **Selection** — `sinceSeq` is an exclusive lower bound (`seq > sinceSeq`; + * omitted/`0`/non-positive/non-integer = from the start). When + * `window.beforeSeq` is given it is an exclusive upper bound + * (`seq < beforeSeq`). Together: `sinceSeq < seq < beforeSeq`. + * - **Window** — `window.limit` returns only the NEWEST `limit` chunks of the + * selection; the result STAYS ASCENDING by seq. A selection with ≤ `limit` + * chunks is returned whole (exact, not truncated). + * - **Omitted = unchanged** — `window` absent (or both its fields undefined) + * is byte-identical to the pre-windowing behavior, so existing callers that + * pass no third argument are unaffected. 
+ * - **Garbage-in is forgiving** — a non-positive or non-integer `limit` (or + * `beforeSeq`) is treated as ABSENT (full selection); this method never + * throws on bad window input. The transport validates and 400s upstream. + * + * Seq numbering is 1-based and gap-free, so a client derives "older chunks + * exist" purely from the oldest returned `seq > 1`; there is deliberately no + * `earliestSeq`/high-water-mark API. + */ readonly loadSince: ( conversationId: string, sinceSeq?: number, + window?: { readonly beforeSeq?: number; readonly limit?: number }, ) => Promise<readonly StoredChunk[]>; readonly appendMetrics: (conversationId: string, metrics: TurnMetrics) => Promise<void>; readonly loadMetrics: (conversationId: string) => Promise<readonly TurnMetrics[]>; @@ -37,6 +61,16 @@ export interface ConversationStore { export const conversationStoreHandle = defineService<ConversationStore>("conversation-store/store"); +/** + * Coerce a window bound to a positive integer, or `undefined` (= absent) for any + * non-positive / non-integer / undefined input. Keeps `loadSince` total. + */ +function positiveInt(value: number | undefined): number | undefined { + if (value === undefined) return undefined; + if (!Number.isInteger(value) || value <= 0) return undefined; + return value; +} + interface PersistedChunkEntry { readonly chunk: Chunk; readonly role: Role; @@ -118,23 +152,32 @@ export function createConversationStore( return repaired; }, - async loadSince(conversationId, sinceSeq) { + async loadSince(conversationId, sinceSeq, window) { const prefix = chunkPrefix(conversationId); const keys = await storage.keys(prefix); const sorted = [...keys].sort(); const result: StoredChunk[] = []; const minSeq = sinceSeq ?? 0; + // Forgiving: a non-positive / non-integer bound is treated as ABSENT. + const beforeSeq = positiveInt(window?.beforeSeq); + const limit = positiveInt(window?.limit); for (const key of sorted) { const seq = parseSeq(key.split(":").pop() ?? 
null); if (seq <= minSeq) continue; + if (beforeSeq !== undefined && seq >= beforeSeq) continue; const value = await storage.get(key); if (value === null) continue; const entry = JSON.parse(value) as PersistedChunkEntry; result.push({ seq, role: entry.role, chunk: entry.chunk }); } + // Window: keep only the NEWEST `limit` chunks, still ascending by seq. + if (limit !== undefined && result.length > limit) { + return result.slice(result.length - limit); + } + return result; }, diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json index 2e9d586..84ceada 100644 --- a/packages/transport-contract/package.json +++ b/packages/transport-contract/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/transport-contract", - "version": "0.9.0", + "version": "0.10.0", "type": "module", "private": true, "main": "dist/index.js", diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts index b000147..b0f6e20 100644 --- a/packages/transport-contract/src/index.ts +++ b/packages/transport-contract/src/index.ts @@ -67,25 +67,49 @@ export interface ModelsResponse { } /** - * Response body for `GET /conversations/:id?sinceSeq=<n>` — the incremental - * read-side history endpoint a long-lived client uses to (re)hydrate a - * conversation cheaply. + * Response body for + * `GET /conversations/:id?sinceSeq=<n>&beforeSeq=<s>&limit=<k>` — the + * incremental read-side history endpoint a long-lived client uses to + * (re)hydrate a conversation cheaply. All three query params are OPTIONAL and + * combine as one SELECTION + one WINDOW: + * + * - **Selection** — `sinceSeq` (exclusive lower bound, `seq > n`; omitted/0 = + * from the start) and `beforeSeq` (exclusive upper bound, `seq < s`; omitted + * = to the end). Together: `n < seq < s`. + * - **Window** — `limit=<k>` returns only the NEWEST `k` chunks of the + * selection (the response stays ASCENDING by seq). A selection with ≤ `k` + * chunks is returned whole. 
`limit` omitted = the full selection — exactly + * the pre-windowing behavior, so existing clients are unchanged. + * - `limit` and `beforeSeq` must be POSITIVE integers (`sinceSeq` may be 0); + * malformed, zero, or negative values → HTTP 400 `{ error }`. + * + * Intended client flows: fresh load = `?sinceSeq=0&limit=<k>` (newest window); + * tail sync = `?sinceSeq=<cursor>` (no limit); page older history in = + * `?beforeSeq=<oldestLoadedSeq>&limit=<k>`. + * + * Seq numbering is **1-based and gap-free** (a CONTRACTUAL GUARANTEE — see + * `StoredChunk` in `@dispatch/wire`): a client can derive "older chunks exist" + * purely from `oldestLoaded.seq > 1`; there is deliberately no + * `earliestSeq`/`hasOlder` response field. * * `chunks` is the RAW, append-order, seq-ordered slice of the conversation log - * with `seq > sinceSeq` (or the whole log when `sinceSeq` is omitted/0). It is - * NOT reconciled: a dangling tool-call is returned as-is (rendered as an - * interrupted call). Reconciliation is a turn-path concern — the server repairs - * history only when it feeds a provider, never on this read path — which is what - * preserves the per-chunk `seq` cursor invariant (a synthesized repair chunk - * would have no seq). + * selected + windowed as above. It is NOT reconciled: a dangling tool-call is + * returned as-is (rendered as an interrupted call). Reconciliation is a + * turn-path concern — the server repairs history only when it feeds a provider, + * never on this read path — which is what preserves the per-chunk `seq` cursor + * invariant (a synthesized repair chunk would have no seq). * * `latestSeq` is the `seq` of the LAST chunk in this response, or — when the * slice is empty (the client is already caught up) — the requested `sinceSeq` * (0 for a full read of an empty conversation). So after applying the response a * client's new cursor is always `latestSeq`, and an empty `chunks` means - * "nothing new past your cursor". 
(A true server-side high-water mark - * independent of the filter is deferred until a consumer needs it — it would - * require widening the store contract.) + * "nothing new past your cursor". CAVEAT (windowed reads): `latestSeq` is a + * TAIL-sync cursor — on a `beforeSeq` backfill page (or any `limit`ed read that + * did not reach the log's true tail) it describes the returned window, NOT the + * conversation's high-water mark, so a client must not regress its sync cursor + * from a backfill response. (A true server-side high-water mark independent of + * the filter is deferred until a consumer needs it — it would require widening + * the store contract.) */ export interface ConversationHistoryResponse { readonly chunks: readonly StoredChunk[]; diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts index c26a868..88de38f 100644 --- a/packages/transport-http/src/app.test.ts +++ b/packages/transport-http/src/app.test.ts @@ -86,10 +86,19 @@ function createFakeConversationStore( async load() { return []; }, - async loadSince(conversationId, sinceSeq) { + async loadSince(conversationId, sinceSeq, window) { const chunks = store.get(conversationId) ?? []; const minSeq = sinceSeq ?? 0; - return chunks.filter((c) => c.seq > minSeq); + const beforeSeq = window?.beforeSeq; + const limit = window?.limit; + const selected = chunks.filter( + (c) => c.seq > minSeq && (beforeSeq === undefined || c.seq < beforeSeq), + ); + // Window: keep only the NEWEST `limit`, still ascending by seq. 
+ if (limit !== undefined && selected.length > limit) { + return selected.slice(selected.length - limit); + } + return selected; }, async appendMetrics() {}, async loadMetrics(conversationId) { @@ -710,6 +719,162 @@ describe("GET /conversations/:id", () => { const res = await app.request("/conversations/conv1?sinceSeq=-1"); expect(res.status).toBe(400); }); + + const sixChunks: StoredChunk[] = [ + { seq: 1, role: "user", chunk: { type: "text", text: "one" } }, + { seq: 2, role: "assistant", chunk: { type: "text", text: "two" } }, + { seq: 3, role: "user", chunk: { type: "text", text: "three" } }, + { seq: 4, role: "assistant", chunk: { type: "text", text: "four" } }, + { seq: 5, role: "user", chunk: { type: "text", text: "five" } }, + { seq: 6, role: "assistant", chunk: { type: "text", text: "six" } }, + ]; + + function appWithChunks(chunks: StoredChunk[]) { + const store = new Map<string, StoredChunk[]>([["conv1", chunks]]); + return createApp({ + conversationStore: createFakeConversationStore(store), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + }); + } + + it("?limit=N returns only the newest N chunks, ascending, latestSeq = last seq", async () => { + const app = appWithChunks(sixChunks); + const res = await app.request("/conversations/conv1?limit=2"); + expect(res.status).toBe(200); + const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number }; + expect(body.chunks.map((c) => c.seq)).toEqual([5, 6]); + expect(body.latestSeq).toBe(6); + }); + + it("?limit=N with N >= conversation size returns the full log", async () => { + const app = appWithChunks(sixChunks); + const res = await app.request("/conversations/conv1?limit=10"); + expect(res.status).toBe(200); + const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number }; + expect(body.chunks.map((c) => c.seq)).toEqual([1, 2, 3, 4, 5, 6]); + expect(body.latestSeq).toBe(6); + }); + + it("?beforeSeq=S returns 
only chunks with seq < S", async () => { + const app = appWithChunks(sixChunks); + const res = await app.request("/conversations/conv1?beforeSeq=3"); + expect(res.status).toBe(200); + const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number }; + expect(body.chunks.map((c) => c.seq)).toEqual([1, 2]); + expect(body.latestSeq).toBe(2); + }); + + it("?beforeSeq=S&limit=N returns the newest N below S, ascending", async () => { + const app = appWithChunks(sixChunks); + const res = await app.request("/conversations/conv1?beforeSeq=5&limit=2"); + expect(res.status).toBe(200); + const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number }; + // selection = seq 1..4; newest 2 = [3, 4] + expect(body.chunks.map((c) => c.seq)).toEqual([3, 4]); + expect(body.latestSeq).toBe(4); + }); + + it("?sinceSeq=A&beforeSeq=B returns A < seq < B", async () => { + const app = appWithChunks(sixChunks); + const res = await app.request("/conversations/conv1?sinceSeq=2&beforeSeq=5"); + expect(res.status).toBe(200); + const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number }; + expect(body.chunks.map((c) => c.seq)).toEqual([3, 4]); + expect(body.latestSeq).toBe(4); + }); + + describe("window param validation → 400 and store not called with an invalid window", () => { + function appCapturingWindow() { + const calls: { + readonly sinceSeq: number | undefined; + readonly window: { readonly beforeSeq?: number; readonly limit?: number } | undefined; + }[] = []; + const store: ConversationStore = { + async append() {}, + async load() { + return []; + }, + async loadSince(_conversationId, sinceSeq, window) { + calls.push({ sinceSeq, window }); + return []; + }, + async appendMetrics() {}, + async loadMetrics() { + return []; + }, + async getCwd() { + return null; + }, + async setCwd() {}, + }; + const app = createApp({ + conversationStore: store, + orchestrator: createFakeOrchestrator([]), + credentialStore: 
createFakeCredentialStore([]), + }); + return { app, calls }; + } + + const cases: readonly { readonly name: string; readonly query: string }[] = [ + { name: "limit=0", query: "limit=0" }, + { name: "limit=-1", query: "limit=-1" }, + { name: "limit=abc", query: "limit=abc" }, + { name: "beforeSeq=0", query: "beforeSeq=0" }, + { name: "beforeSeq=1.5", query: "beforeSeq=1.5" }, + ]; + + for (const { name, query } of cases) { + it(`${name} → 400 { error } and loadSince is never called`, async () => { + const { app, calls } = appCapturingWindow(); + const res = await app.request(`/conversations/conv1?${query}`); + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(typeof body.error).toBe("string"); + expect(body.error.length).toBeGreaterThan(0); + expect(calls).toHaveLength(0); + }); + } + }); + + it("no params → byte-identical to the no-window read (regression guard)", async () => { + // Rest-param spy: record how many args the route actually passes, so we + // can prove the third (window) arg is OMITTED entirely — not merely + // forwarded as undefined — preserving the existing two-arg call shape. + const argCounts: number[] = []; + const store: ConversationStore = { + async append() {}, + async load() { + return []; + }, + loadSince(...args: Parameters<ConversationStore["loadSince"]>) { + argCounts.push(args.length); + const sinceSeq = args[1] ?? 
0; + return Promise.resolve(sampleChunks.filter((c) => c.seq > sinceSeq)); + }, + async appendMetrics() {}, + async loadMetrics() { + return []; + }, + async getCwd() { + return null; + }, + async setCwd() {}, + }; + const app = createApp({ + conversationStore: store, + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + }); + + const res = await app.request("/conversations/conv1"); + expect(res.status).toBe(200); + const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number }; + expect(body.chunks.map((c) => c.seq)).toEqual([1, 2, 3, 4]); + expect(body.latestSeq).toBe(4); + // Called once, with exactly two arguments (no window arg). + expect(argCounts).toEqual([2]); + }); }); describe("GET /conversations/:id/metrics", () => { diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts index 11d2850..ae24922 100644 --- a/packages/transport-http/src/app.ts +++ b/packages/transport-http/src/app.ts @@ -17,9 +17,11 @@ import { computeExpectedCacheRate, isParseError, isSinceSeqError, + isWindowParamError, parseChatBody, parseSinceSeq, parseWarmBody, + parseWindowParam, serializeEventLine, } from "./logic.js"; import { @@ -147,8 +149,43 @@ export function createApp(opts: CreateServerOptions): Hono { return c.json({ error: sinceSeqResult.error }, 400); } + // `limit` / `beforeSeq` are optional positive-integer history-window + // params. The store is deliberately forgiving (a 0/negative bound is + // treated as ABSENT), so we MUST reject malformed values here and never + // forward an invalid window. 
+ const beforeSeqResult = parseWindowParam(c.req.query("beforeSeq"), "beforeSeq"); + if (isWindowParamError(beforeSeqResult)) { + log.warn("conversations: invalid beforeSeq", { + conversationId, + error: beforeSeqResult.error, + }); + return c.json({ error: beforeSeqResult.error }, 400); + } + const limitResult = parseWindowParam(c.req.query("limit"), "limit"); + if (isWindowParamError(limitResult)) { + log.warn("conversations: invalid limit", { + conversationId, + error: limitResult.error, + }); + return c.json({ error: limitResult.error }, 400); + } + + // Include only the fields actually provided (exactOptionalPropertyTypes), + // and omit the window argument entirely when neither was given — keeping + // the pre-windowing call shape byte-identical for existing callers. + const window: { readonly beforeSeq?: number; readonly limit?: number } | undefined = + beforeSeqResult !== undefined || limitResult !== undefined + ? { + ...(beforeSeqResult !== undefined ? { beforeSeq: beforeSeqResult } : {}), + ...(limitResult !== undefined ? { limit: limitResult } : {}), + } + : undefined; + try { - const chunks = await opts.conversationStore.loadSince(conversationId, sinceSeqResult); + const chunks = + window !== undefined + ? await opts.conversationStore.loadSince(conversationId, sinceSeqResult, window) + : await opts.conversationStore.loadSince(conversationId, sinceSeqResult); const latestSeq = chunks.length > 0 ? (chunks[chunks.length - 1]?.seq ?? 
sinceSeqResult) : sinceSeqResult; log.info("conversations: read", { diff --git a/packages/transport-http/src/index.ts b/packages/transport-http/src/index.ts index 718aaef..1a42259 100644 --- a/packages/transport-http/src/index.ts +++ b/packages/transport-http/src/index.ts @@ -7,13 +7,16 @@ export type { ParseResult, SinceSeqResult, WarmBodyParsed, + WindowParamResult, } from "./logic.js"; export { computeCachePct, isParseError, isSinceSeqError, + isWindowParamError, parseChatBody, parseSinceSeq, + parseWindowParam, serializeEventLine, } from "./logic.js"; export type { diff --git a/packages/transport-http/src/logic.test.ts b/packages/transport-http/src/logic.test.ts index 19b47ef..9a7bda2 100644 --- a/packages/transport-http/src/logic.test.ts +++ b/packages/transport-http/src/logic.test.ts @@ -4,8 +4,10 @@ import { computeExpectedCacheRate, isParseError, isSinceSeqError, + isWindowParamError, parseChatBody, parseSinceSeq, + parseWindowParam, serializeEventLine, } from "./logic.js"; @@ -173,6 +175,58 @@ describe("parseSinceSeq", () => { }); }); +describe("parseWindowParam", () => { + it("returns undefined (absent) when undefined", () => { + expect(parseWindowParam(undefined, "limit")).toBeUndefined(); + }); + + it("returns undefined (absent) when empty string", () => { + expect(parseWindowParam("", "limit")).toBeUndefined(); + }); + + it("parses a valid positive integer", () => { + expect(parseWindowParam("1", "limit")).toBe(1); + expect(parseWindowParam("42", "beforeSeq")).toBe(42); + }); + + it("returns ParseError for zero (store would treat it as absent)", () => { + const result = parseWindowParam("0", "limit"); + expect(isWindowParamError(result)).toBe(true); + if (isWindowParamError(result)) { + expect(result.error).toContain("limit"); + expect(result.error).toContain("positive integer"); + } + }); + + it("returns ParseError for a negative integer", () => { + expect(isWindowParamError(parseWindowParam("-1", "limit"))).toBe(true); + }); + + it("returns 
ParseError for a non-integer", () => { + expect(isWindowParamError(parseWindowParam("1.5", "beforeSeq"))).toBe(true); + }); + + it("returns ParseError for a non-numeric string", () => { + const result = parseWindowParam("abc", "beforeSeq"); + expect(isWindowParamError(result)).toBe(true); + if (isWindowParamError(result)) { + expect(result.error).toContain("beforeSeq"); + } + }); + + it("names the param in the error message", () => { + const limit = parseWindowParam("0", "limit"); + const before = parseWindowParam("0", "beforeSeq"); + if (isWindowParamError(limit)) expect(limit.error).toContain("limit"); + if (isWindowParamError(before)) expect(before.error).toContain("beforeSeq"); + }); + + it("isWindowParamError is false for absent and for a valid number", () => { + expect(isWindowParamError(undefined)).toBe(false); + expect(isWindowParamError(5)).toBe(false); + }); +}); + describe("serializeEventLine", () => { it("serializes an event as JSON followed by newline", () => { const event: AgentEvent = { diff --git a/packages/transport-http/src/logic.ts b/packages/transport-http/src/logic.ts index bddedf0..0008110 100644 --- a/packages/transport-http/src/logic.ts +++ b/packages/transport-http/src/logic.ts @@ -72,6 +72,33 @@ export function isSinceSeqError(result: SinceSeqResult): result is ParseError { return typeof result === "object"; } +/** + * Result of parsing an OPTIONAL positive-integer history-window query param + * (`limit` / `beforeSeq`): `undefined` = the param was absent (omit it from the + * store window), a `number` = a valid positive integer, or a {@link ParseError} + * for a malformed / zero / negative value (the route 400s on it). + */ +export type WindowParamResult = number | undefined | ParseError; + +/** + * Parse an optional `limit` / `beforeSeq` query param. Unlike `sinceSeq` these + * must be STRICTLY POSITIVE integers when present (zero is rejected, since the + * store treats a zero bound as absent and would silently return the full log). 
+ * Absent (`undefined` / empty) is the valid "no window" case → `undefined`. + */ +export function parseWindowParam(raw: string | undefined, name: string): WindowParamResult { + if (raw === undefined || raw === "") return undefined; + const n = Number(raw); + if (!Number.isInteger(n) || n <= 0) { + return { error: `${name} must be a positive integer` }; + } + return n; +} + +export function isWindowParamError(result: WindowParamResult): result is ParseError { + return typeof result === "object" && result !== null; +} + export interface WarmBodyParsed { readonly conversationId: string; readonly model?: string; diff --git a/packages/wire/package.json b/packages/wire/package.json index f4ede19..d00772d 100644 --- a/packages/wire/package.json +++ b/packages/wire/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/wire", - "version": "0.6.0", + "version": "0.6.1", "type": "module", "private": true, "main": "dist/index.js", diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts index 28aab87..4fdf389 100644 --- a/packages/wire/src/index.ts +++ b/packages/wire/src/index.ts @@ -124,11 +124,17 @@ export interface ChatMessage { /** * A persisted chunk plus its sync metadata. The append-only conversation log - * stamps every chunk with a monotonic, gap-free, per-conversation `seq` (the - * sync cursor, assigned in append order) and records the `role` of the message - * it belongs to. This makes a flat seq-ordered stream both incrementally - * syncable ("give me chunks after seq N") and regroupable into messages by the - * client. `chunk` is the content unit — `Chunk` carries no storage/sync cursor + * stamps every chunk with a **1-based**, monotonic, gap-free, per-conversation + * `seq` (the sync cursor, assigned in append order) and records the `role` of + * the message it belongs to. This makes a flat seq-ordered stream both + * incrementally syncable ("give me chunks after seq N") and regroupable into + * messages by the client. 
+ * + * The 1-based start is a CONTRACTUAL GUARANTEE (not an implementation detail): + * a conversation's first chunk is always `seq === 1` and numbering never skips, + * so a client holding only a windowed suffix of the log can derive "older + * chunks exist server-side" purely from `oldestLoaded.seq > 1` — no separate + * has-older flag is needed (or provided). `chunk` is the content unit — `Chunk` carries no storage/sync cursor * (`seq` lives here on the envelope, not on the chunk, since it is assigned by * the store and the provider has no use for it). A chunk MAY still carry * generation provenance assigned at production time (e.g. a tool chunk's @@ -26,8 +26,11 @@ extension through the host): ## How to run ```bash -KEY1=$(grep DISPATCH_API_KEY_OPENCODE1 .env | cut -d= -f2) -PORT=4567 DISPATCH_API_KEY="$KEY1" bun packages/host-bin/src/main.ts # boots app + collector +# .env auto-loads DISPATCH_API_KEY (do NOT re-export) and pins BACKEND_PORT (beats PORT). +# Private probe instance: override the port + ISOLATE data paths (ORCHESTRATOR §8): +BACKEND_PORT=4567 SURFACE_WS_PORT=4569 DISPATCH_DB=/tmp/opencode/probe/dispatch.db \ + DISPATCH_TRACE_DB=/tmp/opencode/probe/traces.db DISPATCH_JOURNAL=/tmp/opencode/probe/app.ndjson \ + bun packages/host-bin/src/main.ts # boots app + collector curl -s -X POST localhost:4567/chat -H 'content-type: application/json' \ -d '{"conversationId":"c1","message":"Say hello in 3 words."}' # field = conversationId ``` @@ -323,8 +326,35 @@ FE courier in: `../dispatch-web/backend-handoff-cache-warming.md` (+ CR-1/CR-2 f 2 automatic warms @5s each pushing future `nextWarmAt`, mid-turn close → `abortedTurn:true` + `done.reason:"aborted"` + warming disabled, catalog scopes + table field present, echo present. +## History windowing — FE CR-5 (DONE) +FE courier in: `../dispatch-web/backend-handoff-chat-limit.md` (+ living `backend-handoff.md` §2 +CR-5). Courier out: `frontend-history-windowing-handoff.md`. 
User-gated call: ask #3 shipped as +the INVARIANT option (no new field) — seq is contractually **1-based, monotonic, gap-free**; FE +derives `hasOlder` from `chunks[0].seq > 1`. +- **Wave 0 (orchestrator, contracts):** `limit`/`beforeSeq` query-param semantics + validation + + `latestSeq` windowed-read caveat documented on `ConversationHistoryResponse` + (`@dispatch/transport-contract` `0.9.0→0.10.0`); 1-based seq guarantee codified on + `StoredChunk` (`@dispatch/wire` `0.6.0→0.6.1`, doc-only). +- **Wave 1 — `conversation-store`:** additive `loadSince(id, sinceSeq?, window?: { beforeSeq?, + limit? })` — selection `sinceSeq < seq < beforeSeq`, newest-`limit` window, result stays + ascending; garbage-in treated as absent (transport validates upstream). +8 tests. +- **Wave 2 — `transport-http`:** parses + validates the params (positive integers; malformed/ + zero/negative → 400 `{ error }`, store never called with an invalid window); two-arg call + shape preserved when no params (regression-guarded). +20 tests. +- 935 vitest + 112 bun tests, typecheck + biome clean. **LIVE-VERIFIED** (isolated boot, real + flash turns): firstSeq=1; `limit=2`→`[5,6]` ascending w/ correct `latestSeq`; `limit=9999`→ + full log; `beforeSeq=3`→`[1,2]`; `beforeSeq=3&limit=1`→`[2]`; `limit=0`/`beforeSeq=0`/ + `limit=abc`→400×3. `RESULT: OK` ×6. +- **Scar tissue (process):** (1) probing with a PRIVATE boot was overkill — the windowing checks + are read-only GETs and the dev stack was running; prefer probing `bin/up`/`up2` or asking the + user (ORCHESTRATOR §8 updated). (2) The §8 boot recipe was stale (`DISPATCH_API_KEY_OPENCODE1` + doesn't exist; an empty re-export OVERRIDES `.env` → "No providers registered"; `.env`'s + `BACKEND_PORT` beats `PORT`; un-isolated data paths spawn a duplicate collector on the dev + DB) — recipe fixed in §8 + above. 
(3) Violated the bracket trick once (`pkill -f 'cr5-data'` + self-matched → killed parent shell, timeout-with-no-output); the existing §8 rule stands. + ## Open items -- **Context window LIMIT (next, sibling of context size):** expose the selected model's max +- **Context window LIMIT (deferred, sibling of context size):** expose the selected model's max context-window token limit so the FE can render `contextSize / limit` (e.g. `1286 / 200000`). Source = the provider/model catalog (`ModelInfo`); likely a field on the models response and/or stamped alongside `contextSize`. "context size" = current usage (DONE); "context @@ -341,9 +371,47 @@ FE courier in: `../dispatch-web/backend-handoff-cache-warming.md` (+ CR-1/CR-2 f chain). ## Roadmap -1. **CLI** — done. -2. **Web frontend** (in progress, SEPARATE repo `../dispatch-web`; Svelte + +1. **Web frontend** (in progress, SEPARATE repo `../dispatch-web`; Svelte + DaisyUI, same methodology). Slice 2 = browser chat MVP consuming the wire/transport-contract + metrics. Cross-repo contract changes are couriered via the user (ORCHESTRATOR §7); `lsp references` does not span repos. -3. **dedup / storage growth** — after the frontend (see Open items). +2. **CLI → open-tab handoff (cross-client messaging):** an AI given a short + conversation identifier (4+ chars) in chat can use the CLI to send a message + INTO that conversation, and an FE tab watching it sees the message + reply + live. To the receiving AI the message appears as a REGULAR user message + (no special framing). The broadcast plumbing already exists and is + live-verified (detached turns + `chat.subscribe` + CR-3 `user-message` + event); the gaps are (a) a conversation LIST/discovery endpoint (none + exists), (b) short-id / prefix resolution of `conversationId` in the CLI, + (c) end-to-end verification of the handoff workflow. 
CLI semantics: + - **read last message** (new): BLOCKING — waits until the in-flight turn + settles, then returns ONLY the AI's last message of that turn (not the + whole conversation). + - **send, no `--queue` flag (default):** BLOCKING — sends, waits for the + turn to settle, returns the AI's last message (same shape as the read). + - **send with `--queue`:** enqueues the message into the conversation's + message queue (roadmap item 3) and exits immediately. +3. **Message queue + steering injection (backend core; prerequisite for the + `--queue` flag in item 2):** a per-conversation queue a client (FE or CLI) + can push a message onto while a turn is GENERATING. Delivery semantics: + - On the turn's next TOOL CALL, queued messages are injected as "steering" + messages returned alongside the tool result (the model sees them + mid-turn and can adjust course). + - If the turn ends before any tool call fires, the queued messages are + COMBINED and sent as a fresh user message starting a NEW turn. + - FE queueing UX (queue while generating) couriered to `../dispatch-web`. + Touches the kernel turn loop (injection at the tool-result boundary) — a + contract-first design pass needed before summoning. +4. **CLI flag to open/activate an FE tab:** optional CLI flag (new or existing + conversation) that makes an already-open frontend open the conversation as a + tab and mark it active. Does NOT exist today — no backend→FE "open/focus + conversation" push (only the inverse, `POST /conversations/:id/close`). + Needs a new broadcast WS op + endpoint/flag here, and FE handling couriered + to `../dispatch-web`. +5. **`todo` tool** — a per-conversation task-list tool the model maintains + (like opencode's todowrite/todoread), as a standard tool extension; likely a + surface so the FE can render the live list. +6. **`web_search` tool** — a web search tool (like old dispatch's; + reference-only source at `../dispatch-source`), as a standard tool extension. 
+ +(Done and dropped from the list: CLI; dedup / storage growth.)
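The windowing rule recorded in this commit (selection `sinceSeq < seq < beforeSeq`, newest-`limit` window, result ascending, and `hasOlder` derived from the 1-based gap-free `seq`) can be sketched in a few lines. This is a minimal illustration only, not the code in `conversation-store`: `StoredChunkLike`, `HistoryWindow`, `selectWindow`, and `hasOlder` are hypothetical names, and the in-memory array stands in for the real store.

```typescript
// Hypothetical, simplified stand-ins for illustration. The real StoredChunk type
// and the real loadSince implementation are not reproduced here.
interface StoredChunkLike {
  readonly seq: number;
}

interface HistoryWindow {
  readonly beforeSeq?: number;
  readonly limit?: number;
}

// Treat anything that is not a positive integer as "absent". This mirrors the
// documented "garbage-in treated as absent" store behavior; the transport layer
// is expected to have rejected such values with a 400 before this point.
function positiveOrAbsent(value: number | undefined): number | undefined {
  return value !== undefined && Number.isInteger(value) && value > 0 ? value : undefined;
}

// Select sinceSeq < seq < beforeSeq, then keep only the newest `limit` chunks.
// Assumes `log` is already ascending by seq, so the result stays ascending.
function selectWindow<T extends StoredChunkLike>(
  log: readonly T[],
  sinceSeq = 0,
  window: HistoryWindow = {},
): T[] {
  const beforeSeq = positiveOrAbsent(window.beforeSeq);
  const limit = positiveOrAbsent(window.limit);
  const selection = log.filter(
    (chunk) => chunk.seq > sinceSeq && (beforeSeq === undefined || chunk.seq < beforeSeq),
  );
  // A selection with at most `limit` chunks is returned whole.
  if (limit === undefined || selection.length <= limit) return selection;
  return selection.slice(selection.length - limit);
}

// Client-side derivation: seq is 1-based and gap-free, so older chunks exist
// server-side exactly when the oldest loaded chunk is not seq 1.
function hasOlder(loaded: readonly StoredChunkLike[]): boolean {
  const oldest = loaded[0];
  return oldest !== undefined && oldest.seq > 1;
}
```

Against a six-chunk log (seq 1 through 6), this sketch reproduces the live-verified cases listed in `tasks.md`: `limit=2` selects `[5,6]`, `limit=9999` selects the full log, `beforeSeq=3` selects `[1,2]`, and `beforeSeq=3&limit=1` selects `[2]`.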
