author Adam Malczewski <[email protected]> 2026-06-12 18:37:09 +0900
committer Adam Malczewski <[email protected]> 2026-06-12 18:37:09 +0900
commit dbf77ba78ff840e0ed5f6294030523fe3ab121fa
tree e768aef3edd0c126212058c3d1433355594c49be
parent 6689eb51b467d8e370f31495840d88661f978168
download dispatch-dbf77ba78ff840e0ed5f6294030523fe3ab121fa.tar.gz
dispatch-dbf77ba78ff840e0ed5f6294030523fe3ab121fa.zip
feat(history): CR-5 windowed reads — ?limit= / ?beforeSeq= on GET /conversations/:id
Selection sinceSeq < seq < beforeSeq; newest-limit window, ascending; positive-integer validation (400, store never sees an invalid window); 1-based gap-free seq codified as the contractual has-older mechanism (no earliestSeq field). transport-contract 0.9.0->0.10.0, wire 0.6.0->0.6.1 (doc-only). conversation-store +8 tests, transport-http +20; 935 vitest + 112 bun green. Live-verified: 6/6 probe checks OK. FE courier: frontend-history-windowing-handoff.md
-rw-r--r-- frontend-history-windowing-handoff.md | 70
-rw-r--r-- packages/conversation-store/src/store.test.ts | 88
-rw-r--r-- packages/conversation-store/src/store.ts | 45
-rw-r--r-- packages/transport-contract/package.json | 2
-rw-r--r-- packages/transport-contract/src/index.ts | 48
-rw-r--r-- packages/transport-http/src/app.test.ts | 169
-rw-r--r-- packages/transport-http/src/app.ts | 39
-rw-r--r-- packages/transport-http/src/index.ts | 3
-rw-r--r-- packages/transport-http/src/logic.test.ts | 54
-rw-r--r-- packages/transport-http/src/logic.ts | 27
-rw-r--r-- packages/wire/package.json | 2
-rw-r--r-- packages/wire/src/index.ts | 16
-rw-r--r-- tasks.md | 80
13 files changed, 614 insertions, 29 deletions
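A minimal sketch of the read semantics summarized in the commit message, under stated assumptions: `SeqChunk`, `HistoryWindow`, `selectWindow`, and `hasOlder` are hypothetical names used only for illustration, and a plain in-memory array stands in for the real store. It shows the selection `sinceSeq < seq < beforeSeq`, the newest-`limit` window kept in ascending order, and the has-older check that a 1-based, gap-free `seq` makes possible.

```typescript
// Illustrative only: these names are NOT the repository's identifiers.
interface SeqChunk {
  readonly seq: number;
}

interface HistoryWindow {
  readonly beforeSeq?: number;
  readonly limit?: number;
}

// Selection: sinceSeq < seq < beforeSeq (both bounds exclusive).
// Window: keep only the newest `limit` chunks, still ascending by seq.
function selectWindow<T extends SeqChunk>(
  log: readonly T[], // assumed ascending by seq, 1-based and gap-free
  sinceSeq = 0,
  win: HistoryWindow = {},
): readonly T[] {
  const { beforeSeq, limit } = win;
  const selected = log.filter(
    (c) => c.seq > sinceSeq && (beforeSeq === undefined || c.seq < beforeSeq),
  );
  if (limit !== undefined && selected.length > limit) {
    return selected.slice(selected.length - limit);
  }
  return selected;
}

// Because seq starts at 1 and never skips, older chunks exist exactly when
// the oldest loaded seq is greater than 1. Returns false for an empty list,
// since there is nothing loaded to compare against.
function hasOlder(loaded: readonly SeqChunk[]): boolean {
  const oldest = loaded[0];
  return oldest !== undefined && oldest.seq > 1;
}

const log: SeqChunk[] = Array.from({ length: 8 }, (_, i) => ({ seq: i + 1 }));

// Fresh load: newest 3 of the whole log.
const fresh = selectWindow(log, 0, { limit: 3 });
// Page older history in: newest 2 below seq 6.
const older = selectWindow(log, 0, { beforeSeq: 6, limit: 2 });
// Reaching the start of the log: nothing older remains.
const head = selectWindow(log, 0, { beforeSeq: 3 });

console.log(fresh.map((c) => c.seq), hasOlder(fresh)); // [ 6, 7, 8 ] true
console.log(older.map((c) => c.seq), hasOlder(older)); // [ 4, 5 ] true
console.log(head.map((c) => c.seq), hasOlder(head)); // [ 1, 2 ] false
```

The `beforeSeq: 6, limit: 2` case matches what the store tests in this diff assert for eight seeded chunks (seqs 4 and 5). The sketch does not reproduce the store's actual implementation, which iterates over storage keys, nor the transport's 400 validation.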
diff --git a/frontend-history-windowing-handoff.md b/frontend-history-windowing-handoff.md
new file mode 100644
index 0000000..f9244b1
--- /dev/null
+++ b/frontend-history-windowing-handoff.md
@@ -0,0 +1,70 @@
+# Backend → frontend handoff — CR-5: history windowing (`limit` / `beforeSeq`)
+
+> **From:** arch-rewrite · **To:** dispatch-web · **Courier:** the user.
+> Reply to `backend-handoff-chat-limit.md` (CR-5). 2026-06-12. SHIPPED.
+
+## What shipped
+
+`GET /conversations/:id` now takes two OPTIONAL query params on top of
+`sinceSeq` (all combinable; authoritative spec = the
+`ConversationHistoryResponse` JSDoc in `@dispatch/transport-contract`):
+
+1. **`limit=<k>`** — returns only the NEWEST `k` chunks of the selection,
+ still ASCENDING by seq. A selection with ≤ `k` chunks is returned whole
+ (your `limit=192` against a short conversation gets the normal full
+ response, exact). `limit` absent → exactly the previous behavior.
+2. **`beforeSeq=<s>`** — restricts the selection to `seq < s` (exclusive).
+ Combined semantics: `sinceSeq < seq < beforeSeq`; with `limit`: the newest
+ `k` chunks below `s`, ascending — your "Show earlier messages" page-in path.
+
+Your three flows, verbatim from your handoff, all work as written:
+
+- Fresh load: `?sinceSeq=0&limit=192`
+- Tail sync: `?sinceSeq=<maxCachedSeq>` (unchanged)
+- Page older in: `?beforeSeq=<oldestLoadedSeq>&limit=<ceil(L/4)>`
+
+## Ask #3 — our pick: the seq invariant, no new field (your "cheapest option")
+
+We CONFIRM IN WRITING, as a contractual guarantee (now codified in the
+`StoredChunk` doc in `@dispatch/wire` and referenced from the history-response
+doc): **per-conversation `seq` is 1-based, monotonic, and gap-free** — a
+conversation's first chunk is always `seq === 1` and numbering never skips.
+
+So derive it exactly as you proposed: `hasOlder = oldestLoaded.seq > 1`.
+There is deliberately NO `earliestSeq`/`hasOlder` response field.
+
+## Validation (new, only for the new params)
+
+`limit` and `beforeSeq` must be **positive integers** when present
+(`sinceSeq` keeps its existing semantics — `0` = from the start). Malformed,
+zero, or negative values → **HTTP 400 `{ error }`** (the error message names
+the offending param). Don't send `beforeSeq=0` — and you never need to:
+`oldestLoaded.seq === 1` already means there is nothing older.
+
+## `latestSeq` caveat (important for your cursor logic)
+
+`latestSeq` semantics are UNCHANGED (seq of the last returned chunk; the
+requested `sinceSeq` when the slice is empty) — but on a **windowed** read it
+describes the returned window, NOT the conversation's high-water mark:
+
+- A fresh `?sinceSeq=0&limit=192` load DID reach the true tail → `latestSeq`
+ is a valid sync cursor.
+- A `?beforeSeq=...` backfill page did NOT → do not regress your tail cursor
+ from a backfill response. (Your seq-keyed dedup cache makes this natural —
+ just don't feed backfill `latestSeq` into the tail cursor.)
+
+## Versions (re-pin + re-mirror)
+
+- `@dispatch/transport-contract` **`0.9.0 → 0.10.0`** — the param/validation/
+ caveat docs above (response TYPE shape unchanged; no new fields).
+- `@dispatch/wire` **`0.6.0 → 0.6.1`** — doc-only: the 1-based seq guarantee
+ codified on `StoredChunk`.
+
+## Test coverage (backend, for your confidence)
+
+- conversation-store: +8 windowing tests (newest-N ascending, bounds,
+ combined bounds, page-in, empty selection, garbage-in, no-window regression
+ guard; the "gap-free 1-based seq" test now backs a written contract).
+- transport-http: +20 route/param tests incl. all five 400 validation cases
+ and a no-params byte-identical regression guard.
+- Full suite: typecheck clean · biome clean · 935 vitest + 112 bun tests green.
diff --git a/packages/conversation-store/src/store.test.ts b/packages/conversation-store/src/store.test.ts
index 49b0e58..5b07eca 100644
--- a/packages/conversation-store/src/store.test.ts
+++ b/packages/conversation-store/src/store.test.ts
@@ -499,6 +499,94 @@ describe("ConversationStore", () => {
});
});
+describe("ConversationStore loadSince windowing", () => {
+ let storage: StorageNamespace;
+
+ beforeEach(() => {
+ storage = createMemoryStorage();
+ });
+
+ // Append `count` single-chunk user messages so seq runs 1..count, gap-free.
+ async function seed(store: ReturnType<typeof createConversationStore>, count: number) {
+ const messages: ChatMessage[] = [];
+ for (let i = 1; i <= count; i++) {
+ messages.push({ role: "user", chunks: [{ type: "text", text: `m${i}` }] });
+ }
+ await store.append("conv1", messages);
+ }
+
+ it("limit returns the newest N of the selection, ascending by seq", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 5);
+ const chunks = await store.loadSince("conv1", 0, { limit: 2 });
+ expect(chunks.map((c) => c.seq)).toEqual([4, 5]);
+ });
+
+ it("limit >= selection size returns the whole selection (exact, not truncated)", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 3);
+ const exactlyAll = await store.loadSince("conv1", 0, { limit: 3 });
+ expect(exactlyAll.map((c) => c.seq)).toEqual([1, 2, 3]);
+ const overAll = await store.loadSince("conv1", 0, { limit: 99 });
+ expect(overAll.map((c) => c.seq)).toEqual([1, 2, 3]);
+ });
+
+ it("beforeSeq bounds the selection exclusively (seq < beforeSeq)", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 5);
+ const chunks = await store.loadSince("conv1", 0, { beforeSeq: 3 });
+ expect(chunks.map((c) => c.seq)).toEqual([1, 2]);
+ });
+
+ it("sinceSeq + beforeSeq combine to sinceSeq < seq < beforeSeq", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 6);
+ const chunks = await store.loadSince("conv1", 2, { beforeSeq: 5 });
+ expect(chunks.map((c) => c.seq)).toEqual([3, 4]);
+ });
+
+ it("beforeSeq + limit: newest N below the bound, ascending (page older history in)", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 8);
+ const chunks = await store.loadSince("conv1", 0, { beforeSeq: 6, limit: 2 });
+ expect(chunks.map((c) => c.seq)).toEqual([4, 5]);
+ });
+
+ it("empty selection returns [] (beforeSeq=1, and sinceSeq past the tail)", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 4);
+ expect(await store.loadSince("conv1", 0, { beforeSeq: 1 })).toEqual([]);
+ expect(await store.loadSince("conv1", 4, { limit: 3 })).toEqual([]);
+ });
+
+ it("non-positive / non-integer limit and beforeSeq are treated as absent", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 4);
+ const all = [1, 2, 3, 4];
+ expect((await store.loadSince("conv1", 0, { limit: 0 })).map((c) => c.seq)).toEqual(all);
+ expect((await store.loadSince("conv1", 0, { limit: -2 })).map((c) => c.seq)).toEqual(all);
+ expect((await store.loadSince("conv1", 0, { limit: 1.5 })).map((c) => c.seq)).toEqual(all);
+ expect((await store.loadSince("conv1", 0, { beforeSeq: 0 })).map((c) => c.seq)).toEqual(all);
+ expect((await store.loadSince("conv1", 0, { beforeSeq: -3 })).map((c) => c.seq)).toEqual(all);
+ expect((await store.loadSince("conv1", 0, { beforeSeq: 2.7 })).map((c) => c.seq)).toEqual(all);
+ });
+
+ it("window omitted is identical to today's behavior (regression guard)", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 5);
+ const base = await store.loadSince("conv1", 1);
+ const withEmptyWindow = await store.loadSince("conv1", 1, {});
+ // A caller whose window fields happen to be undefined (e.g. unset query
+ // params) — modelled as an optional-field record, not explicit `undefined`
+ // literals (which exactOptionalPropertyTypes rejects on the contract).
+ const undefinedFieldsWindow: { beforeSeq?: number; limit?: number } = {};
+ const withUndefinedFields = await store.loadSince("conv1", 1, undefinedFieldsWindow);
+ expect(base.map((c) => c.seq)).toEqual([2, 3, 4, 5]);
+ expect(withEmptyWindow).toEqual(base);
+ expect(withUndefinedFields).toEqual(base);
+ });
+});
+
describe("ConversationStore metrics", () => {
let storage: StorageNamespace;
diff --git a/packages/conversation-store/src/store.ts b/packages/conversation-store/src/store.ts
index 0948f64..0a42917 100644
--- a/packages/conversation-store/src/store.ts
+++ b/packages/conversation-store/src/store.ts
@@ -23,9 +23,33 @@ import { reconcileWithReport } from "./reconcile.js";
export interface ConversationStore {
readonly append: (conversationId: string, messages: readonly ChatMessage[]) => Promise<void>;
readonly load: (conversationId: string) => Promise<ChatMessage[]>;
+ /**
+ * Read the conversation's persisted chunks as a SELECTION + optional WINDOW,
+ * ascending by seq. The raw append-order log; NOT reconciled (a dangling
+ * tool-call is returned as-is — repair is a turn-path concern).
+ *
+ * - **Selection** — `sinceSeq` is an exclusive lower bound (`seq > sinceSeq`;
+ * omitted/`0`/non-positive/non-integer = from the start). When
+ * `window.beforeSeq` is given it is an exclusive upper bound
+ * (`seq < beforeSeq`). Together: `sinceSeq < seq < beforeSeq`.
+ * - **Window** — `window.limit` returns only the NEWEST `limit` chunks of the
+ * selection; the result STAYS ASCENDING by seq. A selection with ≤ `limit`
+ * chunks is returned whole (exact, not truncated).
+ * - **Omitted = unchanged** — `window` absent (or both its fields undefined)
+ * is byte-identical to the pre-windowing behavior, so existing callers that
+ * pass no third argument are unaffected.
+ * - **Garbage-in is forgiving** — a non-positive or non-integer `limit` (or
+ * `beforeSeq`) is treated as ABSENT (full selection); this method never
+ * throws on bad window input. The transport validates and 400s upstream.
+ *
+ * Seq numbering is 1-based and gap-free, so a client derives "older chunks
+ * exist" purely from the oldest returned `seq > 1`; there is deliberately no
+ * `earliestSeq`/high-water-mark API.
+ */
readonly loadSince: (
conversationId: string,
sinceSeq?: number,
+ window?: { readonly beforeSeq?: number; readonly limit?: number },
) => Promise<readonly StoredChunk[]>;
readonly appendMetrics: (conversationId: string, metrics: TurnMetrics) => Promise<void>;
readonly loadMetrics: (conversationId: string) => Promise<readonly TurnMetrics[]>;
@@ -37,6 +61,16 @@ export interface ConversationStore {
export const conversationStoreHandle = defineService<ConversationStore>("conversation-store/store");
+/**
+ * Coerce a window bound to a positive integer, or `undefined` (= absent) for any
+ * non-positive / non-integer / undefined input. Keeps `loadSince` total.
+ */
+function positiveInt(value: number | undefined): number | undefined {
+ if (value === undefined) return undefined;
+ if (!Number.isInteger(value) || value <= 0) return undefined;
+ return value;
+}
+
interface PersistedChunkEntry {
readonly chunk: Chunk;
readonly role: Role;
@@ -118,23 +152,32 @@ export function createConversationStore(
return repaired;
},
- async loadSince(conversationId, sinceSeq) {
+ async loadSince(conversationId, sinceSeq, window) {
const prefix = chunkPrefix(conversationId);
const keys = await storage.keys(prefix);
const sorted = [...keys].sort();
const result: StoredChunk[] = [];
const minSeq = sinceSeq ?? 0;
+ // Forgiving: a non-positive / non-integer bound is treated as ABSENT.
+ const beforeSeq = positiveInt(window?.beforeSeq);
+ const limit = positiveInt(window?.limit);
for (const key of sorted) {
const seq = parseSeq(key.split(":").pop() ?? null);
if (seq <= minSeq) continue;
+ if (beforeSeq !== undefined && seq >= beforeSeq) continue;
const value = await storage.get(key);
if (value === null) continue;
const entry = JSON.parse(value) as PersistedChunkEntry;
result.push({ seq, role: entry.role, chunk: entry.chunk });
}
+ // Window: keep only the NEWEST `limit` chunks, still ascending by seq.
+ if (limit !== undefined && result.length > limit) {
+ return result.slice(result.length - limit);
+ }
+
return result;
},
diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json
index 2e9d586..84ceada 100644
--- a/packages/transport-contract/package.json
+++ b/packages/transport-contract/package.json
@@ -1,6 +1,6 @@
{
"name": "@dispatch/transport-contract",
- "version": "0.9.0",
+ "version": "0.10.0",
"type": "module",
"private": true,
"main": "dist/index.js",
diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts
index b000147..b0f6e20 100644
--- a/packages/transport-contract/src/index.ts
+++ b/packages/transport-contract/src/index.ts
@@ -67,25 +67,49 @@ export interface ModelsResponse {
}
/**
- * Response body for `GET /conversations/:id?sinceSeq=<n>` — the incremental
- * read-side history endpoint a long-lived client uses to (re)hydrate a
- * conversation cheaply.
+ * Response body for
+ * `GET /conversations/:id?sinceSeq=<n>&beforeSeq=<s>&limit=<k>` — the
+ * incremental read-side history endpoint a long-lived client uses to
+ * (re)hydrate a conversation cheaply. All three query params are OPTIONAL and
+ * combine as one SELECTION + one WINDOW:
+ *
+ * - **Selection** — `sinceSeq` (exclusive lower bound, `seq > n`; omitted/0 =
+ * from the start) and `beforeSeq` (exclusive upper bound, `seq < s`; omitted
+ * = to the end). Together: `n < seq < s`.
+ * - **Window** — `limit=<k>` returns only the NEWEST `k` chunks of the
+ * selection (the response stays ASCENDING by seq). A selection with ≤ `k`
+ * chunks is returned whole. `limit` omitted = the full selection — exactly
+ * the pre-windowing behavior, so existing clients are unchanged.
+ * - `limit` and `beforeSeq` must be POSITIVE integers (`sinceSeq` may be 0);
+ * malformed, zero, or negative values → HTTP 400 `{ error }`.
+ *
+ * Intended client flows: fresh load = `?sinceSeq=0&limit=<k>` (newest window);
+ * tail sync = `?sinceSeq=<cursor>` (no limit); page older history in =
+ * `?beforeSeq=<oldestLoadedSeq>&limit=<k>`.
+ *
+ * Seq numbering is **1-based and gap-free** (a CONTRACTUAL GUARANTEE — see
+ * `StoredChunk` in `@dispatch/wire`): a client can derive "older chunks exist"
+ * purely from `oldestLoaded.seq > 1`; there is deliberately no
+ * `earliestSeq`/`hasOlder` response field.
*
* `chunks` is the RAW, append-order, seq-ordered slice of the conversation log
- * with `seq > sinceSeq` (or the whole log when `sinceSeq` is omitted/0). It is
- * NOT reconciled: a dangling tool-call is returned as-is (rendered as an
- * interrupted call). Reconciliation is a turn-path concern — the server repairs
- * history only when it feeds a provider, never on this read path — which is what
- * preserves the per-chunk `seq` cursor invariant (a synthesized repair chunk
- * would have no seq).
+ * selected + windowed as above. It is NOT reconciled: a dangling tool-call is
+ * returned as-is (rendered as an interrupted call). Reconciliation is a
+ * turn-path concern — the server repairs history only when it feeds a provider,
+ * never on this read path — which is what preserves the per-chunk `seq` cursor
+ * invariant (a synthesized repair chunk would have no seq).
*
* `latestSeq` is the `seq` of the LAST chunk in this response, or — when the
* slice is empty (the client is already caught up) — the requested `sinceSeq`
* (0 for a full read of an empty conversation). So after applying the response a
* client's new cursor is always `latestSeq`, and an empty `chunks` means
- * "nothing new past your cursor". (A true server-side high-water mark
- * independent of the filter is deferred until a consumer needs it — it would
- * require widening the store contract.)
+ * "nothing new past your cursor". CAVEAT (windowed reads): `latestSeq` is a
+ * TAIL-sync cursor — on a `beforeSeq` backfill page (or any `limit`ed read that
+ * did not reach the log's true tail) it describes the returned window, NOT the
+ * conversation's high-water mark, so a client must not regress its sync cursor
+ * from a backfill response. (A true server-side high-water mark independent of
+ * the filter is deferred until a consumer needs it — it would require widening
+ * the store contract.)
*/
export interface ConversationHistoryResponse {
readonly chunks: readonly StoredChunk[];
diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts
index c26a868..88de38f 100644
--- a/packages/transport-http/src/app.test.ts
+++ b/packages/transport-http/src/app.test.ts
@@ -86,10 +86,19 @@ function createFakeConversationStore(
async load() {
return [];
},
- async loadSince(conversationId, sinceSeq) {
+ async loadSince(conversationId, sinceSeq, window) {
const chunks = store.get(conversationId) ?? [];
const minSeq = sinceSeq ?? 0;
- return chunks.filter((c) => c.seq > minSeq);
+ const beforeSeq = window?.beforeSeq;
+ const limit = window?.limit;
+ const selected = chunks.filter(
+ (c) => c.seq > minSeq && (beforeSeq === undefined || c.seq < beforeSeq),
+ );
+ // Window: keep only the NEWEST `limit`, still ascending by seq.
+ if (limit !== undefined && selected.length > limit) {
+ return selected.slice(selected.length - limit);
+ }
+ return selected;
},
async appendMetrics() {},
async loadMetrics(conversationId) {
@@ -710,6 +719,162 @@ describe("GET /conversations/:id", () => {
const res = await app.request("/conversations/conv1?sinceSeq=-1");
expect(res.status).toBe(400);
});
+
+ const sixChunks: StoredChunk[] = [
+ { seq: 1, role: "user", chunk: { type: "text", text: "one" } },
+ { seq: 2, role: "assistant", chunk: { type: "text", text: "two" } },
+ { seq: 3, role: "user", chunk: { type: "text", text: "three" } },
+ { seq: 4, role: "assistant", chunk: { type: "text", text: "four" } },
+ { seq: 5, role: "user", chunk: { type: "text", text: "five" } },
+ { seq: 6, role: "assistant", chunk: { type: "text", text: "six" } },
+ ];
+
+ function appWithChunks(chunks: StoredChunk[]) {
+ const store = new Map<string, StoredChunk[]>([["conv1", chunks]]);
+ return createApp({
+ conversationStore: createFakeConversationStore(store),
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ });
+ }
+
+ it("?limit=N returns only the newest N chunks, ascending, latestSeq = last seq", async () => {
+ const app = appWithChunks(sixChunks);
+ const res = await app.request("/conversations/conv1?limit=2");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number };
+ expect(body.chunks.map((c) => c.seq)).toEqual([5, 6]);
+ expect(body.latestSeq).toBe(6);
+ });
+
+ it("?limit=N with N >= conversation size returns the full log", async () => {
+ const app = appWithChunks(sixChunks);
+ const res = await app.request("/conversations/conv1?limit=10");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number };
+ expect(body.chunks.map((c) => c.seq)).toEqual([1, 2, 3, 4, 5, 6]);
+ expect(body.latestSeq).toBe(6);
+ });
+
+ it("?beforeSeq=S returns only chunks with seq < S", async () => {
+ const app = appWithChunks(sixChunks);
+ const res = await app.request("/conversations/conv1?beforeSeq=3");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number };
+ expect(body.chunks.map((c) => c.seq)).toEqual([1, 2]);
+ expect(body.latestSeq).toBe(2);
+ });
+
+ it("?beforeSeq=S&limit=N returns the newest N below S, ascending", async () => {
+ const app = appWithChunks(sixChunks);
+ const res = await app.request("/conversations/conv1?beforeSeq=5&limit=2");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number };
+ // selection = seq 1..4; newest 2 = [3, 4]
+ expect(body.chunks.map((c) => c.seq)).toEqual([3, 4]);
+ expect(body.latestSeq).toBe(4);
+ });
+
+ it("?sinceSeq=A&beforeSeq=B returns A < seq < B", async () => {
+ const app = appWithChunks(sixChunks);
+ const res = await app.request("/conversations/conv1?sinceSeq=2&beforeSeq=5");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number };
+ expect(body.chunks.map((c) => c.seq)).toEqual([3, 4]);
+ expect(body.latestSeq).toBe(4);
+ });
+
+ describe("window param validation → 400 and store not called with an invalid window", () => {
+ function appCapturingWindow() {
+ const calls: {
+ readonly sinceSeq: number | undefined;
+ readonly window: { readonly beforeSeq?: number; readonly limit?: number } | undefined;
+ }[] = [];
+ const store: ConversationStore = {
+ async append() {},
+ async load() {
+ return [];
+ },
+ async loadSince(_conversationId, sinceSeq, window) {
+ calls.push({ sinceSeq, window });
+ return [];
+ },
+ async appendMetrics() {},
+ async loadMetrics() {
+ return [];
+ },
+ async getCwd() {
+ return null;
+ },
+ async setCwd() {},
+ };
+ const app = createApp({
+ conversationStore: store,
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ });
+ return { app, calls };
+ }
+
+ const cases: readonly { readonly name: string; readonly query: string }[] = [
+ { name: "limit=0", query: "limit=0" },
+ { name: "limit=-1", query: "limit=-1" },
+ { name: "limit=abc", query: "limit=abc" },
+ { name: "beforeSeq=0", query: "beforeSeq=0" },
+ { name: "beforeSeq=1.5", query: "beforeSeq=1.5" },
+ ];
+
+ for (const { name, query } of cases) {
+ it(`${name} → 400 { error } and loadSince is never called`, async () => {
+ const { app, calls } = appCapturingWindow();
+ const res = await app.request(`/conversations/conv1?${query}`);
+ expect(res.status).toBe(400);
+ const body = (await res.json()) as { error: string };
+ expect(typeof body.error).toBe("string");
+ expect(body.error.length).toBeGreaterThan(0);
+ expect(calls).toHaveLength(0);
+ });
+ }
+ });
+
+ it("no params → byte-identical to the no-window read (regression guard)", async () => {
+ // Rest-param spy: record how many args the route actually passes, so we
+ // can prove the third (window) arg is OMITTED entirely — not merely
+ // forwarded as undefined — preserving the existing two-arg call shape.
+ const argCounts: number[] = [];
+ const store: ConversationStore = {
+ async append() {},
+ async load() {
+ return [];
+ },
+ loadSince(...args: Parameters<ConversationStore["loadSince"]>) {
+ argCounts.push(args.length);
+ const sinceSeq = args[1] ?? 0;
+ return Promise.resolve(sampleChunks.filter((c) => c.seq > sinceSeq));
+ },
+ async appendMetrics() {},
+ async loadMetrics() {
+ return [];
+ },
+ async getCwd() {
+ return null;
+ },
+ async setCwd() {},
+ };
+ const app = createApp({
+ conversationStore: store,
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ });
+
+ const res = await app.request("/conversations/conv1");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number };
+ expect(body.chunks.map((c) => c.seq)).toEqual([1, 2, 3, 4]);
+ expect(body.latestSeq).toBe(4);
+ // Called once, with exactly two arguments (no window arg).
+ expect(argCounts).toEqual([2]);
+ });
});
describe("GET /conversations/:id/metrics", () => {
diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts
index 11d2850..ae24922 100644
--- a/packages/transport-http/src/app.ts
+++ b/packages/transport-http/src/app.ts
@@ -17,9 +17,11 @@ import {
computeExpectedCacheRate,
isParseError,
isSinceSeqError,
+ isWindowParamError,
parseChatBody,
parseSinceSeq,
parseWarmBody,
+ parseWindowParam,
serializeEventLine,
} from "./logic.js";
import {
@@ -147,8 +149,43 @@ export function createApp(opts: CreateServerOptions): Hono {
return c.json({ error: sinceSeqResult.error }, 400);
}
+ // `limit` / `beforeSeq` are optional positive-integer history-window
+ // params. The store is deliberately forgiving (a 0/negative bound is
+ // treated as ABSENT), so we MUST reject malformed values here and never
+ // forward an invalid window.
+ const beforeSeqResult = parseWindowParam(c.req.query("beforeSeq"), "beforeSeq");
+ if (isWindowParamError(beforeSeqResult)) {
+ log.warn("conversations: invalid beforeSeq", {
+ conversationId,
+ error: beforeSeqResult.error,
+ });
+ return c.json({ error: beforeSeqResult.error }, 400);
+ }
+ const limitResult = parseWindowParam(c.req.query("limit"), "limit");
+ if (isWindowParamError(limitResult)) {
+ log.warn("conversations: invalid limit", {
+ conversationId,
+ error: limitResult.error,
+ });
+ return c.json({ error: limitResult.error }, 400);
+ }
+
+ // Include only the fields actually provided (exactOptionalPropertyTypes),
+ // and omit the window argument entirely when neither was given — keeping
+ // the pre-windowing call shape byte-identical for existing callers.
+ const window: { readonly beforeSeq?: number; readonly limit?: number } | undefined =
+ beforeSeqResult !== undefined || limitResult !== undefined
+ ? {
+ ...(beforeSeqResult !== undefined ? { beforeSeq: beforeSeqResult } : {}),
+ ...(limitResult !== undefined ? { limit: limitResult } : {}),
+ }
+ : undefined;
+
try {
- const chunks = await opts.conversationStore.loadSince(conversationId, sinceSeqResult);
+ const chunks =
+ window !== undefined
+ ? await opts.conversationStore.loadSince(conversationId, sinceSeqResult, window)
+ : await opts.conversationStore.loadSince(conversationId, sinceSeqResult);
const latestSeq =
chunks.length > 0 ? (chunks[chunks.length - 1]?.seq ?? sinceSeqResult) : sinceSeqResult;
log.info("conversations: read", {
diff --git a/packages/transport-http/src/index.ts b/packages/transport-http/src/index.ts
index 718aaef..1a42259 100644
--- a/packages/transport-http/src/index.ts
+++ b/packages/transport-http/src/index.ts
@@ -7,13 +7,16 @@ export type {
ParseResult,
SinceSeqResult,
WarmBodyParsed,
+ WindowParamResult,
} from "./logic.js";
export {
computeCachePct,
isParseError,
isSinceSeqError,
+ isWindowParamError,
parseChatBody,
parseSinceSeq,
+ parseWindowParam,
serializeEventLine,
} from "./logic.js";
export type {
diff --git a/packages/transport-http/src/logic.test.ts b/packages/transport-http/src/logic.test.ts
index 19b47ef..9a7bda2 100644
--- a/packages/transport-http/src/logic.test.ts
+++ b/packages/transport-http/src/logic.test.ts
@@ -4,8 +4,10 @@ import {
computeExpectedCacheRate,
isParseError,
isSinceSeqError,
+ isWindowParamError,
parseChatBody,
parseSinceSeq,
+ parseWindowParam,
serializeEventLine,
} from "./logic.js";
@@ -173,6 +175,58 @@ describe("parseSinceSeq", () => {
});
});
+describe("parseWindowParam", () => {
+ it("returns undefined (absent) when undefined", () => {
+ expect(parseWindowParam(undefined, "limit")).toBeUndefined();
+ });
+
+ it("returns undefined (absent) when empty string", () => {
+ expect(parseWindowParam("", "limit")).toBeUndefined();
+ });
+
+ it("parses a valid positive integer", () => {
+ expect(parseWindowParam("1", "limit")).toBe(1);
+ expect(parseWindowParam("42", "beforeSeq")).toBe(42);
+ });
+
+ it("returns ParseError for zero (store would treat it as absent)", () => {
+ const result = parseWindowParam("0", "limit");
+ expect(isWindowParamError(result)).toBe(true);
+ if (isWindowParamError(result)) {
+ expect(result.error).toContain("limit");
+ expect(result.error).toContain("positive integer");
+ }
+ });
+
+ it("returns ParseError for a negative integer", () => {
+ expect(isWindowParamError(parseWindowParam("-1", "limit"))).toBe(true);
+ });
+
+ it("returns ParseError for a non-integer", () => {
+ expect(isWindowParamError(parseWindowParam("1.5", "beforeSeq"))).toBe(true);
+ });
+
+ it("returns ParseError for a non-numeric string", () => {
+ const result = parseWindowParam("abc", "beforeSeq");
+ expect(isWindowParamError(result)).toBe(true);
+ if (isWindowParamError(result)) {
+ expect(result.error).toContain("beforeSeq");
+ }
+ });
+
+ it("names the param in the error message", () => {
+ const limit = parseWindowParam("0", "limit");
+ const before = parseWindowParam("0", "beforeSeq");
+ if (isWindowParamError(limit)) expect(limit.error).toContain("limit");
+ if (isWindowParamError(before)) expect(before.error).toContain("beforeSeq");
+ });
+
+ it("isWindowParamError is false for absent and for a valid number", () => {
+ expect(isWindowParamError(undefined)).toBe(false);
+ expect(isWindowParamError(5)).toBe(false);
+ });
+});
+
describe("serializeEventLine", () => {
it("serializes an event as JSON followed by newline", () => {
const event: AgentEvent = {
diff --git a/packages/transport-http/src/logic.ts b/packages/transport-http/src/logic.ts
index bddedf0..0008110 100644
--- a/packages/transport-http/src/logic.ts
+++ b/packages/transport-http/src/logic.ts
@@ -72,6 +72,33 @@ export function isSinceSeqError(result: SinceSeqResult): result is ParseError {
return typeof result === "object";
}
+/**
+ * Result of parsing an OPTIONAL positive-integer history-window query param
+ * (`limit` / `beforeSeq`): `undefined` = the param was absent (omit it from the
+ * store window), a `number` = a valid positive integer, or a {@link ParseError}
+ * for a malformed / zero / negative value (the route 400s on it).
+ */
+export type WindowParamResult = number | undefined | ParseError;
+
+/**
+ * Parse an optional `limit` / `beforeSeq` query param. Unlike `sinceSeq` these
+ * must be STRICTLY POSITIVE integers when present (zero is rejected, since the
+ * store treats a zero bound as absent and would silently return the full log).
+ * Absent (`undefined` / empty) is the valid "no window" case → `undefined`.
+ */
+export function parseWindowParam(raw: string | undefined, name: string): WindowParamResult {
+ if (raw === undefined || raw === "") return undefined;
+ const n = Number(raw);
+ if (!Number.isInteger(n) || n <= 0) {
+ return { error: `${name} must be a positive integer` };
+ }
+ return n;
+}
+
+export function isWindowParamError(result: WindowParamResult): result is ParseError {
+ return typeof result === "object" && result !== null;
+}
+
export interface WarmBodyParsed {
readonly conversationId: string;
readonly model?: string;
diff --git a/packages/wire/package.json b/packages/wire/package.json
index f4ede19..d00772d 100644
--- a/packages/wire/package.json
+++ b/packages/wire/package.json
@@ -1,6 +1,6 @@
{
"name": "@dispatch/wire",
- "version": "0.6.0",
+ "version": "0.6.1",
"type": "module",
"private": true,
"main": "dist/index.js",
diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts
index 28aab87..4fdf389 100644
--- a/packages/wire/src/index.ts
+++ b/packages/wire/src/index.ts
@@ -124,11 +124,17 @@ export interface ChatMessage {
/**
* A persisted chunk plus its sync metadata. The append-only conversation log
- * stamps every chunk with a monotonic, gap-free, per-conversation `seq` (the
- * sync cursor, assigned in append order) and records the `role` of the message
- * it belongs to. This makes a flat seq-ordered stream both incrementally
- * syncable ("give me chunks after seq N") and regroupable into messages by the
- * client. `chunk` is the content unit — `Chunk` carries no storage/sync cursor
+ * stamps every chunk with a **1-based**, monotonic, gap-free, per-conversation
+ * `seq` (the sync cursor, assigned in append order) and records the `role` of
+ * the message it belongs to. This makes a flat seq-ordered stream both
+ * incrementally syncable ("give me chunks after seq N") and regroupable into
+ * messages by the client.
+ *
+ * The 1-based start is a CONTRACTUAL GUARANTEE (not an implementation detail):
+ * a conversation's first chunk is always `seq === 1` and numbering never skips,
+ * so a client holding only a windowed suffix of the log can derive "older
+ * chunks exist server-side" purely from `oldestLoaded.seq > 1` — no separate
+ * has-older flag is needed (or provided). `chunk` is the content unit — `Chunk` carries no storage/sync cursor
* (`seq` lives here on the envelope, not on the chunk, since it is assigned by
* the store and the provider has no use for it). A chunk MAY still carry
* generation provenance assigned at production time (e.g. a tool chunk's
diff --git a/tasks.md b/tasks.md
index f77577a..205d9c9 100644
--- a/tasks.md
+++ b/tasks.md
@@ -26,8 +26,11 @@ extension through the host):
## How to run
```bash
-KEY1=$(grep DISPATCH_API_KEY_OPENCODE1 .env | cut -d= -f2)
-PORT=4567 DISPATCH_API_KEY="$KEY1" bun packages/host-bin/src/main.ts # boots app + collector
+# .env auto-loads DISPATCH_API_KEY (do NOT re-export) and pins BACKEND_PORT (beats PORT).
+# Private probe instance: override the port + ISOLATE data paths (ORCHESTRATOR §8):
+BACKEND_PORT=4567 SURFACE_WS_PORT=4569 DISPATCH_DB=/tmp/opencode/probe/dispatch.db \
+ DISPATCH_TRACE_DB=/tmp/opencode/probe/traces.db DISPATCH_JOURNAL=/tmp/opencode/probe/app.ndjson \
+ bun packages/host-bin/src/main.ts # boots app + collector
curl -s -X POST localhost:4567/chat -H 'content-type: application/json' \
-d '{"conversationId":"c1","message":"Say hello in 3 words."}' # field = conversationId
```
@@ -323,8 +326,35 @@ FE courier in: `../dispatch-web/backend-handoff-cache-warming.md` (+ CR-1/CR-2 f
2 automatic warms @5s each pushing future `nextWarmAt`, mid-turn close → `abortedTurn:true` +
`done.reason:"aborted"` + warming disabled, catalog scopes + table field present, echo present.
+## History windowing — FE CR-5 (DONE)
+FE courier in: `../dispatch-web/backend-handoff-chat-limit.md` (+ living `backend-handoff.md` §2
+CR-5). Courier out: `frontend-history-windowing-handoff.md`. User-gated call: ask #3 shipped as
+the INVARIANT option (no new field) — seq is contractually **1-based, monotonic, gap-free**; FE
+derives `hasOlder` from `chunks[0].seq > 1`.
+- **Wave 0 (orchestrator, contracts):** `limit`/`beforeSeq` query-param semantics + validation +
+ `latestSeq` windowed-read caveat documented on `ConversationHistoryResponse`
+ (`@dispatch/transport-contract` `0.9.0→0.10.0`); 1-based seq guarantee codified on
+ `StoredChunk` (`@dispatch/wire` `0.6.0→0.6.1`, doc-only).
+- **Wave 1 — `conversation-store`:** additive `loadSince(id, sinceSeq?, window?: { beforeSeq?,
+ limit? })` — selection `sinceSeq < seq < beforeSeq`, newest-`limit` window, result stays
+ ascending; garbage-in treated as absent (transport validates upstream). +8 tests.
+- **Wave 2 — `transport-http`:** parses + validates the params (positive integers; malformed/
+ zero/negative → 400 `{ error }`, store never called with an invalid window); two-arg call
+ shape preserved when no params (regression-guarded). +20 tests.
+- 935 vitest + 112 bun tests, typecheck + biome clean. **LIVE-VERIFIED** (isolated boot, real
+ flash turns): firstSeq=1; `limit=2`→`[5,6]` ascending w/ correct `latestSeq`; `limit=9999`→
+ full log; `beforeSeq=3`→`[1,2]`; `beforeSeq=3&limit=1`→`[2]`; `limit=0`/`beforeSeq=0`/
+ `limit=abc`→400×3. `RESULT: OK` ×6.
+- **Scar tissue (process):** (1) probing with a PRIVATE boot was overkill — the windowing checks
+ are read-only GETs and the dev stack was running; prefer probing `bin/up`/`up2` or asking the
+ user (ORCHESTRATOR §8 updated). (2) The §8 boot recipe was stale (`DISPATCH_API_KEY_OPENCODE1`
+ doesn't exist; an empty re-export OVERRIDES `.env` → "No providers registered"; `.env`'s
+ `BACKEND_PORT` beats `PORT`; un-isolated data paths spawn a duplicate collector on the dev
+ DB) — recipe fixed in §8 + above. (3) Violated the bracket trick once (`pkill -f 'cr5-data'`
+ self-matched → killed parent shell, timeout-with-no-output); the existing §8 rule stands.
+
## Open items
-- **Context window LIMIT (next, sibling of context size):** expose the selected model's max
+- **Context window LIMIT (deferred, sibling of context size):** expose the selected model's max
context-window token limit so the FE can render `contextSize / limit` (e.g. `1286 / 200000`).
Source = the provider/model catalog (`ModelInfo`); likely a field on the models response
and/or stamped alongside `contextSize`. "context size" = current usage (DONE); "context
@@ -341,9 +371,47 @@ FE courier in: `../dispatch-web/backend-handoff-cache-warming.md` (+ CR-1/CR-2 f
chain).
## Roadmap
-1. **CLI** — done.
-2. **Web frontend** (in progress, SEPARATE repo `../dispatch-web`; Svelte +
+1. **Web frontend** (in progress, SEPARATE repo `../dispatch-web`; Svelte +
DaisyUI, same methodology). Slice 2 = browser chat MVP consuming the
wire/transport-contract + metrics. Cross-repo contract changes are couriered
via the user (ORCHESTRATOR §7); `lsp references` does not span repos.
-3. **dedup / storage growth** — after the frontend (see Open items).
+2. **CLI → open-tab handoff (cross-client messaging):** an AI given a short
+ conversation identifier (4+ chars) in chat can use the CLI to send a message
+ INTO that conversation, and an FE tab watching it sees the message + reply
+ live. To the receiving AI the message appears as a REGULAR user message
+ (no special framing). The broadcast plumbing already exists and is
+ live-verified (detached turns + `chat.subscribe` + CR-3 `user-message`
+ event); the gaps are (a) a conversation LIST/discovery endpoint (none
+ exists), (b) short-id / prefix resolution of `conversationId` in the CLI,
+ (c) end-to-end verification of the handoff workflow. CLI semantics:
+ - **read last message** (new): BLOCKING — waits until the in-flight turn
+ settles, then returns ONLY the AI's last message of that turn (not the
+ whole conversation).
+ - **send, no `--queue` flag (default):** BLOCKING — sends, waits for the
+ turn to settle, returns the AI's last message (same shape as the read).
+ - **send with `--queue`:** enqueues the message into the conversation's
+ message queue (roadmap item 3) and exits immediately.
+3. **Message queue + steering injection (backend core; prerequisite for the
+ `--queue` flag in item 2):** a per-conversation queue a client (FE or CLI)
+ can push a message onto while a turn is GENERATING. Delivery semantics:
+ - On the turn's next TOOL CALL, queued messages are injected as "steering"
+ messages returned alongside the tool result (the model sees them
+ mid-turn and can adjust course).
+ - If the turn ends before any tool call fires, the queued messages are
+ COMBINED and sent as a fresh user message starting a NEW turn.
+ - FE queueing UX (queue while generating) couriered to `../dispatch-web`.
+ Touches the kernel turn loop (injection at the tool-result boundary) — a
+ contract-first design pass needed before summoning.
+4. **CLI flag to open/activate an FE tab:** optional CLI flag (new or existing
+ conversation) that makes an already-open frontend open the conversation as a
+ tab and mark it active. Does NOT exist today — no backend→FE "open/focus
+ conversation" push (only the inverse, `POST /conversations/:id/close`).
+ Needs a new broadcast WS op + endpoint/flag here, and FE handling couriered
+ to `../dispatch-web`.
+5. **`todo` tool** — a per-conversation task-list tool the model maintains
+ (like opencode's todowrite/todoread), as a standard tool extension; likely a
+ surface so the FE can render the live list.
+6. **`web_search` tool** — a web search tool (like old dispatch's;
+ reference-only source at `../dispatch-source`), as a standard tool extension.
+
+(Done and dropped from the list: CLI; dedup / storage growth.)
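
Reviewer note, not part of the patch: a minimal sketch of the windowed selection this commit describes (`sinceSeq < seq < beforeSeq`, newest-`limit` window, ascending result) and of the `hasOlder` derivation from the 1-based seq guarantee. `selectWindow`, `hasOlder`, `SeqChunk`, and `HistoryWindow` are hypothetical names used only for illustration. This is not the actual `conversation-store` implementation, which the hunks shown here do not include. Treating non-positive bounds as absent follows the Wave 1 note ("garbage-in treated as absent").

```typescript
// Hypothetical minimal shape: only the `seq` cursor matters for windowing.
interface SeqChunk {
  readonly seq: number;
}

// Hypothetical window options, mirroring the `{ beforeSeq?, limit? }` shape
// named in the Wave 1 note.
interface HistoryWindow {
  readonly beforeSeq?: number;
  readonly limit?: number;
}

// Assumption: anything that is not a positive integer counts as "absent".
function positiveOrUndefined(n: number | undefined): number | undefined {
  return n !== undefined && Number.isInteger(n) && n > 0 ? n : undefined;
}

// Select chunks with sinceSeq < seq < beforeSeq, then keep only the newest
// `limit` of them. The input is assumed ascending by seq, so the result is too.
function selectWindow<T extends SeqChunk>(
  log: readonly T[],
  sinceSeq = 0,
  window: HistoryWindow = {},
): T[] {
  const beforeSeq = positiveOrUndefined(window.beforeSeq);
  const limit = positiveOrUndefined(window.limit);
  const selected = log.filter(
    (c) => c.seq > sinceSeq && (beforeSeq === undefined || c.seq < beforeSeq),
  );
  // slice(-limit) keeps the tail; a limit larger than the selection keeps all.
  return limit === undefined ? selected : selected.slice(-limit);
}

// With a 1-based, gap-free seq, older chunks exist server-side exactly when
// the oldest loaded chunk is not seq 1.
function hasOlder(loaded: readonly SeqChunk[]): boolean {
  const first = loaded[0];
  return first !== undefined && first.seq > 1;
}
```

Against a six-chunk log (seq 1 to 6), this sketch reproduces the windows listed in the live-verified probe: `limit=2` selects `[5,6]`, `beforeSeq=3` selects `[1,2]`, `beforeSeq=3&limit=1` selects `[2]`, and `limit=9999` returns the full log.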