summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/adapters/ws/index.test.ts112
-rw-r--r--src/adapters/ws/index.ts18
-rw-r--r--src/adapters/ws/logic.test.ts59
-rw-r--r--src/adapters/ws/logic.ts39
-rw-r--r--src/core/chunks/index.ts8
-rw-r--r--src/core/chunks/reducer.test.ts406
-rw-r--r--src/core/chunks/reducer.ts168
-rw-r--r--src/core/chunks/selectors.ts51
-rw-r--r--src/core/chunks/types.ts31
-rw-r--r--src/core/wire/conformance.test.ts181
-rw-r--r--src/core/wire/conformance.ts100
-rw-r--r--src/core/wire/index.ts6
-rw-r--r--src/features/conversation-cache/cache.test.ts173
-rw-r--r--src/features/conversation-cache/cache.ts71
-rw-r--r--src/features/conversation-cache/index.ts8
-rw-r--r--src/features/conversation-cache/logic.test.ts140
-rw-r--r--src/features/conversation-cache/logic.ts77
-rw-r--r--src/features/conversation-cache/types.ts42
18 files changed, 1680 insertions, 10 deletions
diff --git a/src/adapters/ws/index.test.ts b/src/adapters/ws/index.test.ts
index 92b8753..961f919 100644
--- a/src/adapters/ws/index.test.ts
+++ b/src/adapters/ws/index.test.ts
@@ -231,4 +231,116 @@ describe("createSurfaceSocket", () => {
payload: 1,
});
});
+
+ it("routes chat.delta to onChat", () => {
+ const ws = fakeSocket();
+ const onMessage = vi.fn();
+ const onChat = vi.fn();
+ createSurfaceSocket({
+ url: "ws://test",
+ onMessage,
+ onChat,
+ socketFactory: () => ws,
+ });
+
+ ws.resolveOpen();
+ const event = { type: "text-delta", conversationId: "c1", turnId: "t1", delta: "hi" };
+ ws.invokeMessage(JSON.stringify({ type: "chat.delta", event }));
+ expect(onChat).toHaveBeenCalledOnce();
+ expect(onChat).toHaveBeenCalledWith({ type: "chat.delta", event });
+ expect(onMessage).not.toHaveBeenCalled();
+ });
+
+ it("routes chat.error to onChat", () => {
+ const ws = fakeSocket();
+ const onMessage = vi.fn();
+ const onChat = vi.fn();
+ createSurfaceSocket({
+ url: "ws://test",
+ onMessage,
+ onChat,
+ socketFactory: () => ws,
+ });
+
+ ws.resolveOpen();
+ ws.invokeMessage(JSON.stringify({ type: "chat.error", message: "bad request" }));
+ expect(onChat).toHaveBeenCalledOnce();
+ expect(onChat).toHaveBeenCalledWith({ type: "chat.error", message: "bad request" });
+ expect(onMessage).not.toHaveBeenCalled();
+ });
+
+ it("still routes surface catalog/surface to onMessage", () => {
+ const ws = fakeSocket();
+ const onMessage = vi.fn();
+ const onChat = vi.fn();
+ createSurfaceSocket({
+ url: "ws://test",
+ onMessage,
+ onChat,
+ socketFactory: () => ws,
+ });
+
+ ws.resolveOpen();
+ ws.invokeMessage(JSON.stringify({ type: "catalog", catalog: [] }));
+ expect(onMessage).toHaveBeenCalledOnce();
+ expect(onMessage).toHaveBeenCalledWith({ type: "catalog", catalog: [] });
+ expect(onChat).not.toHaveBeenCalled();
+
+ ws.invokeMessage(
+ JSON.stringify({ type: "surface", spec: { id: "s1", region: "r", title: "S", fields: [] } }),
+ );
+ expect(onMessage).toHaveBeenCalledTimes(2);
+ });
+
+ it("send accepts and serializes a chat.send message", () => {
+ const ws = fakeSocket();
+ const handle = createSurfaceSocket({
+ url: "ws://test",
+ onMessage: vi.fn(),
+ socketFactory: () => ws,
+ });
+
+ ws.resolveOpen();
+ handle.send({ type: "chat.send", message: "hello" });
+ expect(ws.sent).toHaveLength(1);
+ expect(JSON.parse(ws.sent[0] ?? "")).toEqual({ type: "chat.send", message: "hello" });
+ });
+
+ it("onChat absent is safe (surface-only usage does not throw)", () => {
+ const ws = fakeSocket();
+ const onMessage = vi.fn();
+ createSurfaceSocket({
+ url: "ws://test",
+ onMessage,
+ socketFactory: () => ws,
+ });
+
+ ws.resolveOpen();
+ expect(() => {
+ ws.invokeMessage(
+ JSON.stringify({
+ type: "chat.delta",
+ event: { type: "text-delta", conversationId: "c1", turnId: "t1", delta: "x" },
+ }),
+ );
+ ws.invokeMessage(JSON.stringify({ type: "chat.error", message: "boom" }));
+ }).not.toThrow();
+ expect(onMessage).not.toHaveBeenCalled();
+ });
+
+ it("chat send is queued until open then flushed", () => {
+ const ws = fakeSocket();
+ const handle = createSurfaceSocket({
+ url: "ws://test",
+ onMessage: vi.fn(),
+ socketFactory: () => ws,
+ });
+
+ handle.send({ type: "chat.send", message: "queued" });
+ expect(ws.sent).toHaveLength(0);
+
+ ws.resolveOpen();
+ expect(ws.sent).toHaveLength(1);
+ expect(JSON.parse(ws.sent[0] ?? "")).toEqual({ type: "chat.send", message: "queued" });
+ });
});
diff --git a/src/adapters/ws/index.ts b/src/adapters/ws/index.ts
index 40eda2b..54a501c 100644
--- a/src/adapters/ws/index.ts
+++ b/src/adapters/ws/index.ts
@@ -1,4 +1,9 @@
-import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract";
+import type {
+ ChatDeltaMessage,
+ ChatErrorMessage,
+ WsClientMessage,
+} from "@dispatch/transport-contract";
+import type { SurfaceServerMessage } from "@dispatch/ui-contract";
import { nextBackoffMs, parseServerMessage, serialize } from "./logic";
export interface WebSocketLike {
@@ -12,12 +17,13 @@ export interface WebSocketLike {
export interface SurfaceSocketOptions {
url: string;
onMessage: (msg: SurfaceServerMessage) => void;
+ onChat?: (msg: ChatDeltaMessage | ChatErrorMessage) => void;
onReopen?: () => void;
socketFactory?: (url: string) => WebSocketLike;
}
export interface SurfaceSocketHandle {
- send(msg: SurfaceClientMessage): void;
+ send(msg: WsClientMessage): void;
close(): void;
}
@@ -52,7 +58,11 @@ export function createSurfaceSocket(opts: SurfaceSocketOptions): SurfaceSocketHa
if (disposed) return;
const msg = parseServerMessage(ev.data);
if (msg !== null) {
- opts.onMessage(msg);
+ if (msg.type === "chat.delta" || msg.type === "chat.error") {
+ opts.onChat?.(msg as ChatDeltaMessage | ChatErrorMessage);
+ } else {
+ opts.onMessage(msg as SurfaceServerMessage);
+ }
}
};
@@ -76,7 +86,7 @@ export function createSurfaceSocket(opts: SurfaceSocketOptions): SurfaceSocketHa
connect(false);
return {
- send(msg: SurfaceClientMessage): void {
+ send(msg: WsClientMessage): void {
if (disposed) return;
const raw = serialize(msg);
if (isOpen) {
diff --git a/src/adapters/ws/logic.test.ts b/src/adapters/ws/logic.test.ts
index 62ae6a0..546afe1 100644
--- a/src/adapters/ws/logic.test.ts
+++ b/src/adapters/ws/logic.test.ts
@@ -21,6 +21,22 @@ describe("serialize", () => {
const msg = { type: "invoke" as const, surfaceId: "s1", actionId: "click" };
expect(JSON.parse(serialize(msg))).toEqual(msg);
});
+
+ it("serializes a chat.send message", () => {
+ const msg = { type: "chat.send" as const, message: "hello" };
+ expect(JSON.parse(serialize(msg))).toEqual(msg);
+ });
+
+ it("serializes a chat.send message with all fields", () => {
+ const msg = {
+ type: "chat.send" as const,
+ conversationId: "c1",
+ message: "hello",
+ model: "openai/gpt-4",
+ cwd: "/tmp",
+ };
+ expect(JSON.parse(serialize(msg))).toEqual(msg);
+ });
});
describe("parseServerMessage", () => {
@@ -135,6 +151,49 @@ describe("parseServerMessage", () => {
parseServerMessage(JSON.stringify({ type: "error", surfaceId: 42, message: "boom" })),
).toBeNull();
});
+
+ it("parses a chat.delta message", () => {
+ const event = { type: "text-delta", conversationId: "c1", turnId: "t1", delta: "hello" };
+ const data = JSON.stringify({ type: "chat.delta", event });
+ const result = parseServerMessage(data);
+ expect(result).toEqual({ type: "chat.delta", event });
+ });
+
+ it("parses a chat.error message with conversationId", () => {
+ const data = JSON.stringify({
+ type: "chat.error",
+ conversationId: "c1",
+ message: "bad request",
+ });
+ const result = parseServerMessage(data);
+ expect(result).toEqual({ type: "chat.error", conversationId: "c1", message: "bad request" });
+ });
+
+ it("parses a chat.error message without conversationId", () => {
+ const data = JSON.stringify({ type: "chat.error", message: "no conversation" });
+ const result = parseServerMessage(data);
+ expect(result).toEqual({ type: "chat.error", message: "no conversation" });
+ });
+
+ it("returns null for chat.delta with non-object event", () => {
+ expect(parseServerMessage(JSON.stringify({ type: "chat.delta", event: "nope" }))).toBeNull();
+ });
+
+ it("returns null for chat.delta with missing event.type", () => {
+ expect(parseServerMessage(JSON.stringify({ type: "chat.delta", event: {} }))).toBeNull();
+ });
+
+ it("returns null for chat.error with non-string message", () => {
+ expect(parseServerMessage(JSON.stringify({ type: "chat.error", message: 42 }))).toBeNull();
+ });
+
+ it("returns null for chat.error with invalid conversationId type", () => {
+ expect(
+ parseServerMessage(
+ JSON.stringify({ type: "chat.error", conversationId: 42, message: "boom" }),
+ ),
+ ).toBeNull();
+ });
});
describe("round-trip: parseServerMessage(serialize(...))", () => {
diff --git a/src/adapters/ws/logic.ts b/src/adapters/ws/logic.ts
index 83a5802..6592f1b 100644
--- a/src/adapters/ws/logic.ts
+++ b/src/adapters/ws/logic.ts
@@ -1,16 +1,27 @@
import type {
+ ChatDeltaMessage,
+ ChatErrorMessage,
+ WsClientMessage,
+ WsServerMessage,
+} from "@dispatch/transport-contract";
+import type {
CatalogMessage,
- SurfaceClientMessage,
SurfaceErrorMessage,
SurfaceMessage,
- SurfaceServerMessage,
SurfaceUpdateMessage,
} from "@dispatch/ui-contract";
-const VALID_SERVER_TYPES = new Set(["catalog", "surface", "update", "error"]);
+const VALID_SERVER_TYPES = new Set([
+ "catalog",
+ "surface",
+ "update",
+ "error",
+ "chat.delta",
+ "chat.error",
+]);
/** Serialize a client message to a JSON string for the wire. */
-export function serialize(msg: SurfaceClientMessage): string {
+export function serialize(msg: WsClientMessage): string {
return JSON.stringify(msg);
}
@@ -19,10 +30,10 @@ function isRecord(v: unknown): v is Record<string, unknown> {
}
/**
- * Parse a raw server message string into a typed SurfaceServerMessage.
+ * Parse a raw server message string into a typed WsServerMessage.
* Returns null for malformed JSON or shapes that don't match the protocol.
*/
-export function parseServerMessage(data: string): SurfaceServerMessage | null {
+export function parseServerMessage(data: string): WsServerMessage | null {
let parsed: unknown;
try {
parsed = JSON.parse(data);
@@ -72,6 +83,22 @@ export function parseServerMessage(data: string): SurfaceServerMessage | null {
: { type: "error", message: parsed.message };
return msg;
}
+ case "chat.delta": {
+ const event = parsed.event;
+ if (!isRecord(event)) return null;
+ if (typeof event.type !== "string") return null;
+ return { type: "chat.delta", event: event as unknown as ChatDeltaMessage["event"] };
+ }
+ case "chat.error": {
+ if (typeof parsed.message !== "string") return null;
+ const conversationId = parsed.conversationId;
+ if (conversationId !== undefined && typeof conversationId !== "string") return null;
+ const msg: ChatErrorMessage =
+ conversationId !== undefined
+ ? { type: "chat.error", conversationId, message: parsed.message }
+ : { type: "chat.error", message: parsed.message };
+ return msg;
+ }
default:
return null;
}
diff --git a/src/core/chunks/index.ts b/src/core/chunks/index.ts
new file mode 100644
index 0000000..36ba7f4
--- /dev/null
+++ b/src/core/chunks/index.ts
@@ -0,0 +1,8 @@
+export { applyHistory, foldEvent, initialState } from "./reducer";
+export { selectChunks, selectMessages } from "./selectors";
+export type {
+ AccumulatingChunk,
+ ProvisionalChunk,
+ RenderedChunk,
+ TranscriptState,
+} from "./types";
diff --git a/src/core/chunks/reducer.test.ts b/src/core/chunks/reducer.test.ts
new file mode 100644
index 0000000..f83edb4
--- /dev/null
+++ b/src/core/chunks/reducer.test.ts
@@ -0,0 +1,406 @@
+import type {
+ StoredChunk,
+ TurnDoneEvent,
+ TurnErrorEvent,
+ TurnReasoningDeltaEvent,
+ TurnSealedEvent,
+ TurnStartEvent,
+ TurnTextDeltaEvent,
+ TurnToolCallEvent,
+ TurnToolResultEvent,
+ TurnUsageEvent,
+} from "@dispatch/wire";
+import { describe, expect, it } from "vitest";
+import { applyHistory, foldEvent, initialState } from "./reducer";
+import { selectChunks, selectMessages } from "./selectors";
+
+const turnStart = (turnId: string): TurnStartEvent => ({
+ type: "turn-start",
+ conversationId: "c1",
+ turnId,
+});
+
+const textDelta = (turnId: string, delta: string): TurnTextDeltaEvent => ({
+ type: "text-delta",
+ conversationId: "c1",
+ turnId,
+ delta,
+});
+
+const reasoningDelta = (turnId: string, delta: string): TurnReasoningDeltaEvent => ({
+ type: "reasoning-delta",
+ conversationId: "c1",
+ turnId,
+ delta,
+});
+
+const toolCall = (
+ turnId: string,
+ toolCallId: string,
+ toolName: string,
+ input: unknown,
+): TurnToolCallEvent => ({
+ type: "tool-call",
+ conversationId: "c1",
+ turnId,
+ toolCallId,
+ toolName,
+ input,
+});
+
+const toolResult = (
+ turnId: string,
+ toolCallId: string,
+ toolName: string,
+ content: string,
+): TurnToolResultEvent => ({
+ type: "tool-result",
+ conversationId: "c1",
+ turnId,
+ toolCallId,
+ toolName,
+ content,
+ isError: false,
+});
+
+const usageEvent = (turnId: string, inputTokens: number, outputTokens: number): TurnUsageEvent => ({
+ type: "usage",
+ conversationId: "c1",
+ turnId,
+ usage: { inputTokens, outputTokens },
+});
+
+const errorEvent = (turnId: string, message: string, code?: string): TurnErrorEvent =>
+ code !== undefined
+ ? { type: "error", conversationId: "c1", turnId, message, code }
+ : { type: "error", conversationId: "c1", turnId, message };
+
+const doneEvent = (turnId: string): TurnDoneEvent => ({
+ type: "done",
+ conversationId: "c1",
+ turnId,
+ reason: "stop",
+});
+
+const turnSealed = (turnId: string): TurnSealedEvent => ({
+ type: "turn-sealed",
+ conversationId: "c1",
+ turnId,
+});
+
+const storedChunk = (
+ seq: number,
+ role: "user" | "assistant" | "tool" | "system",
+ chunk: StoredChunk["chunk"],
+): StoredChunk => ({
+ seq,
+ role,
+ chunk,
+});
+
+describe("initialState", () => {
+ it("initial state is empty", () => {
+ const s = initialState();
+ expect(s.committed).toEqual([]);
+ expect(s.provisional).toEqual([]);
+ expect(s.accumulating).toBeNull();
+ expect(s.currentTurnId).toBeNull();
+ expect(s.latestUsage).toBeNull();
+ expect(s.sealedTurnId).toBeNull();
+ });
+});
+
+describe("foldEvent — text-delta", () => {
+ it("text-delta accumulates into one TextChunk", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "hello"));
+ expect(s.accumulating).toEqual({ kind: "text", text: "hello" });
+ expect(s.provisional).toEqual([]);
+ });
+
+ it("successive text-deltas extend the same provisional chunk", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "hello "));
+ s = foldEvent(s, textDelta("t1", "world"));
+ expect(s.accumulating).toEqual({ kind: "text", text: "hello world" });
+ expect(s.provisional).toEqual([]);
+ });
+
+ it("text-delta after reasoning-delta flushes thinking and starts text", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, reasoningDelta("t1", "thinking..."));
+ s = foldEvent(s, textDelta("t1", "answer"));
+ expect(s.accumulating).toEqual({ kind: "text", text: "answer" });
+ expect(s.provisional).toHaveLength(1);
+ expect(s.provisional[0]?.chunk).toEqual({ type: "thinking", text: "thinking..." });
+ expect(s.provisional[0]?.role).toBe("assistant");
+ });
+});
+
+describe("foldEvent — reasoning-delta", () => {
+ it("reasoning-delta yields a thinking chunk", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, reasoningDelta("t1", "hmm"));
+ expect(s.accumulating).toEqual({ kind: "thinking", text: "hmm" });
+ });
+
+ it("successive reasoning-deltas extend the same chunk", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, reasoningDelta("t1", "hmm "));
+ s = foldEvent(s, reasoningDelta("t1", "ok"));
+ expect(s.accumulating).toEqual({ kind: "thinking", text: "hmm ok" });
+ });
+});
+
+describe("foldEvent — tool-call then tool-result", () => {
+ it("tool-call then tool-result render in order", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, toolCall("t1", "tc1", "bash", { cmd: "ls" }));
+ s = foldEvent(s, toolResult("t1", "tc1", "bash", "file.txt"));
+ expect(s.provisional).toHaveLength(2);
+ expect(s.provisional[0]?.role).toBe("assistant");
+ expect(s.provisional[0]?.chunk).toEqual({
+ type: "tool-call",
+ toolCallId: "tc1",
+ toolName: "bash",
+ input: { cmd: "ls" },
+ });
+ expect(s.provisional[1]?.role).toBe("tool");
+ expect(s.provisional[1]?.chunk).toEqual({
+ type: "tool-result",
+ toolCallId: "tc1",
+ toolName: "bash",
+ content: "file.txt",
+ isError: false,
+ });
+ });
+
+ it("tool-call flushes accumulating text", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "let me check"));
+ s = foldEvent(s, toolCall("t1", "tc1", "bash", {}));
+ expect(s.provisional).toHaveLength(2);
+ expect(s.provisional[0]?.chunk).toEqual({ type: "text", text: "let me check" });
+ expect(s.provisional[1]?.chunk).toMatchObject({ type: "tool-call", toolCallId: "tc1" });
+ expect(s.accumulating).toBeNull();
+ });
+});
+
+describe("foldEvent — turn-sealed", () => {
+ it("turn-sealed sets sealedTurnId", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "hi"));
+ s = foldEvent(s, turnSealed("t1"));
+ expect(s.sealedTurnId).toBe("t1");
+ expect(s.accumulating).toBeNull();
+ expect(s.provisional).toHaveLength(1);
+ expect(s.provisional[0]?.chunk).toEqual({ type: "text", text: "hi" });
+ });
+});
+
+describe("foldEvent — usage", () => {
+ it("stores latest usage", () => {
+ let s = initialState();
+ s = foldEvent(s, usageEvent("t1", 100, 50));
+ expect(s.latestUsage).toEqual({ inputTokens: 100, outputTokens: 50 });
+ });
+
+ it("overwrites previous usage", () => {
+ let s = initialState();
+ s = foldEvent(s, usageEvent("t1", 100, 50));
+ s = foldEvent(s, usageEvent("t1", 200, 80));
+ expect(s.latestUsage).toEqual({ inputTokens: 200, outputTokens: 80 });
+ });
+});
+
+describe("foldEvent — error", () => {
+ it("creates error chunk with code", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, errorEvent("t1", "bad", "E001"));
+ expect(s.provisional).toHaveLength(1);
+ expect(s.provisional[0]?.chunk).toEqual({ type: "error", message: "bad", code: "E001" });
+ expect(s.provisional[0]?.role).toBe("assistant");
+ });
+
+ it("creates error chunk without code", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, errorEvent("t1", "bad"));
+ expect(s.provisional).toHaveLength(1);
+ expect(s.provisional[0]?.chunk).toEqual({ type: "error", message: "bad" });
+ });
+});
+
+describe("foldEvent — done", () => {
+ it("flushes accumulating chunk on done", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "hello"));
+ s = foldEvent(s, doneEvent("t1"));
+ expect(s.accumulating).toBeNull();
+ expect(s.provisional).toHaveLength(1);
+ expect(s.provisional[0]?.chunk).toEqual({ type: "text", text: "hello" });
+ });
+});
+
+describe("foldEvent — status and tool-output", () => {
+ it("status is a no-op", () => {
+ const s = initialState();
+ const next = foldEvent(s, { type: "status", conversationId: "c1", status: "running" });
+ expect(next).toBe(s);
+ });
+
+ it("tool-output is a no-op", () => {
+ const s = initialState();
+ const next = foldEvent(s, {
+ type: "tool-output",
+ conversationId: "c1",
+ turnId: "t1",
+ toolCallId: "tc1",
+ data: "output",
+ stream: "stdout",
+ });
+ expect(next).toBe(s);
+ });
+});
+
+describe("applyHistory", () => {
+ it("orders committed chunks by seq", () => {
+ const s = initialState();
+ const chunks = [
+ storedChunk(3, "assistant", { type: "text", text: "c" }),
+ storedChunk(1, "user", { type: "text", text: "a" }),
+ storedChunk(2, "assistant", { type: "text", text: "b" }),
+ ];
+ const next = applyHistory(s, chunks);
+ expect(next.committed.map((c) => c.seq)).toEqual([1, 2, 3]);
+ });
+
+ it("is idempotent on duplicate seqs", () => {
+ let s = initialState();
+ const batch1 = [
+ storedChunk(1, "user", { type: "text", text: "a" }),
+ storedChunk(2, "assistant", { type: "text", text: "b" }),
+ ];
+ s = applyHistory(s, batch1);
+ const batch2 = [
+ storedChunk(2, "assistant", { type: "text", text: "b" }),
+ storedChunk(3, "assistant", { type: "text", text: "c" }),
+ ];
+ s = applyHistory(s, batch2);
+ expect(s.committed.map((c) => c.seq)).toEqual([1, 2, 3]);
+ expect(s.committed).toHaveLength(3);
+ });
+
+ it("supersedes & clears provisional once committed", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "hello"));
+ s = foldEvent(s, turnSealed("t1"));
+ expect(s.provisional).toHaveLength(1);
+ expect(s.sealedTurnId).toBe("t1");
+
+ s = applyHistory(s, [storedChunk(1, "assistant", { type: "text", text: "hello" })]);
+ expect(s.provisional).toEqual([]);
+ expect(s.accumulating).toBeNull();
+ expect(s.sealedTurnId).toBeNull();
+ expect(s.committed).toHaveLength(1);
+ });
+
+ it("keeps provisional and accumulating when sealedTurnId is null", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "wip"));
+ s = foldEvent(s, doneEvent("t1"));
+ s = applyHistory(s, [storedChunk(1, "user", { type: "text", text: "q" })]);
+ expect(s.provisional).toHaveLength(1);
+ expect(s.committed).toHaveLength(1);
+ });
+
+ it("merges new history into existing committed", () => {
+ let s = initialState();
+ s = applyHistory(s, [storedChunk(1, "user", { type: "text", text: "a" })]);
+ s = applyHistory(s, [storedChunk(2, "assistant", { type: "text", text: "b" })]);
+ expect(s.committed).toHaveLength(2);
+ expect(s.committed.map((c) => c.seq)).toEqual([1, 2]);
+ });
+});
+
+describe("selectChunks", () => {
+ it("selectChunks marks provisional with seq null", () => {
+ let s = initialState();
+ s = applyHistory(s, [storedChunk(1, "user", { type: "text", text: "q" })]);
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "wip"));
+ const chunks = selectChunks(s);
+ expect(chunks).toHaveLength(2);
+ expect(chunks[0]?.seq).toBe(1);
+ expect(chunks[0]?.provisional).toBe(false);
+ expect(chunks[1]?.seq).toBeNull();
+ expect(chunks[1]?.provisional).toBe(true);
+ });
+
+ it("returns empty for empty state", () => {
+ expect(selectChunks(initialState())).toEqual([]);
+ });
+
+ it("includes accumulating chunk as provisional", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "building..."));
+ const chunks = selectChunks(s);
+ expect(chunks).toHaveLength(1);
+ expect(chunks[0]?.seq).toBeNull();
+ expect(chunks[0]?.provisional).toBe(true);
+ expect(chunks[0]?.chunk).toEqual({ type: "text", text: "building..." });
+ });
+});
+
+describe("selectMessages", () => {
+ it("selectMessages groups consecutive same-role chunks", () => {
+ let s = initialState();
+ s = applyHistory(s, [
+ storedChunk(1, "user", { type: "text", text: "q1" }),
+ storedChunk(2, "user", { type: "text", text: "q2" }),
+ storedChunk(3, "assistant", { type: "text", text: "a1" }),
+ storedChunk(4, "assistant", { type: "text", text: "a2" }),
+ storedChunk(5, "user", { type: "text", text: "q3" }),
+ ]);
+ const msgs = selectMessages(s);
+ expect(msgs).toHaveLength(3);
+ expect(msgs[0]?.role).toBe("user");
+ expect(msgs[0]?.chunks).toHaveLength(2);
+ expect(msgs[1]?.role).toBe("assistant");
+ expect(msgs[1]?.chunks).toHaveLength(2);
+ expect(msgs[2]?.role).toBe("user");
+ expect(msgs[2]?.chunks).toHaveLength(1);
+ });
+
+ it("returns empty for empty state", () => {
+ expect(selectMessages(initialState())).toEqual([]);
+ });
+
+ it("mixes committed and provisional in messages", () => {
+ let s = initialState();
+ s = applyHistory(s, [storedChunk(1, "user", { type: "text", text: "q" })]);
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "a1"));
+ s = foldEvent(s, textDelta("t1", "a2"));
+ const msgs = selectMessages(s);
+ expect(msgs).toHaveLength(2);
+ expect(msgs[0]?.role).toBe("user");
+ expect(msgs[0]?.chunks).toHaveLength(1);
+ expect(msgs[1]?.role).toBe("assistant");
+ expect(msgs[1]?.chunks).toHaveLength(1);
+ expect(msgs[1]?.chunks[0]).toEqual({ type: "text", text: "a1a2" });
+ });
+});
diff --git a/src/core/chunks/reducer.ts b/src/core/chunks/reducer.ts
new file mode 100644
index 0000000..0a8ea54
--- /dev/null
+++ b/src/core/chunks/reducer.ts
@@ -0,0 +1,168 @@
+import type { AgentEvent, Chunk, StoredChunk } from "@dispatch/wire";
+import type { AccumulatingChunk, ProvisionalChunk, TranscriptState } from "./types";
+
+/** The initial empty transcript state. */
+export function initialState(): TranscriptState {
+ return {
+ committed: [],
+ provisional: [],
+ accumulating: null,
+ currentTurnId: null,
+ latestUsage: null,
+ sealedTurnId: null,
+ };
+}
+
+function flushAccumulating(
+ provisional: readonly ProvisionalChunk[],
+ acc: AccumulatingChunk | null,
+): readonly ProvisionalChunk[] {
+ if (acc === null) return provisional;
+ const chunk: Chunk =
+ acc.kind === "text" ? { type: "text", text: acc.text } : { type: "thinking", text: acc.text };
+ return [...provisional, { role: "assistant", chunk }];
+}
+
+/**
+ * Merge authoritative seq-keyed chunks into the committed history.
+ * Dedupes by seq (new wins), keeps seq-monotonic order, idempotent.
+ * When sealedTurnId is set, drops all provisional chunks (now superseded)
+ * and clears sealedTurnId.
+ */
+export function applyHistory(
+ state: TranscriptState,
+ chunks: readonly StoredChunk[],
+): TranscriptState {
+ const seqMap = new Map<number, StoredChunk>();
+ for (const c of state.committed) seqMap.set(c.seq, c);
+ for (const c of chunks) seqMap.set(c.seq, c);
+ const committed = Array.from(seqMap.values()).sort((a, b) => a.seq - b.seq);
+
+ if (state.sealedTurnId !== null) {
+ return {
+ ...state,
+ committed,
+ provisional: [],
+ accumulating: null,
+ sealedTurnId: null,
+ };
+ }
+
+ return { ...state, committed };
+}
+
+/**
+ * Fold one live AgentEvent into the provisional state.
+ *
+ * - `turn-start` records the turnId.
+ * - `text-delta` extends the current accumulating TextChunk (or starts one).
+ * - `reasoning-delta` extends the current accumulating ThinkingChunk (or starts one).
+ * - `tool-call` / `tool-result` / `error` finalize any accumulating chunk and
+ * add a new provisional chunk.
+ * - `usage` stores the latest Usage.
+ * - `done` finalizes any accumulating chunk (turn still provisional).
+ * - `turn-sealed` finalizes any accumulating chunk and sets sealedTurnId.
+ * - `status` and `tool-output` are ignored (best-effort no-ops).
+ */
+export function foldEvent(state: TranscriptState, event: AgentEvent): TranscriptState {
+ switch (event.type) {
+ case "status":
+ case "tool-output":
+ return state;
+
+ case "turn-start":
+ return { ...state, currentTurnId: event.turnId };
+
+ case "text-delta": {
+ const acc = state.accumulating;
+ if (acc !== null && acc.kind === "text") {
+ return { ...state, accumulating: { kind: "text", text: acc.text + event.delta } };
+ }
+ const provisional = flushAccumulating(state.provisional, acc);
+ return {
+ ...state,
+ provisional,
+ accumulating: { kind: "text", text: event.delta },
+ };
+ }
+
+ case "reasoning-delta": {
+ const acc = state.accumulating;
+ if (acc !== null && acc.kind === "thinking") {
+ return { ...state, accumulating: { kind: "thinking", text: acc.text + event.delta } };
+ }
+ const provisional = flushAccumulating(state.provisional, acc);
+ return {
+ ...state,
+ provisional,
+ accumulating: { kind: "thinking", text: event.delta },
+ };
+ }
+
+ case "tool-call": {
+ const provisional = flushAccumulating(state.provisional, state.accumulating);
+ const chunk: Chunk = {
+ type: "tool-call",
+ toolCallId: event.toolCallId,
+ toolName: event.toolName,
+ input: event.input,
+ };
+ return {
+ ...state,
+ provisional: [...provisional, { role: "assistant", chunk }],
+ accumulating: null,
+ };
+ }
+
+ case "tool-result": {
+ const provisional = flushAccumulating(state.provisional, state.accumulating);
+ const chunk: Chunk = {
+ type: "tool-result",
+ toolCallId: event.toolCallId,
+ toolName: event.toolName,
+ content: event.content,
+ isError: event.isError,
+ };
+ return {
+ ...state,
+ provisional: [...provisional, { role: "tool", chunk }],
+ accumulating: null,
+ };
+ }
+
+ case "error": {
+ const provisional = flushAccumulating(state.provisional, state.accumulating);
+ const chunk: Chunk =
+ event.code !== undefined
+ ? { type: "error", message: event.message, code: event.code }
+ : { type: "error", message: event.message };
+ return {
+ ...state,
+ provisional: [...provisional, { role: "assistant", chunk }],
+ accumulating: null,
+ };
+ }
+
+ case "usage":
+ return { ...state, latestUsage: event.usage };
+
+ case "done": {
+ const provisional = flushAccumulating(state.provisional, state.accumulating);
+ return {
+ ...state,
+ provisional,
+ accumulating: null,
+ };
+ }
+
+ case "turn-sealed": {
+ const provisional = flushAccumulating(state.provisional, state.accumulating);
+ return {
+ ...state,
+ provisional,
+ accumulating: null,
+ sealedTurnId: event.turnId,
+ };
+ }
+ }
+}
diff --git a/src/core/chunks/selectors.ts b/src/core/chunks/selectors.ts
new file mode 100644
index 0000000..8fb832f
--- /dev/null
+++ b/src/core/chunks/selectors.ts
@@ -0,0 +1,51 @@
+import type { ChatMessage, Chunk } from "@dispatch/wire";
+import type { RenderedChunk, TranscriptState } from "./types";
+
+/**
+ * Select all chunks for rendering: committed first (seq order),
+ * then provisional (seq: null).
+ */
+export function selectChunks(state: TranscriptState): readonly RenderedChunk[] {
+ const result: RenderedChunk[] = [];
+ for (const c of state.committed) {
+ result.push({ seq: c.seq, role: c.role, chunk: c.chunk, provisional: false });
+ }
+ for (const p of state.provisional) {
+ result.push({ seq: null, role: p.role, chunk: p.chunk, provisional: true });
+ }
+ if (state.accumulating !== null) {
+ const chunk: Chunk =
+ state.accumulating.kind === "text"
+ ? { type: "text", text: state.accumulating.text }
+ : { type: "thinking", text: state.accumulating.text };
+ result.push({ seq: null, role: "assistant", chunk, provisional: true });
+ }
+ return result;
+}
+
+/**
+ * Group consecutive same-role rendered chunks into ChatMessages.
+ */
+export function selectMessages(state: TranscriptState): readonly ChatMessage[] {
+ const rendered = selectChunks(state);
+ const first = rendered[0];
+ if (first === undefined) return [];
+
+ const messages: ChatMessage[] = [];
+ let role = first.role;
+ let chunks: Chunk[] = [first.chunk];
+
+ for (let i = 1; i < rendered.length; i++) {
+ const rc = rendered[i];
+ if (rc === undefined) continue;
+ if (rc.role === role) {
+ chunks.push(rc.chunk);
+ } else {
+ messages.push({ role, chunks });
+ role = rc.role;
+ chunks = [rc.chunk];
+ }
+ }
+ messages.push({ role, chunks });
+ return messages;
+}
diff --git a/src/core/chunks/types.ts b/src/core/chunks/types.ts
new file mode 100644
index 0000000..3792445
--- /dev/null
+++ b/src/core/chunks/types.ts
@@ -0,0 +1,31 @@
+import type { Chunk, Role, StoredChunk, Usage } from "@dispatch/wire";
+
+/** A chunk being accumulated from streaming deltas (text or thinking). */
+export interface AccumulatingChunk {
+ readonly kind: "text" | "thinking";
+ readonly text: string;
+}
+
+/** A provisional chunk that has no authoritative seq yet. */
+export interface ProvisionalChunk {
+ readonly role: Role;
+ readonly chunk: Chunk;
+}
+
+/** The transcript reducer state. Holds committed history + live in-flight turn. */
+export interface TranscriptState {
+ readonly committed: readonly StoredChunk[];
+ readonly provisional: readonly ProvisionalChunk[];
+ readonly accumulating: AccumulatingChunk | null;
+ readonly currentTurnId: string | null;
+ readonly latestUsage: Usage | null;
+ readonly sealedTurnId: string | null;
+}
+
+/** A chunk ready for rendering: either committed (with seq) or provisional. */
+export interface RenderedChunk {
+ readonly seq: number | null;
+ readonly role: Role;
+ readonly chunk: Chunk;
+ readonly provisional: boolean;
+}
diff --git a/src/core/wire/conformance.test.ts b/src/core/wire/conformance.test.ts
new file mode 100644
index 0000000..c0f276f
--- /dev/null
+++ b/src/core/wire/conformance.test.ts
@@ -0,0 +1,181 @@
+import type { ChatSendMessage, ConversationHistoryResponse } from "@dispatch/transport-contract";
+import type { AgentEvent, StoredChunk } from "@dispatch/wire";
+import { describe, expect, it } from "vitest";
+import {
+ assertAgentEventExhaustive,
+ assertChunkExhaustive,
+ assertWsClientMessageExhaustive,
+ assertWsServerMessageExhaustive,
+} from "./conformance";
+
+describe("StoredChunk round-trips JSON", () => {
+ it("preserves shape through JSON serialize/deserialize", () => {
+ const original: StoredChunk = {
+ seq: 42,
+ role: "assistant",
+ chunk: { type: "text", text: "hello" },
+ };
+ const roundTripped: StoredChunk = JSON.parse(JSON.stringify(original)) as StoredChunk;
+ expect(roundTripped).toEqual(original);
+ expect(roundTripped.seq).toBe(42);
+ expect(roundTripped.role).toBe("assistant");
+ expect(roundTripped.chunk.type).toBe("text");
+ });
+});
+
+describe("classifies every AgentEvent type", () => {
+ const samples: AgentEvent[] = [
+ { type: "status", conversationId: "c1", status: "idle" },
+ { type: "turn-start", conversationId: "c1", turnId: "t1" },
+ { type: "text-delta", conversationId: "c1", turnId: "t1", delta: "hi" },
+ { type: "reasoning-delta", conversationId: "c1", turnId: "t1", delta: "thinking" },
+ {
+ type: "tool-call",
+ conversationId: "c1",
+ turnId: "t1",
+ toolCallId: "tc1",
+ toolName: "read",
+ input: {},
+ },
+ {
+ type: "tool-result",
+ conversationId: "c1",
+ turnId: "t1",
+ toolCallId: "tc1",
+ toolName: "read",
+ content: "ok",
+ isError: false,
+ },
+ {
+ type: "tool-output",
+ conversationId: "c1",
+ turnId: "t1",
+ toolCallId: "tc1",
+ data: "out",
+ stream: "stdout",
+ },
+ {
+ type: "usage",
+ conversationId: "c1",
+ turnId: "t1",
+ usage: { inputTokens: 10, outputTokens: 20 },
+ },
+ { type: "error", conversationId: "c1", turnId: "t1", message: "oops" },
+ { type: "done", conversationId: "c1", turnId: "t1", reason: "complete" },
+ { type: "turn-sealed", conversationId: "c1", turnId: "t1" },
+ ];
+
+ it("returns a stable label for every AgentEvent.type variant", () => {
+ const labels = samples.map(assertAgentEventExhaustive);
+ expect(labels).toEqual([
+ "status",
+ "turn-start",
+ "text-delta",
+ "reasoning-delta",
+ "tool-call",
+ "tool-result",
+ "tool-output",
+ "usage",
+ "error",
+ "done",
+ "turn-sealed",
+ ]);
+ });
+
+ it("covers all 11 AgentEvent variants", () => {
+ expect(samples).toHaveLength(11);
+ });
+});
+
+describe("classifies every Chunk type", () => {
+ it("returns a stable label for each Chunk.type variant", () => {
+ const chunks = [
+ { type: "text" as const, text: "a" },
+ { type: "thinking" as const, text: "b" },
+ { type: "tool-call" as const, toolCallId: "tc", toolName: "n", input: null },
+ {
+ type: "tool-result" as const,
+ toolCallId: "tc",
+ toolName: "n",
+ content: "c",
+ isError: false,
+ },
+ { type: "error" as const, message: "e" },
+ { type: "system" as const, text: "s" },
+ ];
+ const labels = chunks.map(assertChunkExhaustive);
+ expect(labels).toEqual(["text", "thinking", "tool-call", "tool-result", "error", "system"]);
+ });
+});
+
+describe("classifies every WsServerMessage type", () => {
+ it("returns a stable label for each variant", () => {
+ const msgs = [
+ { type: "catalog" as const, catalog: [] },
+ { type: "surface" as const, spec: { id: "s", region: "r", title: "S", fields: [] } },
+ {
+ type: "update" as const,
+ update: { surfaceId: "s", spec: { id: "s", region: "r", title: "S", fields: [] } },
+ },
+ { type: "error" as const, message: "e" },
+ {
+ type: "chat.delta" as const,
+ event: { type: "done" as const, conversationId: "c", turnId: "t", reason: "r" },
+ },
+ { type: "chat.error" as const, message: "e" },
+ ];
+ const labels = msgs.map(assertWsServerMessageExhaustive);
+ expect(labels).toEqual(["catalog", "surface", "update", "error", "chat.delta", "chat.error"]);
+ });
+});
+
+describe("classifies every WsClientMessage type", () => {
+ it("returns a stable label for each variant", () => {
+ const msgs = [
+ { type: "subscribe" as const, surfaceId: "s" },
+ { type: "unsubscribe" as const, surfaceId: "s" },
+ { type: "invoke" as const, surfaceId: "s", actionId: "a" },
+ { type: "chat.send" as const, message: "hi" },
+ ];
+ const labels = msgs.map(assertWsClientMessageExhaustive);
+ expect(labels).toEqual(["subscribe", "unsubscribe", "invoke", "chat.send"]);
+ });
+});
+
+describe("ChatSendMessage shape is constructible", () => {
+ it("constructs a minimal ChatSendMessage", () => {
+ const msg: ChatSendMessage = { type: "chat.send", message: "hello" };
+ expect(msg.type).toBe("chat.send");
+ expect(msg.message).toBe("hello");
+ });
+
+ it("constructs a full ChatSendMessage", () => {
+ const msg: ChatSendMessage = {
+ type: "chat.send",
+ conversationId: "c1",
+ message: "hello",
+ model: "default/gpt-4",
+ cwd: "/tmp",
+ };
+ expect(msg.conversationId).toBe("c1");
+ expect(msg.model).toBe("default/gpt-4");
+ expect(msg.cwd).toBe("/tmp");
+ });
+});
+
+describe("ConversationHistoryResponse shape is constructible", () => {
+ it("constructs a response with chunks", () => {
+ const resp: ConversationHistoryResponse = {
+ chunks: [{ seq: 1, role: "user", chunk: { type: "text", text: "hi" } }],
+ latestSeq: 1,
+ };
+ expect(resp.chunks).toHaveLength(1);
+ expect(resp.latestSeq).toBe(1);
+ });
+
+ it("constructs an empty (caught-up) response", () => {
+ const resp: ConversationHistoryResponse = { chunks: [], latestSeq: 5 };
+ expect(resp.chunks).toHaveLength(0);
+ expect(resp.latestSeq).toBe(5);
+ });
+});
diff --git a/src/core/wire/conformance.ts b/src/core/wire/conformance.ts
new file mode 100644
index 0000000..5d75a60
--- /dev/null
+++ b/src/core/wire/conformance.ts
@@ -0,0 +1,100 @@
+import type { WsClientMessage, WsServerMessage } from "@dispatch/transport-contract";
+import type { AgentEvent, Chunk } from "@dispatch/wire";
+
+/**
+ * Compile-time exhaustiveness guard for `AgentEvent.type`.
+ * If a variant is added/removed/renamed in `@dispatch/wire`, this function's
+ * default branch becomes reachable → TypeScript error at build time.
+ */
+export function assertAgentEventExhaustive(event: AgentEvent): string {
+ switch (event.type) {
+ case "status":
+ return "status";
+ case "turn-start":
+ return "turn-start";
+ case "text-delta":
+ return "text-delta";
+ case "reasoning-delta":
+ return "reasoning-delta";
+ case "tool-call":
+ return "tool-call";
+ case "tool-result":
+ return "tool-result";
+ case "tool-output":
+ return "tool-output";
+ case "usage":
+ return "usage";
+ case "error":
+ return "error";
+ case "done":
+ return "done";
+ case "turn-sealed":
+ return "turn-sealed";
+ default:
+ return event satisfies never;
+ }
+}
+
+/**
+ * Compile-time exhaustiveness guard for `Chunk.type`.
+ */
+export function assertChunkExhaustive(chunk: Chunk): string {
+ switch (chunk.type) {
+ case "text":
+ return "text";
+ case "thinking":
+ return "thinking";
+ case "tool-call":
+ return "tool-call";
+ case "tool-result":
+ return "tool-result";
+ case "error":
+ return "error";
+ case "system":
+ return "system";
+ default:
+ return chunk satisfies never;
+ }
+}
+
+/**
+ * Compile-time exhaustiveness guard for `WsServerMessage.type`.
+ * Covers both surface ops and chat ops.
+ */
+export function assertWsServerMessageExhaustive(msg: WsServerMessage): string {
+ switch (msg.type) {
+ case "catalog":
+ return "catalog";
+ case "surface":
+ return "surface";
+ case "update":
+ return "update";
+ case "error":
+ return "error";
+ case "chat.delta":
+ return "chat.delta";
+ case "chat.error":
+ return "chat.error";
+ default:
+ return msg satisfies never;
+ }
+}
+
+/**
+ * Compile-time exhaustiveness guard for `WsClientMessage.type`.
+ * Covers both surface ops and chat ops.
+ */
+export function assertWsClientMessageExhaustive(msg: WsClientMessage): string {
+ switch (msg.type) {
+ case "subscribe":
+ return "subscribe";
+ case "unsubscribe":
+ return "unsubscribe";
+ case "invoke":
+ return "invoke";
+ case "chat.send":
+ return "chat.send";
+ default:
+ return msg satisfies never;
+ }
+}
diff --git a/src/core/wire/index.ts b/src/core/wire/index.ts
new file mode 100644
index 0000000..ae6b3e6
--- /dev/null
+++ b/src/core/wire/index.ts
@@ -0,0 +1,6 @@
+export {
+ assertAgentEventExhaustive,
+ assertChunkExhaustive,
+ assertWsClientMessageExhaustive,
+ assertWsServerMessageExhaustive,
+} from "./conformance";
diff --git a/src/features/conversation-cache/cache.test.ts b/src/features/conversation-cache/cache.test.ts
new file mode 100644
index 0000000..c68ed0d
--- /dev/null
+++ b/src/features/conversation-cache/cache.test.ts
@@ -0,0 +1,173 @@
+import type { StoredChunk } from "@dispatch/wire";
+import { describe, expect, it } from "vitest";
+import { createConversationCache } from "./cache";
+import type { ConversationCacheIndexEntry, ConversationChunkStore } from "./types";
+
+const chunk = (seq: number, role: "user" | "assistant" = "user"): StoredChunk => ({
+ seq,
+ role,
+ chunk: { type: "text", text: `chunk-${seq}` },
+});
+
+/**
+ * In-memory fake ConversationChunkStore — the ONLY allowed fake.
+ * An outermost edge: simulates the storage port without any real I/O.
+ */
+function createFakeStore(): ConversationChunkStore {
+ const store = new Map<string, StoredChunk[]>();
+
+ return {
+ async load(conversationId) {
+ return store.get(conversationId) ?? [];
+ },
+
+ async append(conversationId, chunks) {
+ const existing = store.get(conversationId) ?? [];
+ const existingSeqs = new Set(existing.map((c) => c.seq));
+ const toAdd = chunks.filter((c) => !existingSeqs.has(c.seq));
+ store.set(
+ conversationId,
+ [...existing, ...toAdd].sort((a, b) => a.seq - b.seq),
+ );
+ },
+
+ async delete(conversationId) {
+ store.delete(conversationId);
+ },
+
+ async index() {
+ const entries: ConversationCacheIndexEntry[] = [];
+ for (const [id, chunks] of store) {
+ if (chunks.length === 0) continue;
+ let maxSeq = 0;
+ for (const c of chunks) {
+ if (c.seq > maxSeq) maxSeq = c.seq;
+ }
+ entries.push({
+ conversationId: id,
+ chunkCount: chunks.length,
+ maxSeq,
+ });
+ }
+ return entries;
+ },
+ };
+}
+
+describe("cache.load", () => {
+ it("returns stored chunks", async () => {
+ const store = createFakeStore();
+ const cache = createConversationCache(store);
+ await store.append("conv-1", [chunk(1), chunk(2)]);
+ const result = await cache.load("conv-1");
+ expect(result).toEqual([chunk(1), chunk(2)]);
+ });
+
+ it("returns empty array for absent conversation", async () => {
+ const store = createFakeStore();
+ const cache = createConversationCache(store);
+ const result = await cache.load("nonexistent");
+ expect(result).toEqual([]);
+ });
+});
+
+describe("cache.commit", () => {
+ it("appends only new chunks", async () => {
+ const store = createFakeStore();
+ const cache = createConversationCache(store);
+ await store.append("conv-1", [chunk(1), chunk(2)]);
+
+ const merged = await cache.commit("conv-1", [chunk(2), chunk(3)]);
+ expect(merged).toEqual([chunk(1), chunk(2), chunk(3)]);
+
+ // Verify store has all chunks
+ const stored = await store.load("conv-1");
+ expect(stored).toEqual([chunk(1), chunk(2), chunk(3)]);
+ });
+
+ it("returns full merged result", async () => {
+ const store = createFakeStore();
+ const cache = createConversationCache(store);
+
+ const merged = await cache.commit("conv-1", [chunk(3), chunk(1)]);
+ expect(merged).toEqual([chunk(1), chunk(3)]);
+ });
+
+ it("is idempotent — re-committing same chunks is a no-op", async () => {
+ const store = createFakeStore();
+ const cache = createConversationCache(store);
+
+ await cache.commit("conv-1", [chunk(1), chunk(2)]);
+ const merged = await cache.commit("conv-1", [chunk(1), chunk(2)]);
+ expect(merged).toEqual([chunk(1), chunk(2)]);
+
+ const stored = await store.load("conv-1");
+ expect(stored).toEqual([chunk(1), chunk(2)]);
+ });
+});
+
+describe("cache.sinceSeq", () => {
+ it("returns max seq from cache", async () => {
+ const store = createFakeStore();
+ const cache = createConversationCache(store);
+ await store.append("conv-1", [chunk(1), chunk(5), chunk(3)]);
+ expect(await cache.sinceSeq("conv-1")).toBe(5);
+ });
+
+ it("returns 0 for empty conversation", async () => {
+ const store = createFakeStore();
+ const cache = createConversationCache(store);
+ expect(await cache.sinceSeq("conv-1")).toBe(0);
+ });
+});
+
+describe("cache.evictIfOverBudget", () => {
+ it("deletes selected conversations", async () => {
+ const store = createFakeStore();
+ const cache = createConversationCache(store, { maxChunks: 5 });
+
+ await store.append("a", [chunk(1), chunk(2)]);
+ await store.append("b", [chunk(1), chunk(2)]);
+ await store.append("c", [chunk(1)]);
+
+ // Total = 5, max = 5, under budget
+ const evicted = await cache.evictIfOverBudget(null);
+ expect(evicted).toEqual([]);
+
+ // Add more to go over budget
+ await store.append("d", [chunk(1), chunk(2), chunk(3)]);
+ // Total = 8, max = 5, need to evict 3+ chunks
+
+ const evicted2 = await cache.evictIfOverBudget(null);
+ expect(evicted2.length).toBeGreaterThan(0);
+
+ // Verify evicted conversations are deleted
+ for (const id of evicted2) {
+ expect(await store.load(id)).toEqual([]);
+ }
+ });
+
+ it("never evicts the active conversation", async () => {
+ const store = createFakeStore();
+ const cache = createConversationCache(store, { maxChunks: 3 });
+
+ await store.append("active", [chunk(1), chunk(2), chunk(3)]);
+ await store.append("other", [chunk(1), chunk(2)]);
+
+ // Total = 5, max = 3, need to evict 2+ chunks
+ const evicted = await cache.evictIfOverBudget("active");
+ expect(evicted).not.toContain("active");
+ expect(evicted).toContain("other");
+ });
+
+ it("returns empty when under budget", async () => {
+ const store = createFakeStore();
+ const cache = createConversationCache(store, { maxChunks: 100 });
+
+ await store.append("a", [chunk(1)]);
+ await store.append("b", [chunk(1)]);
+
+ const evicted = await cache.evictIfOverBudget(null);
+ expect(evicted).toEqual([]);
+ });
+});
diff --git a/src/features/conversation-cache/cache.ts b/src/features/conversation-cache/cache.ts
new file mode 100644
index 0000000..4aab487
--- /dev/null
+++ b/src/features/conversation-cache/cache.ts
@@ -0,0 +1,71 @@
+import type { StoredChunk } from "@dispatch/wire";
+import { nextSinceSeq, reconcileCache, selectEvictions } from "./logic";
+import type { ConversationChunkStore } from "./types";
+
+export interface ConversationCache {
+ /** Load all cached chunks for a conversation. */
+ load(conversationId: string): Promise<readonly StoredChunk[]>;
+
+ /**
+ * Load + reconcile + append new chunks.
+ * Returns the merged cache (the new authoritative cache for this conversation).
+ */
+ commit(conversationId: string, incoming: readonly StoredChunk[]): Promise<readonly StoredChunk[]>;
+
+ /** Return the `?sinceSeq=` cursor for the next incremental sync. */
+ sinceSeq(conversationId: string): Promise<number>;
+
+ /**
+ * Evict conversations over budget.
+ * Returns the evicted conversationIds.
+ */
+ evictIfOverBudget(activeConversationId: string | null): Promise<readonly string[]>;
+}
+
+export interface ConversationCacheOptions {
+ /** Maximum total chunks across all conversations before eviction triggers. */
+ readonly maxChunks?: number;
+}
+
+const DEFAULT_MAX_CHUNKS = 10_000;
+
+/**
+ * Create a conversation cache backed by the injected storage port.
+ *
+ * The ONLY impurity is the injected `store`; all logic delegates to pure functions.
+ */
+export function createConversationCache(
+ store: ConversationChunkStore,
+ opts?: ConversationCacheOptions,
+): ConversationCache {
+ const maxChunks = opts?.maxChunks ?? DEFAULT_MAX_CHUNKS;
+
+ return {
+ async load(conversationId) {
+ return store.load(conversationId);
+ },
+
+ async commit(conversationId, incoming) {
+ const cached = await store.load(conversationId);
+ const { merged, toAppend } = reconcileCache(cached, incoming);
+ if (toAppend.length > 0) {
+ await store.append(conversationId, toAppend);
+ }
+ return merged;
+ },
+
+ async sinceSeq(conversationId) {
+ const cached = await store.load(conversationId);
+ return nextSinceSeq(cached);
+ },
+
+ async evictIfOverBudget(activeConversationId) {
+ const idx = await store.index();
+ const toEvict = selectEvictions(idx, { maxChunks, activeConversationId });
+ for (const id of toEvict) {
+ await store.delete(id);
+ }
+ return toEvict;
+ },
+ };
+}
diff --git a/src/features/conversation-cache/index.ts b/src/features/conversation-cache/index.ts
new file mode 100644
index 0000000..ba3f69a
--- /dev/null
+++ b/src/features/conversation-cache/index.ts
@@ -0,0 +1,8 @@
+export type { ConversationCache, ConversationCacheOptions } from "./cache";
+export { createConversationCache } from "./cache";
+export { nextSinceSeq, reconcileCache, selectEvictions } from "./logic";
+export type {
+ ConversationCacheIndexEntry,
+ ConversationChunkStore,
+ ReconcileResult,
+} from "./types";
diff --git a/src/features/conversation-cache/logic.test.ts b/src/features/conversation-cache/logic.test.ts
new file mode 100644
index 0000000..858460a
--- /dev/null
+++ b/src/features/conversation-cache/logic.test.ts
@@ -0,0 +1,140 @@
+import type { StoredChunk } from "@dispatch/wire";
+import { describe, expect, it } from "vitest";
+import { nextSinceSeq, reconcileCache, selectEvictions } from "./logic";
+import type { ConversationCacheIndexEntry } from "./types";
+
+const chunk = (seq: number, role: "user" | "assistant" = "user"): StoredChunk => ({
+ seq,
+ role,
+ chunk: { type: "text", text: `chunk-${seq}` },
+});
+
+describe("reconcileCache", () => {
+ it("merges and dedupes by seq", () => {
+ const cached = [chunk(1), chunk(2)];
+ const incoming = [chunk(2), chunk(3)];
+ const result = reconcileCache(cached, incoming);
+ expect(result.merged).toEqual([chunk(1), chunk(2), chunk(3)]);
+ });
+
+ it("toAppend excludes already-cached seqs", () => {
+ const cached = [chunk(1), chunk(2)];
+ const incoming = [chunk(2), chunk(3)];
+ const result = reconcileCache(cached, incoming);
+ expect(result.toAppend).toEqual([chunk(3)]);
+ });
+
+ it("tolerates out-of-order incoming", () => {
+ const cached = [chunk(1)];
+ const incoming = [chunk(5), chunk(3), chunk(2)];
+ const result = reconcileCache(cached, incoming);
+ expect(result.merged).toEqual([chunk(1), chunk(2), chunk(3), chunk(5)]);
+ expect(result.toAppend).toEqual([chunk(5), chunk(3), chunk(2)]);
+ });
+
+ it("returns empty merged and toAppend when both inputs are empty", () => {
+ const result = reconcileCache([], []);
+ expect(result.merged).toEqual([]);
+ expect(result.toAppend).toEqual([]);
+ });
+
+ it("handles empty cached with incoming", () => {
+ const incoming = [chunk(3), chunk(1)];
+ const result = reconcileCache([], incoming);
+ expect(result.merged).toEqual([chunk(1), chunk(3)]);
+ expect(result.toAppend).toEqual([chunk(3), chunk(1)]);
+ });
+
+ it("handles cached with empty incoming", () => {
+ const cached = [chunk(1), chunk(2)];
+ const result = reconcileCache(cached, []);
+ expect(result.merged).toEqual([chunk(1), chunk(2)]);
+ expect(result.toAppend).toEqual([]);
+ });
+
+ it("is idempotent — re-reconciling same incoming produces same result", () => {
+ const cached = [chunk(1)];
+ const incoming = [chunk(2), chunk(3)];
+ const first = reconcileCache(cached, incoming);
+ const second = reconcileCache(first.merged, incoming);
+ expect(second.merged).toEqual(first.merged);
+ expect(second.toAppend).toEqual([]);
+ });
+});
+
+describe("nextSinceSeq", () => {
+ it("returns max seq", () => {
+ const cached = [chunk(1), chunk(5), chunk(3)];
+ expect(nextSinceSeq(cached)).toBe(5);
+ });
+
+ it("returns 0 when empty", () => {
+ expect(nextSinceSeq([])).toBe(0);
+ });
+
+ it("returns single seq for single chunk", () => {
+ expect(nextSinceSeq([chunk(42)])).toBe(42);
+ });
+});
+
+describe("selectEvictions", () => {
+ it("never evicts the active conversation", () => {
+ const index: ConversationCacheIndexEntry[] = [
+ { conversationId: "active", chunkCount: 100, maxSeq: 100, lastAccess: 1000 },
+ { conversationId: "other", chunkCount: 50, maxSeq: 50, lastAccess: 1 },
+ ];
+ const result = selectEvictions(index, { maxChunks: 50, activeConversationId: "active" });
+ expect(result).not.toContain("active");
+ expect(result).toContain("other");
+ });
+
+ it("evicts LRU until under budget", () => {
+ const index: ConversationCacheIndexEntry[] = [
+ { conversationId: "a", chunkCount: 30, maxSeq: 30, lastAccess: 100 },
+ { conversationId: "b", chunkCount: 30, maxSeq: 30, lastAccess: 50 },
+ { conversationId: "c", chunkCount: 30, maxSeq: 30, lastAccess: 200 },
+ { conversationId: "d", chunkCount: 30, maxSeq: 30, lastAccess: 10 },
+ ];
+ // Total = 120, max = 60, need to evict 60+ chunks
+ // LRU order: d(10), b(50), a(100), c(200)
+ const result = selectEvictions(index, { maxChunks: 60, activeConversationId: null });
+ expect(result).toEqual(["d", "b"]);
+ });
+
+ it("is a no-op under budget", () => {
+ const index: ConversationCacheIndexEntry[] = [
+ { conversationId: "a", chunkCount: 10, maxSeq: 10, lastAccess: 100 },
+ { conversationId: "b", chunkCount: 10, maxSeq: 10, lastAccess: 50 },
+ ];
+ const result = selectEvictions(index, { maxChunks: 100, activeConversationId: null });
+ expect(result).toEqual([]);
+ });
+
+ it("returns empty for empty index", () => {
+ const result = selectEvictions([], { maxChunks: 100, activeConversationId: null });
+ expect(result).toEqual([]);
+ });
+
+ it("tie-breaks by smaller maxSeq when lastAccess is equal", () => {
+ const index: ConversationCacheIndexEntry[] = [
+ { conversationId: "a", chunkCount: 30, maxSeq: 100, lastAccess: 50 },
+ { conversationId: "b", chunkCount: 30, maxSeq: 50, lastAccess: 50 },
+ { conversationId: "c", chunkCount: 30, maxSeq: 200, lastAccess: 50 },
+ ];
+ // Total = 90, max = 60, need to evict 30+ chunks
+ // All have same lastAccess, tie-break by maxSeq: b(50), a(100), c(200)
+ const result = selectEvictions(index, { maxChunks: 60, activeConversationId: null });
+ expect(result).toEqual(["b"]);
+ });
+
+ it("handles missing lastAccess (treated as 0)", () => {
+ const index: ConversationCacheIndexEntry[] = [
+ { conversationId: "a", chunkCount: 30, maxSeq: 30, lastAccess: 100 },
+ { conversationId: "b", chunkCount: 30, maxSeq: 30 },
+ ];
+ // Total = 60, max = 30, need to evict 30+ chunks
+ // b has no lastAccess (0), a has 100
+ const result = selectEvictions(index, { maxChunks: 30, activeConversationId: null });
+ expect(result).toEqual(["b"]);
+ });
+});
diff --git a/src/features/conversation-cache/logic.ts b/src/features/conversation-cache/logic.ts
new file mode 100644
index 0000000..4a4479e
--- /dev/null
+++ b/src/features/conversation-cache/logic.ts
@@ -0,0 +1,77 @@
+import type { StoredChunk } from "@dispatch/wire";
+import type { ConversationCacheIndexEntry, ReconcileResult } from "./types";
+
+/**
+ * Merge authoritative seq-keyed chunks with cached chunks.
+ *
+ * Deduplicates by `seq`, produces seq-monotonic order.
+ * `toAppend` = the incoming chunks whose `seq` is not already in `cached`
+ * (exactly what to persist). Idempotent; tolerant of out-of-order/overlapping `incoming`.
+ */
+export function reconcileCache(
+ cached: readonly StoredChunk[],
+ incoming: readonly StoredChunk[],
+): ReconcileResult {
+ const seen = new Set<number>();
+ for (const chunk of cached) {
+ seen.add(chunk.seq);
+ }
+
+ const toAppend: StoredChunk[] = [];
+ for (const chunk of incoming) {
+ if (!seen.has(chunk.seq)) {
+ toAppend.push(chunk);
+ seen.add(chunk.seq);
+ }
+ }
+
+ const merged = [...cached, ...toAppend].sort((a, b) => a.seq - b.seq);
+ return { merged, toAppend };
+}
+
+/**
+ * Return the max committed `seq`, or `0` if empty.
+ * This is the `?sinceSeq=` cursor for the next incremental sync.
+ */
+export function nextSinceSeq(cached: readonly StoredChunk[]): number {
+ if (cached.length === 0) return 0;
+ let max = 0;
+ for (const chunk of cached) {
+ if (chunk.seq > max) max = chunk.seq;
+ }
+ return max;
+}
+
+/**
+ * Choose conversationIds to evict to get total cached chunks under `maxChunks`.
+ *
+ * LRU eviction: oldest `lastAccess` first, tie-break smaller `maxSeq`.
+ * NEVER evicts the `activeConversationId`.
+ * Returns [] when under budget.
+ */
+export function selectEvictions(
+ index: readonly ConversationCacheIndexEntry[],
+ opts: { maxChunks: number; activeConversationId: string | null },
+): readonly string[] {
+ const totalChunks = index.reduce((sum, entry) => sum + entry.chunkCount, 0);
+ if (totalChunks <= opts.maxChunks) return [];
+
+ const candidates = index
+ .filter((entry) => entry.conversationId !== opts.activeConversationId)
+ .sort((a, b) => {
+ const aAccess = a.lastAccess ?? 0;
+ const bAccess = b.lastAccess ?? 0;
+ if (aAccess !== bAccess) return aAccess - bAccess;
+ return a.maxSeq - b.maxSeq;
+ });
+
+ let remaining = totalChunks;
+ const evictions: string[] = [];
+ for (const entry of candidates) {
+ if (remaining <= opts.maxChunks) break;
+ evictions.push(entry.conversationId);
+ remaining -= entry.chunkCount;
+ }
+
+ return evictions;
+}
diff --git a/src/features/conversation-cache/types.ts b/src/features/conversation-cache/types.ts
new file mode 100644
index 0000000..2a349cc
--- /dev/null
+++ b/src/features/conversation-cache/types.ts
@@ -0,0 +1,42 @@
+import type { StoredChunk } from "@dispatch/wire";
+
+/** Metadata entry for a cached conversation, used by eviction logic. */
+export interface ConversationCacheIndexEntry {
+ readonly conversationId: string;
+ readonly chunkCount: number;
+ readonly maxSeq: number;
+ readonly lastAccess?: number;
+}
+
+/**
+ * Storage port for conversation chunk persistence.
+ *
+ * The IndexedDB implementation lives in `src/adapters/idb/` (separate unit);
+ * this interface is the contract the cache logic depends on.
+ *
+ * All methods MUST be idempotent on `seq`: re-appending an existing seq is a no-op.
+ */
+export interface ConversationChunkStore {
+ /** Load all cached chunks for a conversation, seq-ordered. Returns [] if absent. */
+ load(conversationId: string): Promise<readonly StoredChunk[]>;
+
+ /**
+ * Append committed chunks to a conversation's cache.
+ * MUST be idempotent on `seq`: re-appending an existing seq is a no-op.
+ */
+ append(conversationId: string, chunks: readonly StoredChunk[]): Promise<void>;
+
+ /** Delete all cached data for a conversation. */
+ delete(conversationId: string): Promise<void>;
+
+ /** Return metadata for all cached conversations (for eviction). */
+ index(): Promise<readonly ConversationCacheIndexEntry[]>;
+}
+
+/** Result of reconciling cached chunks with incoming authoritative chunks. */
+export interface ReconcileResult {
+ /** The merged, deduplicated, seq-ordered chunk list. */
+ readonly merged: readonly StoredChunk[];
+ /** The subset of incoming chunks that need to be appended (not already cached). */
+ readonly toAppend: readonly StoredChunk[];
+}