diff options
| author | Adam Malczewski <[email protected]> | 2026-06-12 18:37:09 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-12 18:37:09 +0900 |
| commit | dbf77ba78ff840e0ed5f6294030523fe3ab121fa (patch) | |
| tree | e768aef3edd0c126212058c3d1433355594c49be /packages | |
| parent | 6689eb51b467d8e370f31495840d88661f978168 (diff) | |
| download | dispatch-dbf77ba78ff840e0ed5f6294030523fe3ab121fa.tar.gz dispatch-dbf77ba78ff840e0ed5f6294030523fe3ab121fa.zip | |
feat(history): CR-5 windowed reads — ?limit= / ?beforeSeq= on GET /conversations/:id
Selection sinceSeq < seq < beforeSeq; newest-limit window, ascending; positive-
integer validation (400, store never sees an invalid window); 1-based gap-free
seq codified as the contractual has-older mechanism (no earliestSeq field).
transport-contract 0.9.0->0.10.0, wire 0.6.0->0.6.1 (doc-only).
conversation-store +8 tests, transport-http +20; 935 vitest + 112 bun green.
Live-verified: 6/6 probe checks OK. FE courier: frontend-history-windowing-handoff.md
Diffstat (limited to 'packages')
| -rw-r--r-- | packages/conversation-store/src/store.test.ts | 88 | ||||
| -rw-r--r-- | packages/conversation-store/src/store.ts | 45 | ||||
| -rw-r--r-- | packages/transport-contract/package.json | 2 | ||||
| -rw-r--r-- | packages/transport-contract/src/index.ts | 48 | ||||
| -rw-r--r-- | packages/transport-http/src/app.test.ts | 169 | ||||
| -rw-r--r-- | packages/transport-http/src/app.ts | 39 | ||||
| -rw-r--r-- | packages/transport-http/src/index.ts | 3 | ||||
| -rw-r--r-- | packages/transport-http/src/logic.test.ts | 54 | ||||
| -rw-r--r-- | packages/transport-http/src/logic.ts | 27 | ||||
| -rw-r--r-- | packages/wire/package.json | 2 | ||||
| -rw-r--r-- | packages/wire/src/index.ts | 16 |
11 files changed, 470 insertions, 23 deletions
diff --git a/packages/conversation-store/src/store.test.ts b/packages/conversation-store/src/store.test.ts index 49b0e58..5b07eca 100644 --- a/packages/conversation-store/src/store.test.ts +++ b/packages/conversation-store/src/store.test.ts @@ -499,6 +499,94 @@ describe("ConversationStore", () => { }); }); +describe("ConversationStore loadSince windowing", () => { + let storage: StorageNamespace; + + beforeEach(() => { + storage = createMemoryStorage(); + }); + + // Append `count` single-chunk user messages so seq runs 1..count, gap-free. + async function seed(store: ReturnType<typeof createConversationStore>, count: number) { + const messages: ChatMessage[] = []; + for (let i = 1; i <= count; i++) { + messages.push({ role: "user", chunks: [{ type: "text", text: `m${i}` }] }); + } + await store.append("conv1", messages); + } + + it("limit returns the newest N of the selection, ascending by seq", async () => { + const store = createConversationStore(storage); + await seed(store, 5); + const chunks = await store.loadSince("conv1", 0, { limit: 2 }); + expect(chunks.map((c) => c.seq)).toEqual([4, 5]); + }); + + it("limit >= selection size returns the whole selection (exact, not truncated)", async () => { + const store = createConversationStore(storage); + await seed(store, 3); + const exactlyAll = await store.loadSince("conv1", 0, { limit: 3 }); + expect(exactlyAll.map((c) => c.seq)).toEqual([1, 2, 3]); + const overAll = await store.loadSince("conv1", 0, { limit: 99 }); + expect(overAll.map((c) => c.seq)).toEqual([1, 2, 3]); + }); + + it("beforeSeq bounds the selection exclusively (seq < beforeSeq)", async () => { + const store = createConversationStore(storage); + await seed(store, 5); + const chunks = await store.loadSince("conv1", 0, { beforeSeq: 3 }); + expect(chunks.map((c) => c.seq)).toEqual([1, 2]); + }); + + it("sinceSeq + beforeSeq combine to sinceSeq < seq < beforeSeq", async () => { + const store = createConversationStore(storage); + await seed(store, 6); + const chunks = await store.loadSince("conv1", 2, { beforeSeq: 5 }); + expect(chunks.map((c) => c.seq)).toEqual([3, 4]); + }); + + it("beforeSeq + limit: newest N below the bound, ascending (page older history in)", async () => { + const store = createConversationStore(storage); + await seed(store, 8); + const chunks = await store.loadSince("conv1", 0, { beforeSeq: 6, limit: 2 }); + expect(chunks.map((c) => c.seq)).toEqual([4, 5]); + }); + + it("empty selection returns [] (beforeSeq=1, and sinceSeq past the tail)", async () => { + const store = createConversationStore(storage); + await seed(store, 4); + expect(await store.loadSince("conv1", 0, { beforeSeq: 1 })).toEqual([]); + expect(await store.loadSince("conv1", 4, { limit: 3 })).toEqual([]); + }); + + it("non-positive / non-integer limit and beforeSeq are treated as absent", async () => { + const store = createConversationStore(storage); + await seed(store, 4); + const all = [1, 2, 3, 4]; + expect((await store.loadSince("conv1", 0, { limit: 0 })).map((c) => c.seq)).toEqual(all); + expect((await store.loadSince("conv1", 0, { limit: -2 })).map((c) => c.seq)).toEqual(all); + expect((await store.loadSince("conv1", 0, { limit: 1.5 })).map((c) => c.seq)).toEqual(all); + expect((await store.loadSince("conv1", 0, { beforeSeq: 0 })).map((c) => c.seq)).toEqual(all); + expect((await store.loadSince("conv1", 0, { beforeSeq: -3 })).map((c) => c.seq)).toEqual(all); + expect((await store.loadSince("conv1", 0, { beforeSeq: 2.7 })).map((c) => c.seq)).toEqual(all); + }); + + it("window omitted is identical to today's behavior (regression guard)", async () => { + const store = createConversationStore(storage); + await seed(store, 5); + const base = await store.loadSince("conv1", 1); + const withEmptyWindow = await store.loadSince("conv1", 1, {}); + // A caller whose window fields happen to be undefined (e.g. unset query + // params) — modelled as an optional-field record, not explicit `undefined` + // literals (which exactOptionalPropertyTypes rejects on the contract). + const undefinedFieldsWindow: { beforeSeq?: number; limit?: number } = {}; + const withUndefinedFields = await store.loadSince("conv1", 1, undefinedFieldsWindow); + expect(base.map((c) => c.seq)).toEqual([2, 3, 4, 5]); + expect(withEmptyWindow).toEqual(base); + expect(withUndefinedFields).toEqual(base); + }); +}); + describe("ConversationStore metrics", () => { let storage: StorageNamespace; diff --git a/packages/conversation-store/src/store.ts b/packages/conversation-store/src/store.ts index 0948f64..0a42917 100644 --- a/packages/conversation-store/src/store.ts +++ b/packages/conversation-store/src/store.ts @@ -23,9 +23,33 @@ import { reconcileWithReport } from "./reconcile.js"; export interface ConversationStore { readonly append: (conversationId: string, messages: readonly ChatMessage[]) => Promise<void>; readonly load: (conversationId: string) => Promise<ChatMessage[]>; + /** + * Read the conversation's persisted chunks as a SELECTION + optional WINDOW, + * ascending by seq. The raw append-order log; NOT reconciled (a dangling + * tool-call is returned as-is — repair is a turn-path concern). + * + * - **Selection** — `sinceSeq` is an exclusive lower bound (`seq > sinceSeq`; + * omitted/`0`/non-positive/non-integer = from the start). When + * `window.beforeSeq` is given it is an exclusive upper bound + * (`seq < beforeSeq`). Together: `sinceSeq < seq < beforeSeq`. + * - **Window** — `window.limit` returns only the NEWEST `limit` chunks of the + * selection; the result STAYS ASCENDING by seq. A selection with ≤ `limit` + * chunks is returned whole (exact, not truncated). + * - **Omitted = unchanged** — `window` absent (or both its fields undefined) + * is byte-identical to the pre-windowing behavior, so existing callers that + * pass no third argument are unaffected. + * - **Garbage-in is forgiving** — a non-positive or non-integer `limit` (or + * `beforeSeq`) is treated as ABSENT (full selection); this method never + * throws on bad window input. The transport validates and 400s upstream. + * + * Seq numbering is 1-based and gap-free, so a client derives "older chunks + * exist" purely from the oldest returned `seq > 1`; there is deliberately no + * `earliestSeq`/high-water-mark API. + */ readonly loadSince: ( conversationId: string, sinceSeq?: number, + window?: { readonly beforeSeq?: number; readonly limit?: number }, ) => Promise<readonly StoredChunk[]>; readonly appendMetrics: (conversationId: string, metrics: TurnMetrics) => Promise<void>; readonly loadMetrics: (conversationId: string) => Promise<readonly TurnMetrics[]>; @@ -37,6 +61,16 @@ export interface ConversationStore { export const conversationStoreHandle = defineService<ConversationStore>("conversation-store/store"); +/** + * Coerce a window bound to a positive integer, or `undefined` (= absent) for any + * non-positive / non-integer / undefined input. Keeps `loadSince` total. + */ +function positiveInt(value: number | undefined): number | undefined { + if (value === undefined) return undefined; + if (!Number.isInteger(value) || value <= 0) return undefined; + return value; +} + interface PersistedChunkEntry { readonly chunk: Chunk; readonly role: Role; @@ -118,23 +152,32 @@ export function createConversationStore( return repaired; }, - async loadSince(conversationId, sinceSeq) { + async loadSince(conversationId, sinceSeq, window) { const prefix = chunkPrefix(conversationId); const keys = await storage.keys(prefix); const sorted = [...keys].sort(); const result: StoredChunk[] = []; const minSeq = sinceSeq ?? 0; + // Forgiving: a non-positive / non-integer bound is treated as ABSENT. + const beforeSeq = positiveInt(window?.beforeSeq); + const limit = positiveInt(window?.limit); for (const key of sorted) { const seq = parseSeq(key.split(":").pop() ?? null); if (seq <= minSeq) continue; + if (beforeSeq !== undefined && seq >= beforeSeq) continue; const value = await storage.get(key); if (value === null) continue; const entry = JSON.parse(value) as PersistedChunkEntry; result.push({ seq, role: entry.role, chunk: entry.chunk }); } + // Window: keep only the NEWEST `limit` chunks, still ascending by seq. + if (limit !== undefined && result.length > limit) { + return result.slice(result.length - limit); + } + return result; }, diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json index 2e9d586..84ceada 100644 --- a/packages/transport-contract/package.json +++ b/packages/transport-contract/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/transport-contract", - "version": "0.9.0", + "version": "0.10.0", "type": "module", "private": true, "main": "dist/index.js", diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts index b000147..b0f6e20 100644 --- a/packages/transport-contract/src/index.ts +++ b/packages/transport-contract/src/index.ts @@ -67,25 +67,49 @@ export interface ModelsResponse { } /** - * Response body for `GET /conversations/:id?sinceSeq=<n>` — the incremental - * read-side history endpoint a long-lived client uses to (re)hydrate a - * conversation cheaply. + * Response body for + * `GET /conversations/:id?sinceSeq=<n>&beforeSeq=<s>&limit=<k>` — the + * incremental read-side history endpoint a long-lived client uses to + * (re)hydrate a conversation cheaply. All three query params are OPTIONAL and + * combine as one SELECTION + one WINDOW: + * + * - **Selection** — `sinceSeq` (exclusive lower bound, `seq > n`; omitted/0 = + * from the start) and `beforeSeq` (exclusive upper bound, `seq < s`; omitted + * = to the end). Together: `n < seq < s`. + * - **Window** — `limit=<k>` returns only the NEWEST `k` chunks of the + * selection (the response stays ASCENDING by seq). A selection with ≤ `k` + * chunks is returned whole. `limit` omitted = the full selection — exactly + * the pre-windowing behavior, so existing clients are unchanged. + * - `limit` and `beforeSeq` must be POSITIVE integers (`sinceSeq` may be 0); + * malformed, zero, or negative values → HTTP 400 `{ error }`. + * + * Intended client flows: fresh load = `?sinceSeq=0&limit=<k>` (newest window); + * tail sync = `?sinceSeq=<cursor>` (no limit); page older history in = + * `?beforeSeq=<oldestLoadedSeq>&limit=<k>`. + * + * Seq numbering is **1-based and gap-free** (a CONTRACTUAL GUARANTEE — see + * `StoredChunk` in `@dispatch/wire`): a client can derive "older chunks exist" + * purely from `oldestLoaded.seq > 1`; there is deliberately no + * `earliestSeq`/`hasOlder` response field. * * `chunks` is the RAW, append-order, seq-ordered slice of the conversation log - * with `seq > sinceSeq` (or the whole log when `sinceSeq` is omitted/0). It is - * NOT reconciled: a dangling tool-call is returned as-is (rendered as an - * interrupted call). Reconciliation is a turn-path concern — the server repairs - * history only when it feeds a provider, never on this read path — which is what - * preserves the per-chunk `seq` cursor invariant (a synthesized repair chunk - * would have no seq). + * selected + windowed as above. It is NOT reconciled: a dangling tool-call is + * returned as-is (rendered as an interrupted call). Reconciliation is a + * turn-path concern — the server repairs history only when it feeds a provider, + * never on this read path — which is what preserves the per-chunk `seq` cursor + * invariant (a synthesized repair chunk would have no seq). * * `latestSeq` is the `seq` of the LAST chunk in this response, or — when the * slice is empty (the client is already caught up) — the requested `sinceSeq` * (0 for a full read of an empty conversation). So after applying the response a * client's new cursor is always `latestSeq`, and an empty `chunks` means - * "nothing new past your cursor". (A true server-side high-water mark - * independent of the filter is deferred until a consumer needs it — it would - * require widening the store contract.) + * "nothing new past your cursor". CAVEAT (windowed reads): `latestSeq` is a + * TAIL-sync cursor — on a `beforeSeq` backfill page (or any `limit`ed read that + * did not reach the log's true tail) it describes the returned window, NOT the + * conversation's high-water mark, so a client must not regress its sync cursor + * from a backfill response. (A true server-side high-water mark independent of + * the filter is deferred until a consumer needs it — it would require widening + * the store contract.) */ export interface ConversationHistoryResponse { readonly chunks: readonly StoredChunk[]; diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts index c26a868..88de38f 100644 --- a/packages/transport-http/src/app.test.ts +++ b/packages/transport-http/src/app.test.ts @@ -86,10 +86,19 @@ function createFakeConversationStore( async load() { return []; }, - async loadSince(conversationId, sinceSeq) { + async loadSince(conversationId, sinceSeq, window) { const chunks = store.get(conversationId) ?? []; const minSeq = sinceSeq ?? 0; - return chunks.filter((c) => c.seq > minSeq); + const beforeSeq = window?.beforeSeq; + const limit = window?.limit; + const selected = chunks.filter( + (c) => c.seq > minSeq && (beforeSeq === undefined || c.seq < beforeSeq), + ); + // Window: keep only the NEWEST `limit`, still ascending by seq. + if (limit !== undefined && selected.length > limit) { + return selected.slice(selected.length - limit); + } + return selected; }, async appendMetrics() {}, async loadMetrics(conversationId) { @@ -710,6 +719,162 @@ describe("GET /conversations/:id", () => { const res = await app.request("/conversations/conv1?sinceSeq=-1"); expect(res.status).toBe(400); }); + + const sixChunks: StoredChunk[] = [ + { seq: 1, role: "user", chunk: { type: "text", text: "one" } }, + { seq: 2, role: "assistant", chunk: { type: "text", text: "two" } }, + { seq: 3, role: "user", chunk: { type: "text", text: "three" } }, + { seq: 4, role: "assistant", chunk: { type: "text", text: "four" } }, + { seq: 5, role: "user", chunk: { type: "text", text: "five" } }, + { seq: 6, role: "assistant", chunk: { type: "text", text: "six" } }, + ]; + + function appWithChunks(chunks: StoredChunk[]) { + const store = new Map<string, StoredChunk[]>([["conv1", chunks]]); + return createApp({ + conversationStore: createFakeConversationStore(store), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + }); + } + + it("?limit=N returns only the newest N chunks, ascending, latestSeq = last seq", async () => { + const app = appWithChunks(sixChunks); + const res = await app.request("/conversations/conv1?limit=2"); + expect(res.status).toBe(200); + const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number }; + expect(body.chunks.map((c) => c.seq)).toEqual([5, 6]); + expect(body.latestSeq).toBe(6); + }); + + it("?limit=N with N >= conversation size returns the full log", async () => { + const app = appWithChunks(sixChunks); + const res = await app.request("/conversations/conv1?limit=10"); + expect(res.status).toBe(200); + const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number }; + expect(body.chunks.map((c) => c.seq)).toEqual([1, 2, 3, 4, 5, 6]); + expect(body.latestSeq).toBe(6); + }); + + it("?beforeSeq=S returns only chunks with seq < S", async () => { + const app = appWithChunks(sixChunks); + const res = await app.request("/conversations/conv1?beforeSeq=3"); + expect(res.status).toBe(200); + const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number }; + expect(body.chunks.map((c) => c.seq)).toEqual([1, 2]); + expect(body.latestSeq).toBe(2); + }); + + it("?beforeSeq=S&limit=N returns the newest N below S, ascending", async () => { + const app = appWithChunks(sixChunks); + const res = await app.request("/conversations/conv1?beforeSeq=5&limit=2"); + expect(res.status).toBe(200); + const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number }; + // selection = seq 1..4; newest 2 = [3, 4] + expect(body.chunks.map((c) => c.seq)).toEqual([3, 4]); + expect(body.latestSeq).toBe(4); + }); + + it("?sinceSeq=A&beforeSeq=B returns A < seq < B", async () => { + const app = appWithChunks(sixChunks); + const res = await app.request("/conversations/conv1?sinceSeq=2&beforeSeq=5"); + expect(res.status).toBe(200); + const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number }; + expect(body.chunks.map((c) => c.seq)).toEqual([3, 4]); + expect(body.latestSeq).toBe(4); + }); + + describe("window param validation → 400 and store not called with an invalid window", () => { + function appCapturingWindow() { + const calls: { + readonly sinceSeq: number | undefined; + readonly window: { readonly beforeSeq?: number; readonly limit?: number } | undefined; + }[] = []; + const store: ConversationStore = { + async append() {}, + async load() { + return []; + }, + async loadSince(_conversationId, sinceSeq, window) { + calls.push({ sinceSeq, window }); + return []; + }, + async appendMetrics() {}, + async loadMetrics() { + return []; + }, + async getCwd() { + return null; + }, + async setCwd() {}, + }; + const app = createApp({ + conversationStore: store, + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + }); + return { app, calls }; + } + + const cases: readonly { readonly name: string; readonly query: string }[] = [ + { name: "limit=0", query: "limit=0" }, + { name: "limit=-1", query: "limit=-1" }, + { name: "limit=abc", query: "limit=abc" }, + { name: "beforeSeq=0", query: "beforeSeq=0" }, + { name: "beforeSeq=1.5", query: "beforeSeq=1.5" }, + ]; + + for (const { name, query } of cases) { + it(`${name} → 400 { error } and loadSince is never called`, async () => { + const { app, calls } = appCapturingWindow(); + const res = await app.request(`/conversations/conv1?${query}`); + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(typeof body.error).toBe("string"); + expect(body.error.length).toBeGreaterThan(0); + expect(calls).toHaveLength(0); + }); + } + }); + + it("no params → byte-identical to the no-window read (regression guard)", async () => { + // Rest-param spy: record how many args the route actually passes, so we + // can prove the third (window) arg is OMITTED entirely — not merely + // forwarded as undefined — preserving the existing two-arg call shape. + const argCounts: number[] = []; + const store: ConversationStore = { + async append() {}, + async load() { + return []; + }, + loadSince(...args: Parameters<ConversationStore["loadSince"]>) { + argCounts.push(args.length); + const sinceSeq = args[1] ?? 0; + return Promise.resolve(sampleChunks.filter((c) => c.seq > sinceSeq)); + }, + async appendMetrics() {}, + async loadMetrics() { + return []; + }, + async getCwd() { + return null; + }, + async setCwd() {}, + }; + const app = createApp({ + conversationStore: store, + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + }); + + const res = await app.request("/conversations/conv1"); + expect(res.status).toBe(200); + const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number }; + expect(body.chunks.map((c) => c.seq)).toEqual([1, 2, 3, 4]); + expect(body.latestSeq).toBe(4); + // Called once, with exactly two arguments (no window arg). + expect(argCounts).toEqual([2]); + }); }); describe("GET /conversations/:id/metrics", () => { diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts index 11d2850..ae24922 100644 --- a/packages/transport-http/src/app.ts +++ b/packages/transport-http/src/app.ts @@ -17,9 +17,11 @@ import { computeExpectedCacheRate, isParseError, isSinceSeqError, + isWindowParamError, parseChatBody, parseSinceSeq, parseWarmBody, + parseWindowParam, serializeEventLine, } from "./logic.js"; import { @@ -147,8 +149,43 @@ export function createApp(opts: CreateServerOptions): Hono { return c.json({ error: sinceSeqResult.error }, 400); } + // `limit` / `beforeSeq` are optional positive-integer history-window + // params. The store is deliberately forgiving (a 0/negative bound is + // treated as ABSENT), so we MUST reject malformed values here and never + // forward an invalid window. + const beforeSeqResult = parseWindowParam(c.req.query("beforeSeq"), "beforeSeq"); + if (isWindowParamError(beforeSeqResult)) { + log.warn("conversations: invalid beforeSeq", { + conversationId, + error: beforeSeqResult.error, + }); + return c.json({ error: beforeSeqResult.error }, 400); + } + const limitResult = parseWindowParam(c.req.query("limit"), "limit"); + if (isWindowParamError(limitResult)) { + log.warn("conversations: invalid limit", { + conversationId, + error: limitResult.error, + }); + return c.json({ error: limitResult.error }, 400); + } + + // Include only the fields actually provided (exactOptionalPropertyTypes), + // and omit the window argument entirely when neither was given — keeping + // the pre-windowing call shape byte-identical for existing callers. + const window: { readonly beforeSeq?: number; readonly limit?: number } | undefined = + beforeSeqResult !== undefined || limitResult !== undefined + ? { + ...(beforeSeqResult !== undefined ? { beforeSeq: beforeSeqResult } : {}), + ...(limitResult !== undefined ? { limit: limitResult } : {}), + } + : undefined; + try { - const chunks = await opts.conversationStore.loadSince(conversationId, sinceSeqResult); + const chunks = + window !== undefined + ? await opts.conversationStore.loadSince(conversationId, sinceSeqResult, window) + : await opts.conversationStore.loadSince(conversationId, sinceSeqResult); const latestSeq = chunks.length > 0 ? (chunks[chunks.length - 1]?.seq ?? sinceSeqResult) : sinceSeqResult; log.info("conversations: read", { diff --git a/packages/transport-http/src/index.ts b/packages/transport-http/src/index.ts index 718aaef..1a42259 100644 --- a/packages/transport-http/src/index.ts +++ b/packages/transport-http/src/index.ts @@ -7,13 +7,16 @@ export type { ParseResult, SinceSeqResult, WarmBodyParsed, + WindowParamResult, } from "./logic.js"; export { computeCachePct, isParseError, isSinceSeqError, + isWindowParamError, parseChatBody, parseSinceSeq, + parseWindowParam, serializeEventLine, } from "./logic.js"; export type { diff --git a/packages/transport-http/src/logic.test.ts b/packages/transport-http/src/logic.test.ts index 19b47ef..9a7bda2 100644 --- a/packages/transport-http/src/logic.test.ts +++ b/packages/transport-http/src/logic.test.ts @@ -4,8 +4,10 @@ import { computeExpectedCacheRate, isParseError, isSinceSeqError, + isWindowParamError, parseChatBody, parseSinceSeq, + parseWindowParam, serializeEventLine, } from "./logic.js"; @@ -173,6 +175,58 @@ describe("parseSinceSeq", () => { }); }); +describe("parseWindowParam", () => { + it("returns undefined (absent) when undefined", () => { + expect(parseWindowParam(undefined, "limit")).toBeUndefined(); + }); + + it("returns undefined (absent) when empty string", () => { + expect(parseWindowParam("", "limit")).toBeUndefined(); + }); + + it("parses a valid positive integer", () => { + expect(parseWindowParam("1", "limit")).toBe(1); + expect(parseWindowParam("42", "beforeSeq")).toBe(42); + }); + + it("returns ParseError for zero (store would treat it as absent)", () => { + const result = parseWindowParam("0", "limit"); + expect(isWindowParamError(result)).toBe(true); + if (isWindowParamError(result)) { + expect(result.error).toContain("limit"); + expect(result.error).toContain("positive integer"); + } + }); + + it("returns ParseError for a negative integer", () => { + expect(isWindowParamError(parseWindowParam("-1", "limit"))).toBe(true); + }); + + it("returns ParseError for a non-integer", () => { + expect(isWindowParamError(parseWindowParam("1.5", "beforeSeq"))).toBe(true); + }); + + it("returns ParseError for a non-numeric string", () => { + const result = parseWindowParam("abc", "beforeSeq"); + expect(isWindowParamError(result)).toBe(true); + if (isWindowParamError(result)) { + expect(result.error).toContain("beforeSeq"); + } + }); + + it("names the param in the error message", () => { + const limit = parseWindowParam("0", "limit"); + const before = parseWindowParam("0", "beforeSeq"); + if (isWindowParamError(limit)) expect(limit.error).toContain("limit"); + if (isWindowParamError(before)) expect(before.error).toContain("beforeSeq"); + }); + + it("isWindowParamError is false for absent and for a valid number", () => { + expect(isWindowParamError(undefined)).toBe(false); + expect(isWindowParamError(5)).toBe(false); + }); +}); + describe("serializeEventLine", () => { it("serializes an event as JSON followed by newline", () => { const event: AgentEvent = { diff --git a/packages/transport-http/src/logic.ts b/packages/transport-http/src/logic.ts index bddedf0..0008110 100644 --- a/packages/transport-http/src/logic.ts +++ b/packages/transport-http/src/logic.ts @@ -72,6 +72,33 @@ export function isSinceSeqError(result: SinceSeqResult): result is ParseError { return typeof result === "object"; } +/** + * Result of parsing an OPTIONAL positive-integer history-window query param + * (`limit` / `beforeSeq`): `undefined` = the param was absent (omit it from the + * store window), a `number` = a valid positive integer, or a {@link ParseError} + * for a malformed / zero / negative value (the route 400s on it). + */ +export type WindowParamResult = number | undefined | ParseError; + +/** + * Parse an optional `limit` / `beforeSeq` query param. Unlike `sinceSeq` these + * must be STRICTLY POSITIVE integers when present (zero is rejected, since the + * store treats a zero bound as absent and would silently return the full log). + * Absent (`undefined` / empty) is the valid "no window" case → `undefined`. + */ +export function parseWindowParam(raw: string | undefined, name: string): WindowParamResult { + if (raw === undefined || raw === "") return undefined; + const n = Number(raw); + if (!Number.isInteger(n) || n <= 0) { + return { error: `${name} must be a positive integer` }; + } + return n; +} + +export function isWindowParamError(result: WindowParamResult): result is ParseError { + return typeof result === "object" && result !== null; +} + export interface WarmBodyParsed { readonly conversationId: string; readonly model?: string; diff --git a/packages/wire/package.json b/packages/wire/package.json index f4ede19..d00772d 100644 --- a/packages/wire/package.json +++ b/packages/wire/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/wire", - "version": "0.6.0", + "version": "0.6.1", "type": "module", "private": true, "main": "dist/index.js", diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts index 28aab87..4fdf389 100644 --- a/packages/wire/src/index.ts +++ b/packages/wire/src/index.ts @@ -124,11 +124,17 @@ export interface ChatMessage { /** * A persisted chunk plus its sync metadata. The append-only conversation log - * stamps every chunk with a monotonic, gap-free, per-conversation `seq` (the - * sync cursor, assigned in append order) and records the `role` of the message - * it belongs to. This makes a flat seq-ordered stream both incrementally - * syncable ("give me chunks after seq N") and regroupable into messages by the - * client. `chunk` is the content unit — `Chunk` carries no storage/sync cursor + * stamps every chunk with a **1-based**, monotonic, gap-free, per-conversation + * `seq` (the sync cursor, assigned in append order) and records the `role` of + * the message it belongs to. This makes a flat seq-ordered stream both + * incrementally syncable ("give me chunks after seq N") and regroupable into + * messages by the client. + * + * The 1-based start is a CONTRACTUAL GUARANTEE (not an implementation detail): + * a conversation's first chunk is always `seq === 1` and numbering never skips, + * so a client holding only a windowed suffix of the log can derive "older + * chunks exist server-side" purely from `oldestLoaded.seq > 1` — no separate + * has-older flag is needed (or provided). `chunk` is the content unit — `Chunk` carries no storage/sync cursor * (`seq` lives here on the envelope, not on the chunk, since it is assigned by * the store and the provider has no use for it). A chunk MAY still carry * generation provenance assigned at production time (e.g. a tool chunk's |
