summaryrefslogtreecommitdiffhomepage
path: root/packages
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-12 18:37:09 +0900
committerAdam Malczewski <[email protected]>2026-06-12 18:37:09 +0900
commitdbf77ba78ff840e0ed5f6294030523fe3ab121fa (patch)
treee768aef3edd0c126212058c3d1433355594c49be /packages
parent6689eb51b467d8e370f31495840d88661f978168 (diff)
downloaddispatch-dbf77ba78ff840e0ed5f6294030523fe3ab121fa.tar.gz
dispatch-dbf77ba78ff840e0ed5f6294030523fe3ab121fa.zip
feat(history): CR-5 windowed reads — ?limit= / ?beforeSeq= on GET /conversations/:id
Selection sinceSeq < seq < beforeSeq; newest-limit window, ascending; positive- integer validation (400, store never sees an invalid window); 1-based gap-free seq codified as the contractual has-older mechanism (no earliestSeq field). transport-contract 0.9.0->0.10.0, wire 0.6.0->0.6.1 (doc-only). conversation-store +8 tests, transport-http +20; 935 vitest + 112 bun green. Live-verified: 6/6 probe checks OK. FE courier: frontend-history-windowing-handoff.md
Diffstat (limited to 'packages')
-rw-r--r--packages/conversation-store/src/store.test.ts88
-rw-r--r--packages/conversation-store/src/store.ts45
-rw-r--r--packages/transport-contract/package.json2
-rw-r--r--packages/transport-contract/src/index.ts48
-rw-r--r--packages/transport-http/src/app.test.ts169
-rw-r--r--packages/transport-http/src/app.ts39
-rw-r--r--packages/transport-http/src/index.ts3
-rw-r--r--packages/transport-http/src/logic.test.ts54
-rw-r--r--packages/transport-http/src/logic.ts27
-rw-r--r--packages/wire/package.json2
-rw-r--r--packages/wire/src/index.ts16
11 files changed, 470 insertions, 23 deletions
diff --git a/packages/conversation-store/src/store.test.ts b/packages/conversation-store/src/store.test.ts
index 49b0e58..5b07eca 100644
--- a/packages/conversation-store/src/store.test.ts
+++ b/packages/conversation-store/src/store.test.ts
@@ -499,6 +499,94 @@ describe("ConversationStore", () => {
});
});
+describe("ConversationStore loadSince windowing", () => {
+ let storage: StorageNamespace;
+
+ beforeEach(() => {
+ storage = createMemoryStorage();
+ });
+
+ // Append `count` single-chunk user messages so seq runs 1..count, gap-free.
+ async function seed(store: ReturnType<typeof createConversationStore>, count: number) {
+ const messages: ChatMessage[] = [];
+ for (let i = 1; i <= count; i++) {
+ messages.push({ role: "user", chunks: [{ type: "text", text: `m${i}` }] });
+ }
+ await store.append("conv1", messages);
+ }
+
+ it("limit returns the newest N of the selection, ascending by seq", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 5);
+ const chunks = await store.loadSince("conv1", 0, { limit: 2 });
+ expect(chunks.map((c) => c.seq)).toEqual([4, 5]);
+ });
+
+ it("limit >= selection size returns the whole selection (exact, not truncated)", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 3);
+ const exactlyAll = await store.loadSince("conv1", 0, { limit: 3 });
+ expect(exactlyAll.map((c) => c.seq)).toEqual([1, 2, 3]);
+ const overAll = await store.loadSince("conv1", 0, { limit: 99 });
+ expect(overAll.map((c) => c.seq)).toEqual([1, 2, 3]);
+ });
+
+ it("beforeSeq bounds the selection exclusively (seq < beforeSeq)", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 5);
+ const chunks = await store.loadSince("conv1", 0, { beforeSeq: 3 });
+ expect(chunks.map((c) => c.seq)).toEqual([1, 2]);
+ });
+
+ it("sinceSeq + beforeSeq combine to sinceSeq < seq < beforeSeq", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 6);
+ const chunks = await store.loadSince("conv1", 2, { beforeSeq: 5 });
+ expect(chunks.map((c) => c.seq)).toEqual([3, 4]);
+ });
+
+ it("beforeSeq + limit: newest N below the bound, ascending (page older history in)", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 8);
+ const chunks = await store.loadSince("conv1", 0, { beforeSeq: 6, limit: 2 });
+ expect(chunks.map((c) => c.seq)).toEqual([4, 5]);
+ });
+
+ it("empty selection returns [] (beforeSeq=1, and sinceSeq past the tail)", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 4);
+ expect(await store.loadSince("conv1", 0, { beforeSeq: 1 })).toEqual([]);
+ expect(await store.loadSince("conv1", 4, { limit: 3 })).toEqual([]);
+ });
+
+ it("non-positive / non-integer limit and beforeSeq are treated as absent", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 4);
+ const all = [1, 2, 3, 4];
+ expect((await store.loadSince("conv1", 0, { limit: 0 })).map((c) => c.seq)).toEqual(all);
+ expect((await store.loadSince("conv1", 0, { limit: -2 })).map((c) => c.seq)).toEqual(all);
+ expect((await store.loadSince("conv1", 0, { limit: 1.5 })).map((c) => c.seq)).toEqual(all);
+ expect((await store.loadSince("conv1", 0, { beforeSeq: 0 })).map((c) => c.seq)).toEqual(all);
+ expect((await store.loadSince("conv1", 0, { beforeSeq: -3 })).map((c) => c.seq)).toEqual(all);
+ expect((await store.loadSince("conv1", 0, { beforeSeq: 2.7 })).map((c) => c.seq)).toEqual(all);
+ });
+
+ it("window omitted is identical to today's behavior (regression guard)", async () => {
+ const store = createConversationStore(storage);
+ await seed(store, 5);
+ const base = await store.loadSince("conv1", 1);
+ const withEmptyWindow = await store.loadSince("conv1", 1, {});
+ // A caller whose window fields happen to be undefined (e.g. unset query
+ // params) — modelled as an optional-field record, not explicit `undefined`
+ // literals (which exactOptionalPropertyTypes rejects on the contract).
+ const undefinedFieldsWindow: { beforeSeq?: number; limit?: number } = {};
+ const withUndefinedFields = await store.loadSince("conv1", 1, undefinedFieldsWindow);
+ expect(base.map((c) => c.seq)).toEqual([2, 3, 4, 5]);
+ expect(withEmptyWindow).toEqual(base);
+ expect(withUndefinedFields).toEqual(base);
+ });
+});
+
describe("ConversationStore metrics", () => {
let storage: StorageNamespace;
diff --git a/packages/conversation-store/src/store.ts b/packages/conversation-store/src/store.ts
index 0948f64..0a42917 100644
--- a/packages/conversation-store/src/store.ts
+++ b/packages/conversation-store/src/store.ts
@@ -23,9 +23,33 @@ import { reconcileWithReport } from "./reconcile.js";
export interface ConversationStore {
readonly append: (conversationId: string, messages: readonly ChatMessage[]) => Promise<void>;
readonly load: (conversationId: string) => Promise<ChatMessage[]>;
+ /**
+ * Read the conversation's persisted chunks as a SELECTION + optional WINDOW,
+ * ascending by seq. The raw append-order log; NOT reconciled (a dangling
+ * tool-call is returned as-is — repair is a turn-path concern).
+ *
+ * - **Selection** — `sinceSeq` is an exclusive lower bound (`seq > sinceSeq`;
+ * omitted/`0`/non-positive/non-integer = from the start). When
+ * `window.beforeSeq` is given it is an exclusive upper bound
+ * (`seq < beforeSeq`). Together: `sinceSeq < seq < beforeSeq`.
+ * - **Window** — `window.limit` returns only the NEWEST `limit` chunks of the
+ * selection; the result STAYS ASCENDING by seq. A selection with ≤ `limit`
+ * chunks is returned whole (exact, not truncated).
+ * - **Omitted = unchanged** — `window` absent (or both its fields undefined)
+ * is byte-identical to the pre-windowing behavior, so existing callers that
+ * pass no third argument are unaffected.
+ * - **Garbage-in is forgiving** — a non-positive or non-integer `limit` (or
+ * `beforeSeq`) is treated as ABSENT (full selection); this method never
+ * throws on bad window input. The transport validates and 400s upstream.
+ *
+ * Seq numbering is 1-based and gap-free, so a client derives "older chunks
+ * exist" purely from the oldest returned `seq > 1`; there is deliberately no
+ * `earliestSeq`/high-water-mark API.
+ */
readonly loadSince: (
conversationId: string,
sinceSeq?: number,
+ window?: { readonly beforeSeq?: number; readonly limit?: number },
) => Promise<readonly StoredChunk[]>;
readonly appendMetrics: (conversationId: string, metrics: TurnMetrics) => Promise<void>;
readonly loadMetrics: (conversationId: string) => Promise<readonly TurnMetrics[]>;
@@ -37,6 +61,16 @@ export interface ConversationStore {
export const conversationStoreHandle = defineService<ConversationStore>("conversation-store/store");
+/**
+ * Coerce a window bound to a positive integer, or `undefined` (= absent) for any
+ * non-positive / non-integer / undefined input. Keeps `loadSince` total.
+ */
+function positiveInt(value: number | undefined): number | undefined {
+ if (value === undefined) return undefined;
+ if (!Number.isInteger(value) || value <= 0) return undefined;
+ return value;
+}
+
interface PersistedChunkEntry {
readonly chunk: Chunk;
readonly role: Role;
@@ -118,23 +152,32 @@ export function createConversationStore(
return repaired;
},
- async loadSince(conversationId, sinceSeq) {
+ async loadSince(conversationId, sinceSeq, window) {
const prefix = chunkPrefix(conversationId);
const keys = await storage.keys(prefix);
const sorted = [...keys].sort();
const result: StoredChunk[] = [];
const minSeq = sinceSeq ?? 0;
+ // Forgiving: a non-positive / non-integer bound is treated as ABSENT.
+ const beforeSeq = positiveInt(window?.beforeSeq);
+ const limit = positiveInt(window?.limit);
for (const key of sorted) {
const seq = parseSeq(key.split(":").pop() ?? null);
if (seq <= minSeq) continue;
+ if (beforeSeq !== undefined && seq >= beforeSeq) continue;
const value = await storage.get(key);
if (value === null) continue;
const entry = JSON.parse(value) as PersistedChunkEntry;
result.push({ seq, role: entry.role, chunk: entry.chunk });
}
+ // Window: keep only the NEWEST `limit` chunks, still ascending by seq.
+ if (limit !== undefined && result.length > limit) {
+ return result.slice(result.length - limit);
+ }
+
return result;
},
diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json
index 2e9d586..84ceada 100644
--- a/packages/transport-contract/package.json
+++ b/packages/transport-contract/package.json
@@ -1,6 +1,6 @@
{
"name": "@dispatch/transport-contract",
- "version": "0.9.0",
+ "version": "0.10.0",
"type": "module",
"private": true,
"main": "dist/index.js",
diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts
index b000147..b0f6e20 100644
--- a/packages/transport-contract/src/index.ts
+++ b/packages/transport-contract/src/index.ts
@@ -67,25 +67,49 @@ export interface ModelsResponse {
}
/**
- * Response body for `GET /conversations/:id?sinceSeq=<n>` — the incremental
- * read-side history endpoint a long-lived client uses to (re)hydrate a
- * conversation cheaply.
+ * Response body for
+ * `GET /conversations/:id?sinceSeq=<n>&beforeSeq=<s>&limit=<k>` — the
+ * incremental read-side history endpoint a long-lived client uses to
+ * (re)hydrate a conversation cheaply. All three query params are OPTIONAL and
+ * combine as one SELECTION + one WINDOW:
+ *
+ * - **Selection** — `sinceSeq` (exclusive lower bound, `seq > n`; omitted/0 =
+ * from the start) and `beforeSeq` (exclusive upper bound, `seq < s`; omitted
+ * = to the end). Together: `n < seq < s`.
+ * - **Window** — `limit=<k>` returns only the NEWEST `k` chunks of the
+ * selection (the response stays ASCENDING by seq). A selection with ≤ `k`
+ * chunks is returned whole. `limit` omitted = the full selection — exactly
+ * the pre-windowing behavior, so existing clients are unchanged.
+ * - `limit` and `beforeSeq` must be POSITIVE integers (`sinceSeq` may be 0);
+ * malformed, zero, or negative values → HTTP 400 `{ error }`.
+ *
+ * Intended client flows: fresh load = `?sinceSeq=0&limit=<k>` (newest window);
+ * tail sync = `?sinceSeq=<cursor>` (no limit); page older history in =
+ * `?beforeSeq=<oldestLoadedSeq>&limit=<k>`.
+ *
+ * Seq numbering is **1-based and gap-free** (a CONTRACTUAL GUARANTEE — see
+ * `StoredChunk` in `@dispatch/wire`): a client can derive "older chunks exist"
+ * purely from `oldestLoaded.seq > 1`; there is deliberately no
+ * `earliestSeq`/`hasOlder` response field.
*
* `chunks` is the RAW, append-order, seq-ordered slice of the conversation log
- * with `seq > sinceSeq` (or the whole log when `sinceSeq` is omitted/0). It is
- * NOT reconciled: a dangling tool-call is returned as-is (rendered as an
- * interrupted call). Reconciliation is a turn-path concern — the server repairs
- * history only when it feeds a provider, never on this read path — which is what
- * preserves the per-chunk `seq` cursor invariant (a synthesized repair chunk
- * would have no seq).
+ * selected + windowed as above. It is NOT reconciled: a dangling tool-call is
+ * returned as-is (rendered as an interrupted call). Reconciliation is a
+ * turn-path concern — the server repairs history only when it feeds a provider,
+ * never on this read path — which is what preserves the per-chunk `seq` cursor
+ * invariant (a synthesized repair chunk would have no seq).
*
* `latestSeq` is the `seq` of the LAST chunk in this response, or — when the
* slice is empty (the client is already caught up) — the requested `sinceSeq`
* (0 for a full read of an empty conversation). So after applying the response a
* client's new cursor is always `latestSeq`, and an empty `chunks` means
- * "nothing new past your cursor". (A true server-side high-water mark
- * independent of the filter is deferred until a consumer needs it — it would
- * require widening the store contract.)
+ * "nothing new past your cursor". CAVEAT (windowed reads): `latestSeq` is a
+ * TAIL-sync cursor — on a `beforeSeq` backfill page (or any `limit`ed read that
+ * did not reach the log's true tail) it describes the returned window, NOT the
+ * conversation's high-water mark, so a client must not regress its sync cursor
+ * from a backfill response. (A true server-side high-water mark independent of
+ * the filter is deferred until a consumer needs it — it would require widening
+ * the store contract.)
*/
export interface ConversationHistoryResponse {
readonly chunks: readonly StoredChunk[];
diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts
index c26a868..88de38f 100644
--- a/packages/transport-http/src/app.test.ts
+++ b/packages/transport-http/src/app.test.ts
@@ -86,10 +86,19 @@ function createFakeConversationStore(
async load() {
return [];
},
- async loadSince(conversationId, sinceSeq) {
+ async loadSince(conversationId, sinceSeq, window) {
const chunks = store.get(conversationId) ?? [];
const minSeq = sinceSeq ?? 0;
- return chunks.filter((c) => c.seq > minSeq);
+ const beforeSeq = window?.beforeSeq;
+ const limit = window?.limit;
+ const selected = chunks.filter(
+ (c) => c.seq > minSeq && (beforeSeq === undefined || c.seq < beforeSeq),
+ );
+ // Window: keep only the NEWEST `limit`, still ascending by seq.
+ if (limit !== undefined && selected.length > limit) {
+ return selected.slice(selected.length - limit);
+ }
+ return selected;
},
async appendMetrics() {},
async loadMetrics(conversationId) {
@@ -710,6 +719,162 @@ describe("GET /conversations/:id", () => {
const res = await app.request("/conversations/conv1?sinceSeq=-1");
expect(res.status).toBe(400);
});
+
+ const sixChunks: StoredChunk[] = [
+ { seq: 1, role: "user", chunk: { type: "text", text: "one" } },
+ { seq: 2, role: "assistant", chunk: { type: "text", text: "two" } },
+ { seq: 3, role: "user", chunk: { type: "text", text: "three" } },
+ { seq: 4, role: "assistant", chunk: { type: "text", text: "four" } },
+ { seq: 5, role: "user", chunk: { type: "text", text: "five" } },
+ { seq: 6, role: "assistant", chunk: { type: "text", text: "six" } },
+ ];
+
+ function appWithChunks(chunks: StoredChunk[]) {
+ const store = new Map<string, StoredChunk[]>([["conv1", chunks]]);
+ return createApp({
+ conversationStore: createFakeConversationStore(store),
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ });
+ }
+
+ it("?limit=N returns only the newest N chunks, ascending, latestSeq = last seq", async () => {
+ const app = appWithChunks(sixChunks);
+ const res = await app.request("/conversations/conv1?limit=2");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number };
+ expect(body.chunks.map((c) => c.seq)).toEqual([5, 6]);
+ expect(body.latestSeq).toBe(6);
+ });
+
+ it("?limit=N with N >= conversation size returns the full log", async () => {
+ const app = appWithChunks(sixChunks);
+ const res = await app.request("/conversations/conv1?limit=10");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number };
+ expect(body.chunks.map((c) => c.seq)).toEqual([1, 2, 3, 4, 5, 6]);
+ expect(body.latestSeq).toBe(6);
+ });
+
+ it("?beforeSeq=S returns only chunks with seq < S", async () => {
+ const app = appWithChunks(sixChunks);
+ const res = await app.request("/conversations/conv1?beforeSeq=3");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number };
+ expect(body.chunks.map((c) => c.seq)).toEqual([1, 2]);
+ expect(body.latestSeq).toBe(2);
+ });
+
+ it("?beforeSeq=S&limit=N returns the newest N below S, ascending", async () => {
+ const app = appWithChunks(sixChunks);
+ const res = await app.request("/conversations/conv1?beforeSeq=5&limit=2");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number };
+ // selection = seq 1..4; newest 2 = [3, 4]
+ expect(body.chunks.map((c) => c.seq)).toEqual([3, 4]);
+ expect(body.latestSeq).toBe(4);
+ });
+
+ it("?sinceSeq=A&beforeSeq=B returns A < seq < B", async () => {
+ const app = appWithChunks(sixChunks);
+ const res = await app.request("/conversations/conv1?sinceSeq=2&beforeSeq=5");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number };
+ expect(body.chunks.map((c) => c.seq)).toEqual([3, 4]);
+ expect(body.latestSeq).toBe(4);
+ });
+
+ describe("window param validation → 400 and store not called with an invalid window", () => {
+ function appCapturingWindow() {
+ const calls: {
+ readonly sinceSeq: number | undefined;
+ readonly window: { readonly beforeSeq?: number; readonly limit?: number } | undefined;
+ }[] = [];
+ const store: ConversationStore = {
+ async append() {},
+ async load() {
+ return [];
+ },
+ async loadSince(_conversationId, sinceSeq, window) {
+ calls.push({ sinceSeq, window });
+ return [];
+ },
+ async appendMetrics() {},
+ async loadMetrics() {
+ return [];
+ },
+ async getCwd() {
+ return null;
+ },
+ async setCwd() {},
+ };
+ const app = createApp({
+ conversationStore: store,
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ });
+ return { app, calls };
+ }
+
+ const cases: readonly { readonly name: string; readonly query: string }[] = [
+ { name: "limit=0", query: "limit=0" },
+ { name: "limit=-1", query: "limit=-1" },
+ { name: "limit=abc", query: "limit=abc" },
+ { name: "beforeSeq=0", query: "beforeSeq=0" },
+ { name: "beforeSeq=1.5", query: "beforeSeq=1.5" },
+ ];
+
+ for (const { name, query } of cases) {
+ it(`${name} → 400 { error } and loadSince is never called`, async () => {
+ const { app, calls } = appCapturingWindow();
+ const res = await app.request(`/conversations/conv1?${query}`);
+ expect(res.status).toBe(400);
+ const body = (await res.json()) as { error: string };
+ expect(typeof body.error).toBe("string");
+ expect(body.error.length).toBeGreaterThan(0);
+ expect(calls).toHaveLength(0);
+ });
+ }
+ });
+
+ it("no params → byte-identical to the no-window read (regression guard)", async () => {
+ // Rest-param spy: record how many args the route actually passes, so we
+ // can prove the third (window) arg is OMITTED entirely — not merely
+ // forwarded as undefined — preserving the existing two-arg call shape.
+ const argCounts: number[] = [];
+ const store: ConversationStore = {
+ async append() {},
+ async load() {
+ return [];
+ },
+ loadSince(...args: Parameters<ConversationStore["loadSince"]>) {
+ argCounts.push(args.length);
+ const sinceSeq = args[1] ?? 0;
+ return Promise.resolve(sampleChunks.filter((c) => c.seq > sinceSeq));
+ },
+ async appendMetrics() {},
+ async loadMetrics() {
+ return [];
+ },
+ async getCwd() {
+ return null;
+ },
+ async setCwd() {},
+ };
+ const app = createApp({
+ conversationStore: store,
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ });
+
+ const res = await app.request("/conversations/conv1");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { chunks: readonly StoredChunk[]; latestSeq: number };
+ expect(body.chunks.map((c) => c.seq)).toEqual([1, 2, 3, 4]);
+ expect(body.latestSeq).toBe(4);
+ // Called once, with exactly two arguments (no window arg).
+ expect(argCounts).toEqual([2]);
+ });
});
describe("GET /conversations/:id/metrics", () => {
diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts
index 11d2850..ae24922 100644
--- a/packages/transport-http/src/app.ts
+++ b/packages/transport-http/src/app.ts
@@ -17,9 +17,11 @@ import {
computeExpectedCacheRate,
isParseError,
isSinceSeqError,
+ isWindowParamError,
parseChatBody,
parseSinceSeq,
parseWarmBody,
+ parseWindowParam,
serializeEventLine,
} from "./logic.js";
import {
@@ -147,8 +149,43 @@ export function createApp(opts: CreateServerOptions): Hono {
return c.json({ error: sinceSeqResult.error }, 400);
}
+ // `limit` / `beforeSeq` are optional positive-integer history-window
+ // params. The store is deliberately forgiving (a 0/negative bound is
+ // treated as ABSENT), so we MUST reject malformed values here and never
+ // forward an invalid window.
+ const beforeSeqResult = parseWindowParam(c.req.query("beforeSeq"), "beforeSeq");
+ if (isWindowParamError(beforeSeqResult)) {
+ log.warn("conversations: invalid beforeSeq", {
+ conversationId,
+ error: beforeSeqResult.error,
+ });
+ return c.json({ error: beforeSeqResult.error }, 400);
+ }
+ const limitResult = parseWindowParam(c.req.query("limit"), "limit");
+ if (isWindowParamError(limitResult)) {
+ log.warn("conversations: invalid limit", {
+ conversationId,
+ error: limitResult.error,
+ });
+ return c.json({ error: limitResult.error }, 400);
+ }
+
+ // Include only the fields actually provided (exactOptionalPropertyTypes),
+ // and omit the window argument entirely when neither was given — keeping
+ // the pre-windowing call shape byte-identical for existing callers.
+ const window: { readonly beforeSeq?: number; readonly limit?: number } | undefined =
+ beforeSeqResult !== undefined || limitResult !== undefined
+ ? {
+ ...(beforeSeqResult !== undefined ? { beforeSeq: beforeSeqResult } : {}),
+ ...(limitResult !== undefined ? { limit: limitResult } : {}),
+ }
+ : undefined;
+
try {
- const chunks = await opts.conversationStore.loadSince(conversationId, sinceSeqResult);
+ const chunks =
+ window !== undefined
+ ? await opts.conversationStore.loadSince(conversationId, sinceSeqResult, window)
+ : await opts.conversationStore.loadSince(conversationId, sinceSeqResult);
const latestSeq =
chunks.length > 0 ? (chunks[chunks.length - 1]?.seq ?? sinceSeqResult) : sinceSeqResult;
log.info("conversations: read", {
diff --git a/packages/transport-http/src/index.ts b/packages/transport-http/src/index.ts
index 718aaef..1a42259 100644
--- a/packages/transport-http/src/index.ts
+++ b/packages/transport-http/src/index.ts
@@ -7,13 +7,16 @@ export type {
ParseResult,
SinceSeqResult,
WarmBodyParsed,
+ WindowParamResult,
} from "./logic.js";
export {
computeCachePct,
isParseError,
isSinceSeqError,
+ isWindowParamError,
parseChatBody,
parseSinceSeq,
+ parseWindowParam,
serializeEventLine,
} from "./logic.js";
export type {
diff --git a/packages/transport-http/src/logic.test.ts b/packages/transport-http/src/logic.test.ts
index 19b47ef..9a7bda2 100644
--- a/packages/transport-http/src/logic.test.ts
+++ b/packages/transport-http/src/logic.test.ts
@@ -4,8 +4,10 @@ import {
computeExpectedCacheRate,
isParseError,
isSinceSeqError,
+ isWindowParamError,
parseChatBody,
parseSinceSeq,
+ parseWindowParam,
serializeEventLine,
} from "./logic.js";
@@ -173,6 +175,58 @@ describe("parseSinceSeq", () => {
});
});
+describe("parseWindowParam", () => {
+ it("returns undefined (absent) when undefined", () => {
+ expect(parseWindowParam(undefined, "limit")).toBeUndefined();
+ });
+
+ it("returns undefined (absent) when empty string", () => {
+ expect(parseWindowParam("", "limit")).toBeUndefined();
+ });
+
+ it("parses a valid positive integer", () => {
+ expect(parseWindowParam("1", "limit")).toBe(1);
+ expect(parseWindowParam("42", "beforeSeq")).toBe(42);
+ });
+
+ it("returns ParseError for zero (store would treat it as absent)", () => {
+ const result = parseWindowParam("0", "limit");
+ expect(isWindowParamError(result)).toBe(true);
+ if (isWindowParamError(result)) {
+ expect(result.error).toContain("limit");
+ expect(result.error).toContain("positive integer");
+ }
+ });
+
+ it("returns ParseError for a negative integer", () => {
+ expect(isWindowParamError(parseWindowParam("-1", "limit"))).toBe(true);
+ });
+
+ it("returns ParseError for a non-integer", () => {
+ expect(isWindowParamError(parseWindowParam("1.5", "beforeSeq"))).toBe(true);
+ });
+
+ it("returns ParseError for a non-numeric string", () => {
+ const result = parseWindowParam("abc", "beforeSeq");
+ expect(isWindowParamError(result)).toBe(true);
+ if (isWindowParamError(result)) {
+ expect(result.error).toContain("beforeSeq");
+ }
+ });
+
+ it("names the param in the error message", () => {
+ const limit = parseWindowParam("0", "limit");
+ const before = parseWindowParam("0", "beforeSeq");
+ if (isWindowParamError(limit)) expect(limit.error).toContain("limit");
+ if (isWindowParamError(before)) expect(before.error).toContain("beforeSeq");
+ });
+
+ it("isWindowParamError is false for absent and for a valid number", () => {
+ expect(isWindowParamError(undefined)).toBe(false);
+ expect(isWindowParamError(5)).toBe(false);
+ });
+});
+
describe("serializeEventLine", () => {
it("serializes an event as JSON followed by newline", () => {
const event: AgentEvent = {
diff --git a/packages/transport-http/src/logic.ts b/packages/transport-http/src/logic.ts
index bddedf0..0008110 100644
--- a/packages/transport-http/src/logic.ts
+++ b/packages/transport-http/src/logic.ts
@@ -72,6 +72,33 @@ export function isSinceSeqError(result: SinceSeqResult): result is ParseError {
return typeof result === "object";
}
+/**
+ * Result of parsing an OPTIONAL positive-integer history-window query param
+ * (`limit` / `beforeSeq`): `undefined` = the param was absent (omit it from the
+ * store window), a `number` = a valid positive integer, or a {@link ParseError}
+ * for a malformed / zero / negative value (the route 400s on it).
+ */
+export type WindowParamResult = number | undefined | ParseError;
+
+/**
+ * Parse an optional `limit` / `beforeSeq` query param. Unlike `sinceSeq` these
+ * must be STRICTLY POSITIVE integers when present (zero is rejected, since the
+ * store treats a zero bound as absent and would silently return the full log).
+ * Absent (`undefined` / empty) is the valid "no window" case → `undefined`.
+ */
+export function parseWindowParam(raw: string | undefined, name: string): WindowParamResult {
+ if (raw === undefined || raw === "") return undefined;
+ const n = Number(raw);
+ if (!Number.isInteger(n) || n <= 0) {
+ return { error: `${name} must be a positive integer` };
+ }
+ return n;
+}
+
+export function isWindowParamError(result: WindowParamResult): result is ParseError {
+ return typeof result === "object" && result !== null;
+}
+
export interface WarmBodyParsed {
readonly conversationId: string;
readonly model?: string;
diff --git a/packages/wire/package.json b/packages/wire/package.json
index f4ede19..d00772d 100644
--- a/packages/wire/package.json
+++ b/packages/wire/package.json
@@ -1,6 +1,6 @@
{
"name": "@dispatch/wire",
- "version": "0.6.0",
+ "version": "0.6.1",
"type": "module",
"private": true,
"main": "dist/index.js",
diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts
index 28aab87..4fdf389 100644
--- a/packages/wire/src/index.ts
+++ b/packages/wire/src/index.ts
@@ -124,11 +124,17 @@ export interface ChatMessage {
/**
* A persisted chunk plus its sync metadata. The append-only conversation log
- * stamps every chunk with a monotonic, gap-free, per-conversation `seq` (the
- * sync cursor, assigned in append order) and records the `role` of the message
- * it belongs to. This makes a flat seq-ordered stream both incrementally
- * syncable ("give me chunks after seq N") and regroupable into messages by the
- * client. `chunk` is the content unit — `Chunk` carries no storage/sync cursor
+ * stamps every chunk with a **1-based**, monotonic, gap-free, per-conversation
+ * `seq` (the sync cursor, assigned in append order) and records the `role` of
+ * the message it belongs to. This makes a flat seq-ordered stream both
+ * incrementally syncable ("give me chunks after seq N") and regroupable into
+ * messages by the client.
+ *
+ * The 1-based start is a CONTRACTUAL GUARANTEE (not an implementation detail):
+ * a conversation's first chunk is always `seq === 1` and numbering never skips,
+ * so a client holding only a windowed suffix of the log can derive "older
+ * chunks exist server-side" purely from `oldestLoaded.seq > 1` — no separate
+ * has-older flag is needed (or provided). `chunk` is the content unit — `Chunk` carries no storage/sync cursor
* (`seq` lives here on the envelope, not on the chunk, since it is assigned by
* the store and the provider has no use for it). A chunk MAY still carry
* generation provenance assigned at production time (e.g. a tool chunk's