diff options
| -rw-r--r-- | src/adapters/ws/index.test.ts | 112 | ||||
| -rw-r--r-- | src/adapters/ws/index.ts | 18 | ||||
| -rw-r--r-- | src/adapters/ws/logic.test.ts | 59 | ||||
| -rw-r--r-- | src/adapters/ws/logic.ts | 39 | ||||
| -rw-r--r-- | src/core/chunks/index.ts | 8 | ||||
| -rw-r--r-- | src/core/chunks/reducer.test.ts | 406 | ||||
| -rw-r--r-- | src/core/chunks/reducer.ts | 168 | ||||
| -rw-r--r-- | src/core/chunks/selectors.ts | 51 | ||||
| -rw-r--r-- | src/core/chunks/types.ts | 31 | ||||
| -rw-r--r-- | src/core/wire/conformance.test.ts | 181 | ||||
| -rw-r--r-- | src/core/wire/conformance.ts | 100 | ||||
| -rw-r--r-- | src/core/wire/index.ts | 6 | ||||
| -rw-r--r-- | src/features/conversation-cache/cache.test.ts | 173 | ||||
| -rw-r--r-- | src/features/conversation-cache/cache.ts | 71 | ||||
| -rw-r--r-- | src/features/conversation-cache/index.ts | 8 | ||||
| -rw-r--r-- | src/features/conversation-cache/logic.test.ts | 140 | ||||
| -rw-r--r-- | src/features/conversation-cache/logic.ts | 77 | ||||
| -rw-r--r-- | src/features/conversation-cache/types.ts | 42 |
18 files changed, 1680 insertions, 10 deletions
diff --git a/src/adapters/ws/index.test.ts b/src/adapters/ws/index.test.ts index 92b8753..961f919 100644 --- a/src/adapters/ws/index.test.ts +++ b/src/adapters/ws/index.test.ts @@ -231,4 +231,116 @@ describe("createSurfaceSocket", () => { payload: 1, }); }); + + it("routes chat.delta to onChat", () => { + const ws = fakeSocket(); + const onMessage = vi.fn(); + const onChat = vi.fn(); + createSurfaceSocket({ + url: "ws://test", + onMessage, + onChat, + socketFactory: () => ws, + }); + + ws.resolveOpen(); + const event = { type: "text-delta", conversationId: "c1", turnId: "t1", delta: "hi" }; + ws.invokeMessage(JSON.stringify({ type: "chat.delta", event })); + expect(onChat).toHaveBeenCalledOnce(); + expect(onChat).toHaveBeenCalledWith({ type: "chat.delta", event }); + expect(onMessage).not.toHaveBeenCalled(); + }); + + it("routes chat.error to onChat", () => { + const ws = fakeSocket(); + const onMessage = vi.fn(); + const onChat = vi.fn(); + createSurfaceSocket({ + url: "ws://test", + onMessage, + onChat, + socketFactory: () => ws, + }); + + ws.resolveOpen(); + ws.invokeMessage(JSON.stringify({ type: "chat.error", message: "bad request" })); + expect(onChat).toHaveBeenCalledOnce(); + expect(onChat).toHaveBeenCalledWith({ type: "chat.error", message: "bad request" }); + expect(onMessage).not.toHaveBeenCalled(); + }); + + it("still routes surface catalog/surface to onMessage", () => { + const ws = fakeSocket(); + const onMessage = vi.fn(); + const onChat = vi.fn(); + createSurfaceSocket({ + url: "ws://test", + onMessage, + onChat, + socketFactory: () => ws, + }); + + ws.resolveOpen(); + ws.invokeMessage(JSON.stringify({ type: "catalog", catalog: [] })); + expect(onMessage).toHaveBeenCalledOnce(); + expect(onMessage).toHaveBeenCalledWith({ type: "catalog", catalog: [] }); + expect(onChat).not.toHaveBeenCalled(); + + ws.invokeMessage( + JSON.stringify({ type: "surface", spec: { id: "s1", region: "r", title: "S", fields: [] } }), + ); + expect(onMessage).toHaveBeenCalledTimes(2); + }); + + it("send accepts and serializes a chat.send message", () => { + const ws = fakeSocket(); + const handle = createSurfaceSocket({ + url: "ws://test", + onMessage: vi.fn(), + socketFactory: () => ws, + }); + + ws.resolveOpen(); + handle.send({ type: "chat.send", message: "hello" }); + expect(ws.sent).toHaveLength(1); + expect(JSON.parse(ws.sent[0] ?? "")).toEqual({ type: "chat.send", message: "hello" }); + }); + + it("onChat absent is safe (surface-only usage does not throw)", () => { + const ws = fakeSocket(); + const onMessage = vi.fn(); + createSurfaceSocket({ + url: "ws://test", + onMessage, + socketFactory: () => ws, + }); + + ws.resolveOpen(); + expect(() => { + ws.invokeMessage( + JSON.stringify({ + type: "chat.delta", + event: { type: "text-delta", conversationId: "c1", turnId: "t1", delta: "x" }, + }), + ); + ws.invokeMessage(JSON.stringify({ type: "chat.error", message: "boom" })); + }).not.toThrow(); + expect(onMessage).not.toHaveBeenCalled(); + }); + + it("chat send is queued until open then flushed", () => { + const ws = fakeSocket(); + const handle = createSurfaceSocket({ + url: "ws://test", + onMessage: vi.fn(), + socketFactory: () => ws, + }); + + handle.send({ type: "chat.send", message: "queued" }); + expect(ws.sent).toHaveLength(0); + + ws.resolveOpen(); + expect(ws.sent).toHaveLength(1); + expect(JSON.parse(ws.sent[0] ?? "")).toEqual({ type: "chat.send", message: "queued" }); + }); }); diff --git a/src/adapters/ws/index.ts b/src/adapters/ws/index.ts index 40eda2b..54a501c 100644 --- a/src/adapters/ws/index.ts +++ b/src/adapters/ws/index.ts @@ -1,4 +1,9 @@ -import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract"; +import type { + ChatDeltaMessage, + ChatErrorMessage, + WsClientMessage, +} from "@dispatch/transport-contract"; +import type { SurfaceServerMessage } from "@dispatch/ui-contract"; import { nextBackoffMs, parseServerMessage, serialize } from "./logic"; export interface WebSocketLike { @@ -12,12 +17,13 @@ export interface WebSocketLike { export interface SurfaceSocketOptions { url: string; onMessage: (msg: SurfaceServerMessage) => void; + onChat?: (msg: ChatDeltaMessage | ChatErrorMessage) => void; onReopen?: () => void; socketFactory?: (url: string) => WebSocketLike; } export interface SurfaceSocketHandle { - send(msg: SurfaceClientMessage): void; + send(msg: WsClientMessage): void; close(): void; } @@ -52,7 +58,11 @@ export function createSurfaceSocket(opts: SurfaceSocketOptions): SurfaceSocketHa if (disposed) return; const msg = parseServerMessage(ev.data); if (msg !== null) { - opts.onMessage(msg); + if (msg.type === "chat.delta" || msg.type === "chat.error") { + opts.onChat?.(msg as ChatDeltaMessage | ChatErrorMessage); + } else { + opts.onMessage(msg as SurfaceServerMessage); + } } }; @@ -76,7 +86,7 @@ export function createSurfaceSocket(opts: SurfaceSocketOptions): SurfaceSocketHa connect(false); return { - send(msg: SurfaceClientMessage): void { + send(msg: WsClientMessage): void { if (disposed) return; const raw = serialize(msg); if (isOpen) { diff --git a/src/adapters/ws/logic.test.ts b/src/adapters/ws/logic.test.ts index 62ae6a0..546afe1 100644 --- a/src/adapters/ws/logic.test.ts +++ b/src/adapters/ws/logic.test.ts @@ -21,6 +21,22 @@ describe("serialize", () => { const msg = { type: "invoke" as const, surfaceId: "s1", actionId: "click" }; expect(JSON.parse(serialize(msg))).toEqual(msg); }); + + it("serializes a chat.send message", () => { + const msg = { type: "chat.send" as const, message: "hello" }; + expect(JSON.parse(serialize(msg))).toEqual(msg); + }); + + it("serializes a chat.send message with all fields", () => { + const msg = { + type: "chat.send" as const, + conversationId: "c1", + message: "hello", + model: "openai/gpt-4", + cwd: "/tmp", + }; + expect(JSON.parse(serialize(msg))).toEqual(msg); + }); }); describe("parseServerMessage", () => { @@ -135,6 +151,49 @@ describe("parseServerMessage", () => { parseServerMessage(JSON.stringify({ type: "error", surfaceId: 42, message: "boom" })), ).toBeNull(); }); + + it("parses a chat.delta message", () => { + const event = { type: "text-delta", conversationId: "c1", turnId: "t1", delta: "hello" }; + const data = JSON.stringify({ type: "chat.delta", event }); + const result = parseServerMessage(data); + expect(result).toEqual({ type: "chat.delta", event }); + }); + + it("parses a chat.error message with conversationId", () => { + const data = JSON.stringify({ + type: "chat.error", + conversationId: "c1", + message: "bad request", + }); + const result = parseServerMessage(data); + expect(result).toEqual({ type: "chat.error", conversationId: "c1", message: "bad request" }); + }); + + it("parses a chat.error message without conversationId", () => { + const data = JSON.stringify({ type: "chat.error", message: "no conversation" }); + const result = parseServerMessage(data); + expect(result).toEqual({ type: "chat.error", message: "no conversation" }); + }); + + it("returns null for chat.delta with non-object event", () => { + expect(parseServerMessage(JSON.stringify({ type: "chat.delta", event: "nope" }))).toBeNull(); + }); + + it("returns null for chat.delta with missing event.type", () => { + expect(parseServerMessage(JSON.stringify({ type: "chat.delta", event: {} }))).toBeNull(); + }); + + it("returns null for chat.error with non-string message", () => { + expect(parseServerMessage(JSON.stringify({ type: "chat.error", message: 42 }))).toBeNull(); + }); + + it("returns null for chat.error with invalid conversationId type", () => { + expect( + parseServerMessage( + JSON.stringify({ type: "chat.error", conversationId: 42, message: "boom" }), + ), + ).toBeNull(); + }); }); describe("round-trip: parseServerMessage(serialize(...))", () => { diff --git a/src/adapters/ws/logic.ts b/src/adapters/ws/logic.ts index 83a5802..6592f1b 100644 --- a/src/adapters/ws/logic.ts +++ b/src/adapters/ws/logic.ts @@ -1,16 +1,27 @@ import type { + ChatDeltaMessage, + ChatErrorMessage, + WsClientMessage, + WsServerMessage, +} from "@dispatch/transport-contract"; +import type { CatalogMessage, - SurfaceClientMessage, SurfaceErrorMessage, SurfaceMessage, - SurfaceServerMessage, SurfaceUpdateMessage, } from "@dispatch/ui-contract"; -const VALID_SERVER_TYPES = new Set(["catalog", "surface", "update", "error"]); +const VALID_SERVER_TYPES = new Set([ + "catalog", + "surface", + "update", + "error", + "chat.delta", + "chat.error", +]); /** Serialize a client message to a JSON string for the wire. */ -export function serialize(msg: SurfaceClientMessage): string { +export function serialize(msg: WsClientMessage): string { return JSON.stringify(msg); } @@ -19,10 +30,10 @@ function isRecord(v: unknown): v is Record<string, unknown> { } /** - * Parse a raw server message string into a typed SurfaceServerMessage. + * Parse a raw server message string into a typed WsServerMessage. * Returns null for malformed JSON or shapes that don't match the protocol. */ -export function parseServerMessage(data: string): SurfaceServerMessage | null { +export function parseServerMessage(data: string): WsServerMessage | null { let parsed: unknown; try { parsed = JSON.parse(data); @@ -72,6 +83,22 @@ export function parseServerMessage(data: string): SurfaceServerMessage | null { : { type: "error", message: parsed.message }; return msg; } + case "chat.delta": { + const event = parsed.event; + if (!isRecord(event)) return null; + if (typeof event.type !== "string") return null; + return { type: "chat.delta", event: event as unknown as ChatDeltaMessage["event"] }; + } + case "chat.error": { + if (typeof parsed.message !== "string") return null; + const conversationId = parsed.conversationId; + if (conversationId !== undefined && typeof conversationId !== "string") return null; + const msg: ChatErrorMessage = + conversationId !== undefined + ? { type: "chat.error", conversationId, message: parsed.message } + : { type: "chat.error", message: parsed.message }; + return msg; + } default: return null; } diff --git a/src/core/chunks/index.ts b/src/core/chunks/index.ts new file mode 100644 index 0000000..36ba7f4 --- /dev/null +++ b/src/core/chunks/index.ts @@ -0,0 +1,8 @@ +export { applyHistory, foldEvent, initialState } from "./reducer"; +export { selectChunks, selectMessages } from "./selectors"; +export type { + AccumulatingChunk, + ProvisionalChunk, + RenderedChunk, + TranscriptState, +} from "./types"; diff --git a/src/core/chunks/reducer.test.ts b/src/core/chunks/reducer.test.ts new file mode 100644 index 0000000..f83edb4 --- /dev/null +++ b/src/core/chunks/reducer.test.ts @@ -0,0 +1,406 @@ +import type { + StoredChunk, + TurnDoneEvent, + TurnErrorEvent, + TurnReasoningDeltaEvent, + TurnSealedEvent, + TurnStartEvent, + TurnTextDeltaEvent, + TurnToolCallEvent, + TurnToolResultEvent, + TurnUsageEvent, +} from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { applyHistory, foldEvent, initialState } from "./reducer"; +import { selectChunks, selectMessages } from "./selectors"; + +const turnStart = (turnId: string): TurnStartEvent => ({ + type: "turn-start", + conversationId: "c1", + turnId, +}); + +const textDelta = (turnId: string, delta: string): TurnTextDeltaEvent => ({ + type: "text-delta", + conversationId: "c1", + turnId, + delta, +}); + +const reasoningDelta = (turnId: string, delta: string): TurnReasoningDeltaEvent => ({ + type: "reasoning-delta", + conversationId: "c1", + turnId, + delta, +}); + +const toolCall = ( + turnId: string, + toolCallId: string, + toolName: string, + input: unknown, +): TurnToolCallEvent => ({ + type: "tool-call", + conversationId: "c1", + turnId, + toolCallId, + toolName, + input, +}); + +const toolResult = ( + turnId: string, + toolCallId: string, + toolName: string, + content: string, +): TurnToolResultEvent => ({ + type: "tool-result", + conversationId: "c1", + turnId, + toolCallId, + toolName, + content, + isError: false, +}); + +const usageEvent = (turnId: string, inputTokens: number, outputTokens: number): TurnUsageEvent => ({ + type: "usage", + conversationId: "c1", + turnId, + usage: { inputTokens, outputTokens }, +}); + +const errorEvent = (turnId: string, message: string, code?: string): TurnErrorEvent => + code !== undefined + ? { type: "error", conversationId: "c1", turnId, message, code } + : { type: "error", conversationId: "c1", turnId, message }; + +const doneEvent = (turnId: string): TurnDoneEvent => ({ + type: "done", + conversationId: "c1", + turnId, + reason: "stop", +}); + +const turnSealed = (turnId: string): TurnSealedEvent => ({ + type: "turn-sealed", + conversationId: "c1", + turnId, +}); + +const storedChunk = ( + seq: number, + role: "user" | "assistant" | "tool" | "system", + chunk: StoredChunk["chunk"], +): StoredChunk => ({ + seq, + role, + chunk, +}); + +describe("initialState", () => { + it("initial state is empty", () => { + const s = initialState(); + expect(s.committed).toEqual([]); + expect(s.provisional).toEqual([]); + expect(s.accumulating).toBeNull(); + expect(s.currentTurnId).toBeNull(); + expect(s.latestUsage).toBeNull(); + expect(s.sealedTurnId).toBeNull(); + }); +}); + +describe("foldEvent — text-delta", () => { + it("text-delta accumulates into one TextChunk", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "hello")); + expect(s.accumulating).toEqual({ kind: "text", text: "hello" }); + expect(s.provisional).toEqual([]); + }); + + it("successive text-deltas extend the same provisional chunk", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "hello ")); + s = foldEvent(s, textDelta("t1", "world")); + expect(s.accumulating).toEqual({ kind: "text", text: "hello world" }); + expect(s.provisional).toEqual([]); + }); + + it("text-delta after reasoning-delta flushes thinking and starts text", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, reasoningDelta("t1", "thinking...")); + s = foldEvent(s, textDelta("t1", "answer")); + expect(s.accumulating).toEqual({ kind: "text", text: "answer" }); + expect(s.provisional).toHaveLength(1); + expect(s.provisional[0]?.chunk).toEqual({ type: "thinking", text: "thinking..." }); + expect(s.provisional[0]?.role).toBe("assistant"); + }); +}); + +describe("foldEvent — reasoning-delta", () => { + it("reasoning-delta yields a thinking chunk", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, reasoningDelta("t1", "hmm")); + expect(s.accumulating).toEqual({ kind: "thinking", text: "hmm" }); + }); + + it("successive reasoning-deltas extend the same chunk", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, reasoningDelta("t1", "hmm ")); + s = foldEvent(s, reasoningDelta("t1", "ok")); + expect(s.accumulating).toEqual({ kind: "thinking", text: "hmm ok" }); + }); +}); + +describe("foldEvent — tool-call then tool-result", () => { + it("tool-call then tool-result render in order", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, toolCall("t1", "tc1", "bash", { cmd: "ls" })); + s = foldEvent(s, toolResult("t1", "tc1", "bash", "file.txt")); + expect(s.provisional).toHaveLength(2); + expect(s.provisional[0]?.role).toBe("assistant"); + expect(s.provisional[0]?.chunk).toEqual({ + type: "tool-call", + toolCallId: "tc1", + toolName: "bash", + input: { cmd: "ls" }, + }); + expect(s.provisional[1]?.role).toBe("tool"); + expect(s.provisional[1]?.chunk).toEqual({ + type: "tool-result", + toolCallId: "tc1", + toolName: "bash", + content: "file.txt", + isError: false, + }); + }); + + it("tool-call flushes accumulating text", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "let me check")); + s = foldEvent(s, toolCall("t1", "tc1", "bash", {})); + expect(s.provisional).toHaveLength(2); + expect(s.provisional[0]?.chunk).toEqual({ type: "text", text: "let me check" }); + expect(s.provisional[1]?.chunk).toMatchObject({ type: "tool-call", toolCallId: "tc1" }); + expect(s.accumulating).toBeNull(); + }); +}); + +describe("foldEvent — turn-sealed", () => { + it("turn-sealed sets sealedTurnId", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "hi")); + s = foldEvent(s, turnSealed("t1")); + expect(s.sealedTurnId).toBe("t1"); + expect(s.accumulating).toBeNull(); + expect(s.provisional).toHaveLength(1); + expect(s.provisional[0]?.chunk).toEqual({ type: "text", text: "hi" }); + }); +}); + +describe("foldEvent — usage", () => { + it("stores latest usage", () => { + let s = initialState(); + s = foldEvent(s, usageEvent("t1", 100, 50)); + expect(s.latestUsage).toEqual({ inputTokens: 100, outputTokens: 50 }); + }); + + it("overwrites previous usage", () => { + let s = initialState(); + s = foldEvent(s, usageEvent("t1", 100, 50)); + s = foldEvent(s, usageEvent("t1", 200, 80)); + expect(s.latestUsage).toEqual({ inputTokens: 200, outputTokens: 80 }); + }); +}); + +describe("foldEvent — error", () => { + it("creates error chunk with code", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, errorEvent("t1", "bad", "E001")); + expect(s.provisional).toHaveLength(1); + expect(s.provisional[0]?.chunk).toEqual({ type: "error", message: "bad", code: "E001" }); + expect(s.provisional[0]?.role).toBe("assistant"); + }); + + it("creates error chunk without code", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, errorEvent("t1", "bad")); + expect(s.provisional).toHaveLength(1); + expect(s.provisional[0]?.chunk).toEqual({ type: "error", message: "bad" }); + }); +}); + +describe("foldEvent — done", () => { + it("flushes accumulating chunk on done", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "hello")); + s = foldEvent(s, doneEvent("t1")); + expect(s.accumulating).toBeNull(); + expect(s.provisional).toHaveLength(1); + expect(s.provisional[0]?.chunk).toEqual({ type: "text", text: "hello" }); + }); +}); + +describe("foldEvent — status and tool-output", () => { + it("status is a no-op", () => { + const s = initialState(); + const next = foldEvent(s, { type: "status", conversationId: "c1", status: "running" }); + expect(next).toBe(s); + }); + + it("tool-output is a no-op", () => { + const s = initialState(); + const next = foldEvent(s, { + type: "tool-output", + conversationId: "c1", + turnId: "t1", + toolCallId: "tc1", + data: "output", + stream: "stdout", + }); + expect(next).toBe(s); + }); +}); + +describe("applyHistory", () => { + it("orders committed chunks by seq", () => { + const s = initialState(); + const chunks = [ + storedChunk(3, "assistant", { type: "text", text: "c" }), + storedChunk(1, "user", { type: "text", text: "a" }), + storedChunk(2, "assistant", { type: "text", text: "b" }), + ]; + const next = applyHistory(s, chunks); + expect(next.committed.map((c) => c.seq)).toEqual([1, 2, 3]); + }); + + it("is idempotent on duplicate seqs", () => { + let s = initialState(); + const batch1 = [ + storedChunk(1, "user", { type: "text", text: "a" }), + storedChunk(2, "assistant", { type: "text", text: "b" }), + ]; + s = applyHistory(s, batch1); + const batch2 = [ + storedChunk(2, "assistant", { type: "text", text: "b" }), + storedChunk(3, "assistant", { type: "text", text: "c" }), + ]; + s = applyHistory(s, batch2); + expect(s.committed.map((c) => c.seq)).toEqual([1, 2, 3]); + expect(s.committed).toHaveLength(3); + }); + + it("supersedes & clears provisional once committed", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "hello")); + s = foldEvent(s, turnSealed("t1")); + expect(s.provisional).toHaveLength(1); + expect(s.sealedTurnId).toBe("t1"); + + s = applyHistory(s, [storedChunk(1, "assistant", { type: "text", text: "hello" })]); + expect(s.provisional).toEqual([]); + expect(s.accumulating).toBeNull(); + expect(s.sealedTurnId).toBeNull(); + expect(s.committed).toHaveLength(1); + }); + + it("keeps provisional and accumulating when sealedTurnId is null", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "wip")); + s = foldEvent(s, doneEvent("t1")); + s = applyHistory(s, [storedChunk(1, "user", { type: "text", text: "q" })]); + expect(s.provisional).toHaveLength(1); + expect(s.committed).toHaveLength(1); + }); + + it("merges new history into existing committed", () => { + let s = initialState(); + s = applyHistory(s, [storedChunk(1, "user", { type: "text", text: "a" })]); + s = applyHistory(s, [storedChunk(2, "assistant", { type: "text", text: "b" })]); + expect(s.committed).toHaveLength(2); + expect(s.committed.map((c) => c.seq)).toEqual([1, 2]); + }); +}); + +describe("selectChunks", () => { + it("selectChunks marks provisional with seq null", () => { + let s = initialState(); + s = applyHistory(s, [storedChunk(1, "user", { type: "text", text: "q" })]); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "wip")); + const chunks = selectChunks(s); + expect(chunks).toHaveLength(2); + expect(chunks[0]?.seq).toBe(1); + expect(chunks[0]?.provisional).toBe(false); + expect(chunks[1]?.seq).toBeNull(); + expect(chunks[1]?.provisional).toBe(true); + }); + + it("returns empty for empty state", () => { + expect(selectChunks(initialState())).toEqual([]); + }); + + it("includes accumulating chunk as provisional", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "building...")); + const chunks = selectChunks(s); + expect(chunks).toHaveLength(1); + expect(chunks[0]?.seq).toBeNull(); + expect(chunks[0]?.provisional).toBe(true); + expect(chunks[0]?.chunk).toEqual({ type: "text", text: "building..." }); + }); +}); + +describe("selectMessages", () => { + it("selectMessages groups consecutive same-role chunks", () => { + let s = initialState(); + s = applyHistory(s, [ + storedChunk(1, "user", { type: "text", text: "q1" }), + storedChunk(2, "user", { type: "text", text: "q2" }), + storedChunk(3, "assistant", { type: "text", text: "a1" }), + storedChunk(4, "assistant", { type: "text", text: "a2" }), + storedChunk(5, "user", { type: "text", text: "q3" }), + ]); + const msgs = selectMessages(s); + expect(msgs).toHaveLength(3); + expect(msgs[0]?.role).toBe("user"); + expect(msgs[0]?.chunks).toHaveLength(2); + expect(msgs[1]?.role).toBe("assistant"); + expect(msgs[1]?.chunks).toHaveLength(2); + expect(msgs[2]?.role).toBe("user"); + expect(msgs[2]?.chunks).toHaveLength(1); + }); + + it("returns empty for empty state", () => { + expect(selectMessages(initialState())).toEqual([]); + }); + + it("mixes committed and provisional in messages", () => { + let s = initialState(); + s = applyHistory(s, [storedChunk(1, "user", { type: "text", text: "q" })]); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "a1")); + s = foldEvent(s, textDelta("t1", "a2")); + const msgs = selectMessages(s); + expect(msgs).toHaveLength(2); + expect(msgs[0]?.role).toBe("user"); + expect(msgs[0]?.chunks).toHaveLength(1); + expect(msgs[1]?.role).toBe("assistant"); + expect(msgs[1]?.chunks).toHaveLength(1); + expect(msgs[1]?.chunks[0]).toEqual({ type: "text", text: "a1a2" }); + }); +}); diff --git a/src/core/chunks/reducer.ts b/src/core/chunks/reducer.ts new file mode 100644 index 0000000..0a8ea54 --- /dev/null +++ b/src/core/chunks/reducer.ts @@ -0,0 +1,168 @@ +import type { AgentEvent, Chunk, StoredChunk } from "@dispatch/wire"; +import type { AccumulatingChunk, ProvisionalChunk, TranscriptState } from "./types"; + +/** The initial empty transcript state. */ +export function initialState(): TranscriptState { + return { + committed: [], + provisional: [], + accumulating: null, + currentTurnId: null, + latestUsage: null, + sealedTurnId: null, + }; +} + +function flushAccumulating( + provisional: readonly ProvisionalChunk[], + acc: AccumulatingChunk | null, +): readonly ProvisionalChunk[] { + if (acc === null) return provisional; + const chunk: Chunk = + acc.kind === "text" ? { type: "text", text: acc.text } : { type: "thinking", text: acc.text }; + return [...provisional, { role: "assistant", chunk }]; +} + +/** + * Merge authoritative seq-keyed chunks into the committed history. + * Dedupes by seq (new wins), keeps seq-monotonic order, idempotent. + * When sealedTurnId is set, drops all provisional chunks (now superseded) + * and clears sealedTurnId. + */ +export function applyHistory( + state: TranscriptState, + chunks: readonly StoredChunk[], +): TranscriptState { + const seqMap = new Map<number, StoredChunk>(); + for (const c of state.committed) seqMap.set(c.seq, c); + for (const c of chunks) seqMap.set(c.seq, c); + const committed = Array.from(seqMap.values()).sort((a, b) => a.seq - b.seq); + + if (state.sealedTurnId !== null) { + return { + ...state, + committed, + provisional: [], + accumulating: null, + sealedTurnId: null, + }; + } + + return { ...state, committed }; +} + +/** + * Fold one live AgentEvent into the provisional state. + * + * - `turn-start` records the turnId. + * - `text-delta` extends the current accumulating TextChunk (or starts one). + * - `reasoning-delta` extends the current accumulating ThinkingChunk (or starts one). + * - `tool-call` / `tool-result` / `error` finalize any accumulating chunk and + * add a new provisional chunk. + * - `usage` stores the latest Usage. + * - `done` finalizes any accumulating chunk (turn still provisional). + * - `turn-sealed` finalizes any accumulating chunk and sets sealedTurnId. + * - `status` and `tool-output` are ignored (best-effort no-ops). + */ +export function foldEvent(state: TranscriptState, event: AgentEvent): TranscriptState { + switch (event.type) { + case "status": + case "tool-output": + return state; + + case "turn-start": + return { ...state, currentTurnId: event.turnId }; + + case "text-delta": { + const acc = state.accumulating; + if (acc !== null && acc.kind === "text") { + return { ...state, accumulating: { kind: "text", text: acc.text + event.delta } }; + } + const provisional = flushAccumulating(state.provisional, acc); + return { + ...state, + provisional, + accumulating: { kind: "text", text: event.delta }, + }; + } + + case "reasoning-delta": { + const acc = state.accumulating; + if (acc !== null && acc.kind === "thinking") { + return { ...state, accumulating: { kind: "thinking", text: acc.text + event.delta } }; + } + const provisional = flushAccumulating(state.provisional, acc); + return { + ...state, + provisional, + accumulating: { kind: "thinking", text: event.delta }, + }; + } + + case "tool-call": { + const provisional = flushAccumulating(state.provisional, state.accumulating); + const chunk: Chunk = { + type: "tool-call", + toolCallId: event.toolCallId, + toolName: event.toolName, + input: event.input, + }; + return { + ...state, + provisional: [...provisional, { role: "assistant", chunk }], + accumulating: null, + }; + } + + case "tool-result": { + const provisional = flushAccumulating(state.provisional, state.accumulating); + const chunk: Chunk = { + type: "tool-result", + toolCallId: event.toolCallId, + toolName: event.toolName, + content: event.content, + isError: event.isError, + }; + return { + ...state, + provisional: [...provisional, { role: "tool", chunk }], + accumulating: null, + }; + } + + case "error": { + const provisional = flushAccumulating(state.provisional, state.accumulating); + const chunk: Chunk = + event.code !== undefined + ? { type: "error", message: event.message, code: event.code } + : { type: "error", message: event.message }; + return { + ...state, + provisional: [...provisional, { role: "assistant", chunk }], + accumulating: null, + }; + } + + case "usage": + return { ...state, latestUsage: event.usage }; + + case "done": { + const provisional = flushAccumulating(state.provisional, state.accumulating); + return { + ...state, + provisional, + accumulating: null, + }; + } + + case "turn-sealed": { + const provisional = flushAccumulating(state.provisional, state.accumulating); + return { + ...state, + provisional, + accumulating: null, + sealedTurnId: event.turnId, + }; + } + } +} diff --git a/src/core/chunks/selectors.ts b/src/core/chunks/selectors.ts new file mode 100644 index 0000000..8fb832f --- /dev/null +++ b/src/core/chunks/selectors.ts @@ -0,0 +1,51 @@ +import type { ChatMessage, Chunk } from "@dispatch/wire"; +import type { RenderedChunk, TranscriptState } from "./types"; + +/** + * Select all chunks for rendering: committed first (seq order), + * then provisional (seq: null). + */ +export function selectChunks(state: TranscriptState): readonly RenderedChunk[] { + const result: RenderedChunk[] = []; + for (const c of state.committed) { + result.push({ seq: c.seq, role: c.role, chunk: c.chunk, provisional: false }); + } + for (const p of state.provisional) { + result.push({ seq: null, role: p.role, chunk: p.chunk, provisional: true }); + } + if (state.accumulating !== null) { + const chunk: Chunk = + state.accumulating.kind === "text" + ? { type: "text", text: state.accumulating.text } + : { type: "thinking", text: state.accumulating.text }; + result.push({ seq: null, role: "assistant", chunk, provisional: true }); + } + return result; +} + +/** + * Group consecutive same-role rendered chunks into ChatMessages. + */ +export function selectMessages(state: TranscriptState): readonly ChatMessage[] { + const rendered = selectChunks(state); + const first = rendered[0]; + if (first === undefined) return []; + + const messages: ChatMessage[] = []; + let role = first.role; + let chunks: Chunk[] = [first.chunk]; + + for (let i = 1; i < rendered.length; i++) { + const rc = rendered[i]; + if (rc === undefined) continue; + if (rc.role === role) { + chunks.push(rc.chunk); + } else { + messages.push({ role, chunks }); + role = rc.role; + chunks = [rc.chunk]; + } + } + messages.push({ role, chunks }); + return messages; +} diff --git a/src/core/chunks/types.ts b/src/core/chunks/types.ts new file mode 100644 index 0000000..3792445 --- /dev/null +++ b/src/core/chunks/types.ts @@ -0,0 +1,31 @@ +import type { Chunk, Role, StoredChunk, Usage } from "@dispatch/wire"; + +/** A chunk being accumulated from streaming deltas (text or thinking). */ +export interface AccumulatingChunk { + readonly kind: "text" | "thinking"; + readonly text: string; +} + +/** A provisional chunk that has no authoritative seq yet. */ +export interface ProvisionalChunk { + readonly role: Role; + readonly chunk: Chunk; +} + +/** The transcript reducer state. Holds committed history + live in-flight turn. */ +export interface TranscriptState { + readonly committed: readonly StoredChunk[]; + readonly provisional: readonly ProvisionalChunk[]; + readonly accumulating: AccumulatingChunk | null; + readonly currentTurnId: string | null; + readonly latestUsage: Usage | null; + readonly sealedTurnId: string | null; +} + +/** A chunk ready for rendering: either committed (with seq) or provisional. */ +export interface RenderedChunk { + readonly seq: number | null; + readonly role: Role; + readonly chunk: Chunk; + readonly provisional: boolean; +} diff --git a/src/core/wire/conformance.test.ts b/src/core/wire/conformance.test.ts new file mode 100644 index 0000000..c0f276f --- /dev/null +++ b/src/core/wire/conformance.test.ts @@ -0,0 +1,181 @@ +import type { ChatSendMessage, ConversationHistoryResponse } from "@dispatch/transport-contract"; +import type { AgentEvent, StoredChunk } from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { + assertAgentEventExhaustive, + assertChunkExhaustive, + assertWsClientMessageExhaustive, + assertWsServerMessageExhaustive, +} from "./conformance"; + +describe("StoredChunk round-trips JSON", () => { + it("preserves shape through JSON serialize/deserialize", () => { + const original: StoredChunk = { + seq: 42, + role: "assistant", + chunk: { type: "text", text: "hello" }, + }; + const roundTripped: StoredChunk = JSON.parse(JSON.stringify(original)) as StoredChunk; + expect(roundTripped).toEqual(original); + expect(roundTripped.seq).toBe(42); + expect(roundTripped.role).toBe("assistant"); + expect(roundTripped.chunk.type).toBe("text"); + }); +}); + +describe("classifies every AgentEvent type", () => { + const samples: AgentEvent[] = [ + { type: "status", conversationId: "c1", status: "idle" }, + { type: "turn-start", conversationId: "c1", turnId: "t1" }, + { type: "text-delta", conversationId: "c1", turnId: "t1", delta: "hi" }, + { type: "reasoning-delta", conversationId: "c1", turnId: "t1", delta: "thinking" }, + { + type: "tool-call", + conversationId: "c1", + turnId: "t1", + toolCallId: "tc1", + toolName: "read", + input: {}, + }, + { + type: "tool-result", + conversationId: "c1", + turnId: "t1", + toolCallId: "tc1", + toolName: "read", + content: "ok", + isError: false, + }, + { + type: "tool-output", + conversationId: "c1", + turnId: "t1", + toolCallId: "tc1", + data: "out", + stream: "stdout", + }, + { + type: "usage", + conversationId: "c1", + turnId: "t1", + usage: { inputTokens: 10, outputTokens: 20 }, + }, + { type: "error", conversationId: "c1", turnId: "t1", message: "oops" }, + { type: "done", conversationId: "c1", turnId: "t1", reason: "complete" }, + { type: "turn-sealed", conversationId: "c1", turnId: "t1" }, + ]; + + it("returns a stable label for every AgentEvent.type variant", () => { + const labels = samples.map(assertAgentEventExhaustive); + expect(labels).toEqual([ + "status", + "turn-start", + "text-delta", + "reasoning-delta", + "tool-call", + "tool-result", + "tool-output", + "usage", + "error", + "done", + "turn-sealed", + ]); + }); + + it("covers all 11 AgentEvent variants", () => { + expect(samples).toHaveLength(11); + }); +}); + +describe("classifies every Chunk type", () => { + it("returns a stable label for each Chunk.type variant", () => { + const chunks = [ + { type: "text" as const, text: "a" }, + { type: "thinking" as const, text: "b" }, + { type: "tool-call" as const, toolCallId: "tc", toolName: "n", input: null }, + { + type: "tool-result" as const, + toolCallId: "tc", + toolName: "n", + content: "c", + isError: false, + }, + { type: "error" as const, message: "e" }, + { type: "system" as const, text: "s" }, + ]; + const labels = chunks.map(assertChunkExhaustive); + expect(labels).toEqual(["text", "thinking", "tool-call", "tool-result", "error", "system"]); + }); +}); + +describe("classifies every WsServerMessage type", () => { + it("returns a stable label for each variant", () => { + const msgs = [ + { type: "catalog" as const, catalog: [] }, + { type: "surface" as const, spec: { id: "s", region: "r", title: "S", fields: [] } }, + { + type: "update" as const, + update: { surfaceId: "s", spec: { id: "s", region: "r", title: "S", fields: [] } }, + }, + { type: "error" as const, message: "e" }, + { + type: "chat.delta" as const, + event: { type: "done" as const, conversationId: "c", turnId: "t", reason: "r" }, + }, + { type: "chat.error" as const, message: "e" }, + ]; + const labels = msgs.map(assertWsServerMessageExhaustive); + expect(labels).toEqual(["catalog", "surface", "update", "error", "chat.delta", "chat.error"]); + }); +}); + +describe("classifies every WsClientMessage type", () => { + it("returns a stable label for each variant", () => { + const msgs = [ + { type: "subscribe" as const, surfaceId: "s" }, + { type: "unsubscribe" as const, surfaceId: "s" }, + { type: "invoke" as const, surfaceId: "s", actionId: "a" }, + { type: "chat.send" as const, message: "hi" }, + ]; + const labels = msgs.map(assertWsClientMessageExhaustive); + expect(labels).toEqual(["subscribe", "unsubscribe", "invoke", "chat.send"]); + }); +}); + +describe("ChatSendMessage shape is constructible", () => { + it("constructs a minimal ChatSendMessage", () => { + const msg: ChatSendMessage = { type: "chat.send", message: "hello" }; + expect(msg.type).toBe("chat.send"); + expect(msg.message).toBe("hello"); + }); + + it("constructs a full ChatSendMessage", () => { + const msg: ChatSendMessage = { + type: "chat.send", + conversationId: "c1", + message: "hello", + model: "default/gpt-4", + cwd: "/tmp", + }; + expect(msg.conversationId).toBe("c1"); + expect(msg.model).toBe("default/gpt-4"); + expect(msg.cwd).toBe("/tmp"); + }); +}); + +describe("ConversationHistoryResponse shape is constructible", () => { + it("constructs a response with chunks", () => { + const resp: ConversationHistoryResponse = { + chunks: [{ seq: 1, role: "user", chunk: { type: "text", text: "hi" } }], + latestSeq: 1, + }; + expect(resp.chunks).toHaveLength(1); + expect(resp.latestSeq).toBe(1); + }); + + it("constructs an empty (caught-up) response", () => { + const resp: ConversationHistoryResponse = { chunks: [], latestSeq: 5 }; + expect(resp.chunks).toHaveLength(0); + expect(resp.latestSeq).toBe(5); + }); +}); diff --git a/src/core/wire/conformance.ts b/src/core/wire/conformance.ts new file mode 100644 index 0000000..5d75a60 --- /dev/null +++ b/src/core/wire/conformance.ts @@ -0,0 +1,100 @@ +import type { WsClientMessage, WsServerMessage } from "@dispatch/transport-contract"; +import type { AgentEvent, Chunk } from "@dispatch/wire"; + +/** + * Compile-time exhaustiveness guard for `AgentEvent.type`. + * If a variant is added/removed/renamed in `@dispatch/wire`, this function's + * default branch becomes reachable → TypeScript error at build time. + */ +export function assertAgentEventExhaustive(event: AgentEvent): string { + switch (event.type) { + case "status": + return "status"; + case "turn-start": + return "turn-start"; + case "text-delta": + return "text-delta"; + case "reasoning-delta": + return "reasoning-delta"; + case "tool-call": + return "tool-call"; + case "tool-result": + return "tool-result"; + case "tool-output": + return "tool-output"; + case "usage": + return "usage"; + case "error": + return "error"; + case "done": + return "done"; + case "turn-sealed": + return "turn-sealed"; + default: + return event satisfies never; + } +} + +/** + * Compile-time exhaustiveness guard for `Chunk.type`. + */ +export function assertChunkExhaustive(chunk: Chunk): string { + switch (chunk.type) { + case "text": + return "text"; + case "thinking": + return "thinking"; + case "tool-call": + return "tool-call"; + case "tool-result": + return "tool-result"; + case "error": + return "error"; + case "system": + return "system"; + default: + return chunk satisfies never; + } +} + +/** + * Compile-time exhaustiveness guard for `WsServerMessage.type`. + * Covers both surface ops and chat ops. + */ +export function assertWsServerMessageExhaustive(msg: WsServerMessage): string { + switch (msg.type) { + case "catalog": + return "catalog"; + case "surface": + return "surface"; + case "update": + return "update"; + case "error": + return "error"; + case "chat.delta": + return "chat.delta"; + case "chat.error": + return "chat.error"; + default: + return msg satisfies never; + } +} + +/** + * Compile-time exhaustiveness guard for `WsClientMessage.type`. + * Covers both surface ops and chat ops. + */ +export function assertWsClientMessageExhaustive(msg: WsClientMessage): string { + switch (msg.type) { + case "subscribe": + return "subscribe"; + case "unsubscribe": + return "unsubscribe"; + case "invoke": + return "invoke"; + case "chat.send": + return "chat.send"; + default: + return msg satisfies never; + } +} diff --git a/src/core/wire/index.ts b/src/core/wire/index.ts new file mode 100644 index 0000000..ae6b3e6 --- /dev/null +++ b/src/core/wire/index.ts @@ -0,0 +1,6 @@ +export { + assertAgentEventExhaustive, + assertChunkExhaustive, + assertWsClientMessageExhaustive, + assertWsServerMessageExhaustive, +} from "./conformance"; diff --git a/src/features/conversation-cache/cache.test.ts b/src/features/conversation-cache/cache.test.ts new file mode 100644 index 0000000..c68ed0d --- /dev/null +++ b/src/features/conversation-cache/cache.test.ts @@ -0,0 +1,173 @@ +import type { StoredChunk } from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { createConversationCache } from "./cache"; +import type { ConversationCacheIndexEntry, ConversationChunkStore } from "./types"; + +const chunk = (seq: number, role: "user" | "assistant" = "user"): StoredChunk => ({ + seq, + role, + chunk: { type: "text", text: `chunk-${seq}` }, +}); + +/** + * In-memory fake ConversationChunkStore — the ONLY allowed fake. + * An outermost edge: simulates the storage port without any real I/O. + */ +function createFakeStore(): ConversationChunkStore { + const store = new Map<string, StoredChunk[]>(); + + return { + async load(conversationId) { + return store.get(conversationId) ?? []; + }, + + async append(conversationId, chunks) { + const existing = store.get(conversationId) ?? []; + const existingSeqs = new Set(existing.map((c) => c.seq)); + const toAdd = chunks.filter((c) => !existingSeqs.has(c.seq)); + store.set( + conversationId, + [...existing, ...toAdd].sort((a, b) => a.seq - b.seq), + ); + }, + + async delete(conversationId) { + store.delete(conversationId); + }, + + async index() { + const entries: ConversationCacheIndexEntry[] = []; + for (const [id, chunks] of store) { + if (chunks.length === 0) continue; + let maxSeq = 0; + for (const c of chunks) { + if (c.seq > maxSeq) maxSeq = c.seq; + } + entries.push({ + conversationId: id, + chunkCount: chunks.length, + maxSeq, + }); + } + return entries; + }, + }; +} + +describe("cache.load", () => { + it("returns stored chunks", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store); + await store.append("conv-1", [chunk(1), chunk(2)]); + const result = await cache.load("conv-1"); + expect(result).toEqual([chunk(1), chunk(2)]); + }); + + it("returns empty array for absent conversation", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store); + const result = await cache.load("nonexistent"); + expect(result).toEqual([]); + }); +}); + +describe("cache.commit", () => { + it("appends only new chunks", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store); + await store.append("conv-1", [chunk(1), chunk(2)]); + + const merged = await cache.commit("conv-1", [chunk(2), chunk(3)]); + expect(merged).toEqual([chunk(1), chunk(2), chunk(3)]); + + // Verify store has all chunks + const stored = await store.load("conv-1"); + expect(stored).toEqual([chunk(1), chunk(2), chunk(3)]); + }); + + it("returns full merged result", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store); + + const merged = await cache.commit("conv-1", [chunk(3), chunk(1)]); + expect(merged).toEqual([chunk(1), chunk(3)]); + }); + + it("is idempotent — re-committing same chunks is a no-op", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store); + + await cache.commit("conv-1", [chunk(1), chunk(2)]); + const merged = await cache.commit("conv-1", [chunk(1), chunk(2)]); + expect(merged).toEqual([chunk(1), chunk(2)]); + + const stored = await store.load("conv-1"); + expect(stored).toEqual([chunk(1), chunk(2)]); + }); +}); + +describe("cache.sinceSeq", () => { + it("returns max seq from cache", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store); + await store.append("conv-1", [chunk(1), chunk(5), chunk(3)]); + expect(await cache.sinceSeq("conv-1")).toBe(5); + }); + + it("returns 0 for empty conversation", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store); + expect(await cache.sinceSeq("conv-1")).toBe(0); + }); +}); + +describe("cache.evictIfOverBudget", () => { + it("deletes selected conversations", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store, { maxChunks: 5 }); + + await store.append("a", [chunk(1), chunk(2)]); + await store.append("b", [chunk(1), chunk(2)]); + await store.append("c", [chunk(1)]); + + // Total = 5, max = 5, under budget + const evicted = await cache.evictIfOverBudget(null); + expect(evicted).toEqual([]); + + // Add more to go over budget + await store.append("d", [chunk(1), chunk(2), chunk(3)]); + // Total = 8, max = 5, need to evict 3+ chunks + + const evicted2 = await cache.evictIfOverBudget(null); + expect(evicted2.length).toBeGreaterThan(0); + + // Verify evicted conversations are deleted + for (const id of evicted2) { + expect(await store.load(id)).toEqual([]); + } + }); + + it("never evicts the active conversation", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store, { maxChunks: 3 }); + + await store.append("active", [chunk(1), chunk(2), chunk(3)]); + await store.append("other", [chunk(1), chunk(2)]); + + // Total = 5, max = 3, need to evict 2+ chunks + const evicted = await cache.evictIfOverBudget("active"); + expect(evicted).not.toContain("active"); + expect(evicted).toContain("other"); + }); + + it("returns empty when under budget", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store, { maxChunks: 100 }); + + await store.append("a", [chunk(1)]); + await store.append("b", [chunk(1)]); + + const evicted = await cache.evictIfOverBudget(null); + expect(evicted).toEqual([]); + }); +}); diff --git a/src/features/conversation-cache/cache.ts b/src/features/conversation-cache/cache.ts new file mode 100644 index 0000000..4aab487 --- /dev/null +++ b/src/features/conversation-cache/cache.ts @@ -0,0 +1,71 @@ +import type { StoredChunk } from "@dispatch/wire"; +import { nextSinceSeq, reconcileCache, selectEvictions } from "./logic"; +import type { ConversationChunkStore } from "./types"; + +export interface ConversationCache { + /** Load all cached chunks for a conversation. */ + load(conversationId: string): Promise<readonly StoredChunk[]>; + + /** + * Load + reconcile + append new chunks. + * Returns the merged cache (the new authoritative cache for this conversation). + */ + commit(conversationId: string, incoming: readonly StoredChunk[]): Promise<readonly StoredChunk[]>; + + /** Return the `?sinceSeq=` cursor for the next incremental sync. */ + sinceSeq(conversationId: string): Promise<number>; + + /** + * Evict conversations over budget. + * Returns the evicted conversationIds. + */ + evictIfOverBudget(activeConversationId: string | null): Promise<readonly string[]>; +} + +export interface ConversationCacheOptions { + /** Maximum total chunks across all conversations before eviction triggers. */ + readonly maxChunks?: number; +} + +const DEFAULT_MAX_CHUNKS = 10_000; + +/** + * Create a conversation cache backed by the injected storage port. + * + * The ONLY impurity is the injected `store`; all logic delegates to pure functions. + */ +export function createConversationCache( + store: ConversationChunkStore, + opts?: ConversationCacheOptions, +): ConversationCache { + const maxChunks = opts?.maxChunks ?? DEFAULT_MAX_CHUNKS; + + return { + async load(conversationId) { + return store.load(conversationId); + }, + + async commit(conversationId, incoming) { + const cached = await store.load(conversationId); + const { merged, toAppend } = reconcileCache(cached, incoming); + if (toAppend.length > 0) { + await store.append(conversationId, toAppend); + } + return merged; + }, + + async sinceSeq(conversationId) { + const cached = await store.load(conversationId); + return nextSinceSeq(cached); + }, + + async evictIfOverBudget(activeConversationId) { + const idx = await store.index(); + const toEvict = selectEvictions(idx, { maxChunks, activeConversationId }); + for (const id of toEvict) { + await store.delete(id); + } + return toEvict; + }, + }; +} diff --git a/src/features/conversation-cache/index.ts b/src/features/conversation-cache/index.ts new file mode 100644 index 0000000..ba3f69a --- /dev/null +++ b/src/features/conversation-cache/index.ts @@ -0,0 +1,8 @@ +export type { ConversationCache, ConversationCacheOptions } from "./cache"; +export { createConversationCache } from "./cache"; +export { nextSinceSeq, reconcileCache, selectEvictions } from "./logic"; +export type { + ConversationCacheIndexEntry, + ConversationChunkStore, + ReconcileResult, +} from "./types"; diff --git a/src/features/conversation-cache/logic.test.ts b/src/features/conversation-cache/logic.test.ts new file mode 100644 index 0000000..858460a --- /dev/null +++ b/src/features/conversation-cache/logic.test.ts @@ -0,0 +1,140 @@ +import type { StoredChunk } from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { nextSinceSeq, reconcileCache, selectEvictions } from "./logic"; +import type { ConversationCacheIndexEntry } from "./types"; + +const chunk = (seq: number, role: "user" | "assistant" = "user"): StoredChunk => ({ + seq, + role, + chunk: { type: "text", text: `chunk-${seq}` }, +}); + +describe("reconcileCache", () => { + it("merges and dedupes by seq", () => { + const cached = [chunk(1), chunk(2)]; + const incoming = [chunk(2), chunk(3)]; + const result = reconcileCache(cached, incoming); + expect(result.merged).toEqual([chunk(1), chunk(2), chunk(3)]); + }); + + it("toAppend excludes already-cached seqs", () => { + const cached = [chunk(1), chunk(2)]; + const incoming = [chunk(2), chunk(3)]; + const result = reconcileCache(cached, incoming); + expect(result.toAppend).toEqual([chunk(3)]); + }); + + it("tolerates out-of-order incoming", () => { + const cached = [chunk(1)]; + const incoming = [chunk(5), chunk(3), chunk(2)]; + const result = reconcileCache(cached, incoming); + expect(result.merged).toEqual([chunk(1), chunk(2), chunk(3), chunk(5)]); + expect(result.toAppend).toEqual([chunk(5), chunk(3), chunk(2)]); + }); + + it("returns empty merged and toAppend when both inputs are empty", () => { + const result = reconcileCache([], []); + expect(result.merged).toEqual([]); + expect(result.toAppend).toEqual([]); + }); + + it("handles empty cached with incoming", () => { + const incoming = [chunk(3), chunk(1)]; + const result = reconcileCache([], incoming); + expect(result.merged).toEqual([chunk(1), chunk(3)]); + expect(result.toAppend).toEqual([chunk(3), chunk(1)]); + }); + + it("handles cached with empty incoming", () => { + const cached = [chunk(1), chunk(2)]; + const result = reconcileCache(cached, []); + expect(result.merged).toEqual([chunk(1), chunk(2)]); + expect(result.toAppend).toEqual([]); + }); + + it("is idempotent — re-reconciling same incoming produces same result", () => { + const cached = [chunk(1)]; + const incoming = [chunk(2), chunk(3)]; + const first = reconcileCache(cached, incoming); + const second = reconcileCache(first.merged, incoming); + expect(second.merged).toEqual(first.merged); + expect(second.toAppend).toEqual([]); + }); +}); + +describe("nextSinceSeq", () => { + it("returns max seq", () => { + const cached = [chunk(1), chunk(5), chunk(3)]; + expect(nextSinceSeq(cached)).toBe(5); + }); + + it("returns 0 when empty", () => { + expect(nextSinceSeq([])).toBe(0); + }); + + it("returns single seq for single chunk", () => { + expect(nextSinceSeq([chunk(42)])).toBe(42); + }); +}); + +describe("selectEvictions", () => { + it("never evicts the active conversation", () => { + const index: ConversationCacheIndexEntry[] = [ + { conversationId: "active", chunkCount: 100, maxSeq: 100, lastAccess: 1000 }, + { conversationId: "other", chunkCount: 50, maxSeq: 50, lastAccess: 1 }, + ]; + const result = selectEvictions(index, { maxChunks: 50, activeConversationId: "active" }); + expect(result).not.toContain("active"); + expect(result).toContain("other"); + }); + + it("evicts LRU until under budget", () => { + const index: ConversationCacheIndexEntry[] = [ + { conversationId: "a", chunkCount: 30, maxSeq: 30, lastAccess: 100 }, + { conversationId: "b", chunkCount: 30, maxSeq: 30, lastAccess: 50 }, + { conversationId: "c", chunkCount: 30, maxSeq: 30, lastAccess: 200 }, + { conversationId: "d", chunkCount: 30, maxSeq: 30, lastAccess: 10 }, + ]; + // Total = 120, max = 60, need to evict 60+ chunks + // LRU order: d(10), b(50), a(100), c(200) + const result = selectEvictions(index, { maxChunks: 60, activeConversationId: null }); + expect(result).toEqual(["d", "b"]); + }); + + it("is a no-op under budget", () => { + const index: ConversationCacheIndexEntry[] = [ + { conversationId: "a", chunkCount: 10, maxSeq: 10, lastAccess: 100 }, + { conversationId: "b", chunkCount: 10, maxSeq: 10, lastAccess: 50 }, + ]; + const result = selectEvictions(index, { maxChunks: 100, activeConversationId: null }); + expect(result).toEqual([]); + }); + + it("returns empty for empty index", () => { + const result = selectEvictions([], { maxChunks: 100, activeConversationId: null }); + expect(result).toEqual([]); + }); + + it("tie-breaks by smaller maxSeq when lastAccess is equal", () => { + const index: ConversationCacheIndexEntry[] = [ + { conversationId: "a", chunkCount: 30, maxSeq: 100, lastAccess: 50 }, + { conversationId: "b", chunkCount: 30, maxSeq: 50, lastAccess: 50 }, + { conversationId: "c", chunkCount: 30, maxSeq: 200, lastAccess: 50 }, + ]; + // Total = 90, max = 60, need to evict 30+ chunks + // All have same lastAccess, tie-break by maxSeq: b(50), a(100), c(200) + const result = selectEvictions(index, { maxChunks: 60, activeConversationId: null }); + expect(result).toEqual(["b"]); + }); + + it("handles missing lastAccess (treated as 0)", () => { + const index: ConversationCacheIndexEntry[] = [ + { conversationId: "a", chunkCount: 30, maxSeq: 30, lastAccess: 100 }, + { conversationId: "b", chunkCount: 30, maxSeq: 30 }, + ]; + // Total = 60, max = 30, need to evict 30+ chunks + // b has no lastAccess (0), a has 100 + const result = selectEvictions(index, { maxChunks: 30, activeConversationId: null }); + expect(result).toEqual(["b"]); + }); +}); diff --git a/src/features/conversation-cache/logic.ts b/src/features/conversation-cache/logic.ts new file mode 100644 index 0000000..4a4479e --- /dev/null +++ b/src/features/conversation-cache/logic.ts @@ -0,0 +1,77 @@ +import type { StoredChunk } from "@dispatch/wire"; +import type { ConversationCacheIndexEntry, ReconcileResult } from "./types"; + +/** + * Merge authoritative seq-keyed chunks with cached chunks. + * + * Deduplicates by `seq`, produces seq-monotonic order. + * `toAppend` = the incoming chunks whose `seq` is not already in `cached` + * (exactly what to persist). Idempotent; tolerant of out-of-order/overlapping `incoming`. + */ +export function reconcileCache( + cached: readonly StoredChunk[], + incoming: readonly StoredChunk[], +): ReconcileResult { + const seen = new Set<number>(); + for (const chunk of cached) { + seen.add(chunk.seq); + } + + const toAppend: StoredChunk[] = []; + for (const chunk of incoming) { + if (!seen.has(chunk.seq)) { + toAppend.push(chunk); + seen.add(chunk.seq); + } + } + + const merged = [...cached, ...toAppend].sort((a, b) => a.seq - b.seq); + return { merged, toAppend }; +} + +/** + * Return the max committed `seq`, or `0` if empty. + * This is the `?sinceSeq=` cursor for the next incremental sync. + */ +export function nextSinceSeq(cached: readonly StoredChunk[]): number { + if (cached.length === 0) return 0; + let max = 0; + for (const chunk of cached) { + if (chunk.seq > max) max = chunk.seq; + } + return max; +} + +/** + * Choose conversationIds to evict to get total cached chunks under `maxChunks`. + * + * LRU eviction: oldest `lastAccess` first, tie-break smaller `maxSeq`. + * NEVER evicts the `activeConversationId`. + * Returns [] when under budget. + */ +export function selectEvictions( + index: readonly ConversationCacheIndexEntry[], + opts: { maxChunks: number; activeConversationId: string | null }, +): readonly string[] { + const totalChunks = index.reduce((sum, entry) => sum + entry.chunkCount, 0); + if (totalChunks <= opts.maxChunks) return []; + + const candidates = index + .filter((entry) => entry.conversationId !== opts.activeConversationId) + .sort((a, b) => { + const aAccess = a.lastAccess ?? 0; + const bAccess = b.lastAccess ?? 0; + if (aAccess !== bAccess) return aAccess - bAccess; + return a.maxSeq - b.maxSeq; + }); + + let remaining = totalChunks; + const evictions: string[] = []; + for (const entry of candidates) { + if (remaining <= opts.maxChunks) break; + evictions.push(entry.conversationId); + remaining -= entry.chunkCount; + } + + return evictions; +} diff --git a/src/features/conversation-cache/types.ts b/src/features/conversation-cache/types.ts new file mode 100644 index 0000000..2a349cc --- /dev/null +++ b/src/features/conversation-cache/types.ts @@ -0,0 +1,42 @@ +import type { StoredChunk } from "@dispatch/wire"; + +/** Metadata entry for a cached conversation, used by eviction logic. */ +export interface ConversationCacheIndexEntry { + readonly conversationId: string; + readonly chunkCount: number; + readonly maxSeq: number; + readonly lastAccess?: number; +} + +/** + * Storage port for conversation chunk persistence. + * + * The IndexedDB implementation lives in `src/adapters/idb/` (separate unit); + * this interface is the contract the cache logic depends on. + * + * All methods MUST be idempotent on `seq`: re-appending an existing seq is a no-op. + */ +export interface ConversationChunkStore { + /** Load all cached chunks for a conversation, seq-ordered. Returns [] if absent. */ + load(conversationId: string): Promise<readonly StoredChunk[]>; + + /** + * Append committed chunks to a conversation's cache. + * MUST be idempotent on `seq`: re-appending an existing seq is a no-op. + */ + append(conversationId: string, chunks: readonly StoredChunk[]): Promise<void>; + + /** Delete all cached data for a conversation. */ + delete(conversationId: string): Promise<void>; + + /** Return metadata for all cached conversations (for eviction). */ + index(): Promise<readonly ConversationCacheIndexEntry[]>; +} + +/** Result of reconciling cached chunks with incoming authoritative chunks. */ +export interface ReconcileResult { + /** The merged, deduplicated, seq-ordered chunk list. */ + readonly merged: readonly StoredChunk[]; + /** The subset of incoming chunks that need to be appended (not already cached). */ + readonly toAppend: readonly StoredChunk[]; +} |
