diff options
| author | Adam Malczewski <[email protected]> | 2026-06-07 00:02:32 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-07 00:02:32 +0900 |
| commit | 5d9ae1849337b64af1b0d47c23b8c4950a55f792 (patch) | |
| tree | dd5fccaff7535bf1216457a986b8f95bd14fd61e /src/features/conversation-cache | |
| parent | fac44794432928d0341728642fd70eef87837da4 (diff) | |
| download | dispatch-web-5d9ae1849337b64af1b0d47c23b8c4950a55f792.tar.gz dispatch-web-5d9ae1849337b64af1b0d47c23b8c4950a55f792.zip | |
Slice 2 wave 1: transcript reducer, wire conformance, ws chat, cache core
- core/chunks: the one pure transcript reducer (foldEvent live deltas +
applyHistory seq-keyed reconcile + selectChunks/selectMessages); 27 tests
- core/wire: FE-side contract-conformance exhaustiveness guards + drift smoke
tests over wire/transport-contract unions (ยง2.9 drift signal); 10 tests
- adapters/ws: additively multiplex chat.send/chat.delta/chat.error on the
existing surface socket (onChat + widened send); surface API unchanged
- features/conversation-cache: pure reconcileCache/nextSinceSeq/selectEvictions
+ ConversationChunkStore port + injected createConversationCache; 26 tests
Verified green: svelte-check 0/0, vitest 169, biome clean, build ok.
Diffstat (limited to 'src/features/conversation-cache')
| -rw-r--r-- | src/features/conversation-cache/cache.test.ts | 173 | ||||
| -rw-r--r-- | src/features/conversation-cache/cache.ts | 71 | ||||
| -rw-r--r-- | src/features/conversation-cache/index.ts | 8 | ||||
| -rw-r--r-- | src/features/conversation-cache/logic.test.ts | 140 | ||||
| -rw-r--r-- | src/features/conversation-cache/logic.ts | 77 | ||||
| -rw-r--r-- | src/features/conversation-cache/types.ts | 42 |
6 files changed, 511 insertions, 0 deletions
diff --git a/src/features/conversation-cache/cache.test.ts b/src/features/conversation-cache/cache.test.ts new file mode 100644 index 0000000..c68ed0d --- /dev/null +++ b/src/features/conversation-cache/cache.test.ts @@ -0,0 +1,173 @@ +import type { StoredChunk } from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { createConversationCache } from "./cache"; +import type { ConversationCacheIndexEntry, ConversationChunkStore } from "./types"; + +const chunk = (seq: number, role: "user" | "assistant" = "user"): StoredChunk => ({ + seq, + role, + chunk: { type: "text", text: `chunk-${seq}` }, +}); + +/** + * In-memory fake ConversationChunkStore โ the ONLY allowed fake. + * An outermost edge: simulates the storage port without any real I/O. + */ +function createFakeStore(): ConversationChunkStore { + const store = new Map<string, StoredChunk[]>(); + + return { + async load(conversationId) { + return store.get(conversationId) ?? []; + }, + + async append(conversationId, chunks) { + const existing = store.get(conversationId) ?? []; + const existingSeqs = new Set(existing.map((c) => c.seq)); + const toAdd = chunks.filter((c) => !existingSeqs.has(c.seq)); + store.set( + conversationId, + [...existing, ...toAdd].sort((a, b) => a.seq - b.seq), + ); + }, + + async delete(conversationId) { + store.delete(conversationId); + }, + + async index() { + const entries: ConversationCacheIndexEntry[] = []; + for (const [id, chunks] of store) { + if (chunks.length === 0) continue; + let maxSeq = 0; + for (const c of chunks) { + if (c.seq > maxSeq) maxSeq = c.seq; + } + entries.push({ + conversationId: id, + chunkCount: chunks.length, + maxSeq, + }); + } + return entries; + }, + }; +} + +describe("cache.load", () => { + it("returns stored chunks", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store); + await store.append("conv-1", [chunk(1), chunk(2)]); + const result = await cache.load("conv-1"); + expect(result).toEqual([chunk(1), chunk(2)]); + }); + + it("returns empty array for absent conversation", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store); + const result = await cache.load("nonexistent"); + expect(result).toEqual([]); + }); +}); + +describe("cache.commit", () => { + it("appends only new chunks", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store); + await store.append("conv-1", [chunk(1), chunk(2)]); + + const merged = await cache.commit("conv-1", [chunk(2), chunk(3)]); + expect(merged).toEqual([chunk(1), chunk(2), chunk(3)]); + + // Verify store has all chunks + const stored = await store.load("conv-1"); + expect(stored).toEqual([chunk(1), chunk(2), chunk(3)]); + }); + + it("returns full merged result", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store); + + const merged = await cache.commit("conv-1", [chunk(3), chunk(1)]); + expect(merged).toEqual([chunk(1), chunk(3)]); + }); + + it("is idempotent โ re-committing same chunks is a no-op", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store); + + await cache.commit("conv-1", [chunk(1), chunk(2)]); + const merged = await cache.commit("conv-1", [chunk(1), chunk(2)]); + expect(merged).toEqual([chunk(1), chunk(2)]); + + const stored = await store.load("conv-1"); + expect(stored).toEqual([chunk(1), chunk(2)]); + }); +}); + +describe("cache.sinceSeq", () => { + it("returns max seq from cache", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store); + await store.append("conv-1", [chunk(1), chunk(5), chunk(3)]); + expect(await cache.sinceSeq("conv-1")).toBe(5); + }); + + it("returns 0 for empty conversation", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store); + expect(await cache.sinceSeq("conv-1")).toBe(0); + }); +}); + +describe("cache.evictIfOverBudget", () => { + it("deletes selected conversations", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store, { maxChunks: 5 }); + + await store.append("a", [chunk(1), chunk(2)]); + await store.append("b", [chunk(1), chunk(2)]); + await store.append("c", [chunk(1)]); + + // Total = 5, max = 5, under budget + const evicted = await cache.evictIfOverBudget(null); + expect(evicted).toEqual([]); + + // Add more to go over budget + await store.append("d", [chunk(1), chunk(2), chunk(3)]); + // Total = 8, max = 5, need to evict 3+ chunks + + const evicted2 = await cache.evictIfOverBudget(null); + expect(evicted2.length).toBeGreaterThan(0); + + // Verify evicted conversations are deleted + for (const id of evicted2) { + expect(await store.load(id)).toEqual([]); + } + }); + + it("never evicts the active conversation", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store, { maxChunks: 3 }); + + await store.append("active", [chunk(1), chunk(2), chunk(3)]); + await store.append("other", [chunk(1), chunk(2)]); + + // Total = 5, max = 3, need to evict 2+ chunks + const evicted = await cache.evictIfOverBudget("active"); + expect(evicted).not.toContain("active"); + expect(evicted).toContain("other"); + }); + + it("returns empty when under budget", async () => { + const store = createFakeStore(); + const cache = createConversationCache(store, { maxChunks: 100 }); + + await store.append("a", [chunk(1)]); + await store.append("b", [chunk(1)]); + + const evicted = await cache.evictIfOverBudget(null); + expect(evicted).toEqual([]); + }); +}); diff --git a/src/features/conversation-cache/cache.ts b/src/features/conversation-cache/cache.ts new file mode 100644 index 0000000..4aab487 --- /dev/null +++ b/src/features/conversation-cache/cache.ts @@ -0,0 +1,71 @@ +import type { StoredChunk } from "@dispatch/wire"; +import { nextSinceSeq, reconcileCache, selectEvictions } from "./logic"; +import type { ConversationChunkStore } from "./types"; + +export interface ConversationCache { + /** Load all cached chunks for a conversation. */ + load(conversationId: string): Promise<readonly StoredChunk[]>; + + /** + * Load + reconcile + append new chunks. + * Returns the merged cache (the new authoritative cache for this conversation). + */ + commit(conversationId: string, incoming: readonly StoredChunk[]): Promise<readonly StoredChunk[]>; + + /** Return the `?sinceSeq=` cursor for the next incremental sync. */ + sinceSeq(conversationId: string): Promise<number>; + + /** + * Evict conversations over budget. + * Returns the evicted conversationIds. + */ + evictIfOverBudget(activeConversationId: string | null): Promise<readonly string[]>; +} + +export interface ConversationCacheOptions { + /** Maximum total chunks across all conversations before eviction triggers. */ + readonly maxChunks?: number; +} + +const DEFAULT_MAX_CHUNKS = 10_000; + +/** + * Create a conversation cache backed by the injected storage port. + * + * The ONLY impurity is the injected `store`; all logic delegates to pure functions. + */ +export function createConversationCache( + store: ConversationChunkStore, + opts?: ConversationCacheOptions, +): ConversationCache { + const maxChunks = opts?.maxChunks ?? DEFAULT_MAX_CHUNKS; + + return { + async load(conversationId) { + return store.load(conversationId); + }, + + async commit(conversationId, incoming) { + const cached = await store.load(conversationId); + const { merged, toAppend } = reconcileCache(cached, incoming); + if (toAppend.length > 0) { + await store.append(conversationId, toAppend); + } + return merged; + }, + + async sinceSeq(conversationId) { + const cached = await store.load(conversationId); + return nextSinceSeq(cached); + }, + + async evictIfOverBudget(activeConversationId) { + const idx = await store.index(); + const toEvict = selectEvictions(idx, { maxChunks, activeConversationId }); + for (const id of toEvict) { + await store.delete(id); + } + return toEvict; + }, + }; +} diff --git a/src/features/conversation-cache/index.ts b/src/features/conversation-cache/index.ts new file mode 100644 index 0000000..ba3f69a --- /dev/null +++ b/src/features/conversation-cache/index.ts @@ -0,0 +1,8 @@ +export type { ConversationCache, ConversationCacheOptions } from "./cache"; +export { createConversationCache } from "./cache"; +export { nextSinceSeq, reconcileCache, selectEvictions } from "./logic"; +export type { + ConversationCacheIndexEntry, + ConversationChunkStore, + ReconcileResult, +} from "./types"; diff --git a/src/features/conversation-cache/logic.test.ts b/src/features/conversation-cache/logic.test.ts new file mode 100644 index 0000000..858460a --- /dev/null +++ b/src/features/conversation-cache/logic.test.ts @@ -0,0 +1,140 @@ +import type { StoredChunk } from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { nextSinceSeq, reconcileCache, selectEvictions } from "./logic"; +import type { ConversationCacheIndexEntry } from "./types"; + +const chunk = (seq: number, role: "user" | "assistant" = "user"): StoredChunk => ({ + seq, + role, + chunk: { type: "text", text: `chunk-${seq}` }, +}); + +describe("reconcileCache", () => { + it("merges and dedupes by seq", () => { + const cached = [chunk(1), chunk(2)]; + const incoming = [chunk(2), chunk(3)]; + const result = reconcileCache(cached, incoming); + expect(result.merged).toEqual([chunk(1), chunk(2), chunk(3)]); + }); + + it("toAppend excludes already-cached seqs", () => { + const cached = [chunk(1), chunk(2)]; + const incoming = [chunk(2), chunk(3)]; + const result = reconcileCache(cached, incoming); + expect(result.toAppend).toEqual([chunk(3)]); + }); + + it("tolerates out-of-order incoming", () => { + const cached = [chunk(1)]; + const incoming = [chunk(5), chunk(3), chunk(2)]; + const result = reconcileCache(cached, incoming); + expect(result.merged).toEqual([chunk(1), chunk(2), chunk(3), chunk(5)]); + expect(result.toAppend).toEqual([chunk(5), chunk(3), chunk(2)]); + }); + + it("returns empty merged and toAppend when both inputs are empty", () => { + const result = reconcileCache([], []); + expect(result.merged).toEqual([]); + expect(result.toAppend).toEqual([]); + }); + + it("handles empty cached with incoming", () => { + const incoming = [chunk(3), chunk(1)]; + const result = reconcileCache([], incoming); + expect(result.merged).toEqual([chunk(1), chunk(3)]); + expect(result.toAppend).toEqual([chunk(3), chunk(1)]); + }); + + it("handles cached with empty incoming", () => { + const cached = [chunk(1), chunk(2)]; + const result = reconcileCache(cached, []); + expect(result.merged).toEqual([chunk(1), chunk(2)]); + expect(result.toAppend).toEqual([]); + }); + + it("is idempotent โ re-reconciling same incoming produces same result", () => { + const cached = [chunk(1)]; + const incoming = [chunk(2), chunk(3)]; + const first = reconcileCache(cached, incoming); + const second = reconcileCache(first.merged, incoming); + expect(second.merged).toEqual(first.merged); + expect(second.toAppend).toEqual([]); + }); +}); + +describe("nextSinceSeq", () => { + it("returns max seq", () => { + const cached = [chunk(1), chunk(5), chunk(3)]; + expect(nextSinceSeq(cached)).toBe(5); + }); + + it("returns 0 when empty", () => { + expect(nextSinceSeq([])).toBe(0); + }); + + it("returns single seq for single chunk", () => { + expect(nextSinceSeq([chunk(42)])).toBe(42); + }); +}); + +describe("selectEvictions", () => { + it("never evicts the active conversation", () => { + const index: ConversationCacheIndexEntry[] = [ + { conversationId: "active", chunkCount: 100, maxSeq: 100, lastAccess: 1000 }, + { conversationId: "other", chunkCount: 50, maxSeq: 50, lastAccess: 1 }, + ]; + const result = selectEvictions(index, { maxChunks: 50, activeConversationId: "active" }); + expect(result).not.toContain("active"); + expect(result).toContain("other"); + }); + + it("evicts LRU until under budget", () => { + const index: ConversationCacheIndexEntry[] = [ + { conversationId: "a", chunkCount: 30, maxSeq: 30, lastAccess: 100 }, + { conversationId: "b", chunkCount: 30, maxSeq: 30, lastAccess: 50 }, + { conversationId: "c", chunkCount: 30, maxSeq: 30, lastAccess: 200 }, + { conversationId: "d", chunkCount: 30, maxSeq: 30, lastAccess: 10 }, + ]; + // Total = 120, max = 60, need to evict 60+ chunks + // LRU order: d(10), b(50), a(100), c(200) + const result = selectEvictions(index, { maxChunks: 60, activeConversationId: null }); + expect(result).toEqual(["d", "b"]); + }); + + it("is a no-op under budget", () => { + const index: ConversationCacheIndexEntry[] = [ + { conversationId: "a", chunkCount: 10, maxSeq: 10, lastAccess: 100 }, + { conversationId: "b", chunkCount: 10, maxSeq: 10, lastAccess: 50 }, + ]; + const result = selectEvictions(index, { maxChunks: 100, activeConversationId: null }); + expect(result).toEqual([]); + }); + + it("returns empty for empty index", () => { + const result = selectEvictions([], { maxChunks: 100, activeConversationId: null }); + expect(result).toEqual([]); + }); + + it("tie-breaks by smaller maxSeq when lastAccess is equal", () => { + const index: ConversationCacheIndexEntry[] = [ + { conversationId: "a", chunkCount: 30, maxSeq: 100, lastAccess: 50 }, + { conversationId: "b", chunkCount: 30, maxSeq: 50, lastAccess: 50 }, + { conversationId: "c", chunkCount: 30, maxSeq: 200, lastAccess: 50 }, + ]; + // Total = 90, max = 60, need to evict 30+ chunks + // All have same lastAccess, tie-break by maxSeq: b(50), a(100), c(200) + const result = selectEvictions(index, { maxChunks: 60, activeConversationId: null }); + expect(result).toEqual(["b"]); + }); + + it("handles missing lastAccess (treated as 0)", () => { + const index: ConversationCacheIndexEntry[] = [ + { conversationId: "a", chunkCount: 30, maxSeq: 30, lastAccess: 100 }, + { conversationId: "b", chunkCount: 30, maxSeq: 30 }, + ]; + // Total = 60, max = 30, need to evict 30+ chunks + // b has no lastAccess (0), a has 100 + const result = selectEvictions(index, { maxChunks: 30, activeConversationId: null }); + expect(result).toEqual(["b"]); + }); +}); diff --git a/src/features/conversation-cache/logic.ts b/src/features/conversation-cache/logic.ts new file mode 100644 index 0000000..4a4479e --- /dev/null +++ b/src/features/conversation-cache/logic.ts @@ -0,0 +1,77 @@ +import type { StoredChunk } from "@dispatch/wire"; +import type { ConversationCacheIndexEntry, ReconcileResult } from "./types"; + +/** + * Merge authoritative seq-keyed chunks with cached chunks. + * + * Deduplicates by `seq`, produces seq-monotonic order. + * `toAppend` = the incoming chunks whose `seq` is not already in `cached` + * (exactly what to persist). Idempotent; tolerant of out-of-order/overlapping `incoming`. + */ +export function reconcileCache( + cached: readonly StoredChunk[], + incoming: readonly StoredChunk[], +): ReconcileResult { + const seen = new Set<number>(); + for (const chunk of cached) { + seen.add(chunk.seq); + } + + const toAppend: StoredChunk[] = []; + for (const chunk of incoming) { + if (!seen.has(chunk.seq)) { + toAppend.push(chunk); + seen.add(chunk.seq); + } + } + + const merged = [...cached, ...toAppend].sort((a, b) => a.seq - b.seq); + return { merged, toAppend }; +} + +/** + * Return the max committed `seq`, or `0` if empty. + * This is the `?sinceSeq=` cursor for the next incremental sync. + */ +export function nextSinceSeq(cached: readonly StoredChunk[]): number { + if (cached.length === 0) return 0; + let max = 0; + for (const chunk of cached) { + if (chunk.seq > max) max = chunk.seq; + } + return max; +} + +/** + * Choose conversationIds to evict to get total cached chunks under `maxChunks`. + * + * LRU eviction: oldest `lastAccess` first, tie-break smaller `maxSeq`. + * NEVER evicts the `activeConversationId`. + * Returns [] when under budget. + */ +export function selectEvictions( + index: readonly ConversationCacheIndexEntry[], + opts: { maxChunks: number; activeConversationId: string | null }, +): readonly string[] { + const totalChunks = index.reduce((sum, entry) => sum + entry.chunkCount, 0); + if (totalChunks <= opts.maxChunks) return []; + + const candidates = index + .filter((entry) => entry.conversationId !== opts.activeConversationId) + .sort((a, b) => { + const aAccess = a.lastAccess ?? 0; + const bAccess = b.lastAccess ?? 0; + if (aAccess !== bAccess) return aAccess - bAccess; + return a.maxSeq - b.maxSeq; + }); + + let remaining = totalChunks; + const evictions: string[] = []; + for (const entry of candidates) { + if (remaining <= opts.maxChunks) break; + evictions.push(entry.conversationId); + remaining -= entry.chunkCount; + } + + return evictions; +} diff --git a/src/features/conversation-cache/types.ts b/src/features/conversation-cache/types.ts new file mode 100644 index 0000000..2a349cc --- /dev/null +++ b/src/features/conversation-cache/types.ts @@ -0,0 +1,42 @@ +import type { StoredChunk } from "@dispatch/wire"; + +/** Metadata entry for a cached conversation, used by eviction logic. */ +export interface ConversationCacheIndexEntry { + readonly conversationId: string; + readonly chunkCount: number; + readonly maxSeq: number; + readonly lastAccess?: number; +} + +/** + * Storage port for conversation chunk persistence. + * + * The IndexedDB implementation lives in `src/adapters/idb/` (separate unit); + * this interface is the contract the cache logic depends on. + * + * All methods MUST be idempotent on `seq`: re-appending an existing seq is a no-op. + */ +export interface ConversationChunkStore { + /** Load all cached chunks for a conversation, seq-ordered. Returns [] if absent. */ + load(conversationId: string): Promise<readonly StoredChunk[]>; + + /** + * Append committed chunks to a conversation's cache. + * MUST be idempotent on `seq`: re-appending an existing seq is a no-op. + */ + append(conversationId: string, chunks: readonly StoredChunk[]): Promise<void>; + + /** Delete all cached data for a conversation. */ + delete(conversationId: string): Promise<void>; + + /** Return metadata for all cached conversations (for eviction). */ + index(): Promise<readonly ConversationCacheIndexEntry[]>; +} + +/** Result of reconciling cached chunks with incoming authoritative chunks. */ +export interface ReconcileResult { + /** The merged, deduplicated, seq-ordered chunk list. */ + readonly merged: readonly StoredChunk[]; + /** The subset of incoming chunks that need to be appended (not already cached). */ + readonly toAppend: readonly StoredChunk[]; +} |
