diff options
Diffstat (limited to 'src/adapters')
| -rw-r--r-- | src/adapters/idb/index.test.ts | 120 | ||||
| -rw-r--r-- | src/adapters/idb/index.ts | 181 |
2 files changed, 301 insertions, 0 deletions
diff --git a/src/adapters/idb/index.test.ts b/src/adapters/idb/index.test.ts new file mode 100644 index 0000000..12bb5ad --- /dev/null +++ b/src/adapters/idb/index.test.ts @@ -0,0 +1,120 @@ +import "fake-indexeddb/auto"; +import type { StoredChunk } from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { createIdbChunkStore } from "./index"; + +function textChunk(text: string): StoredChunk["chunk"] { + return { type: "text", text }; +} + +function makeChunk( + seq: number, + text: string, + role: StoredChunk["role"] = "assistant", +): StoredChunk { + return { seq, role, chunk: textChunk(text) }; +} + +describe("createIdbChunkStore", () => { + it("append then load returns chunks seq-ordered", async () => { + const store = createIdbChunkStore({ indexedDB: new IDBFactory() }); + const chunks = [makeChunk(1, "a"), makeChunk(2, "b"), makeChunk(3, "c")]; + + await store.append("conv1", chunks); + const loaded = await store.load("conv1"); + + expect(loaded).toHaveLength(3); + expect(loaded[0]?.seq).toBe(1); + expect(loaded[1]?.seq).toBe(2); + expect(loaded[2]?.seq).toBe(3); + expect(loaded[0]?.chunk).toEqual(textChunk("a")); + }); + + it("append out-of-order still loads seq-ordered", async () => { + const store = createIdbChunkStore({ indexedDB: new IDBFactory() }); + const chunks = [makeChunk(3, "c"), makeChunk(1, "a"), makeChunk(2, "b")]; + + await store.append("conv1", chunks); + const loaded = await store.load("conv1"); + + expect(loaded).toHaveLength(3); + expect(loaded.map((c) => c.seq)).toEqual([1, 2, 3]); + }); + + it("append is idempotent on duplicate seq", async () => { + const store = createIdbChunkStore({ indexedDB: new IDBFactory() }); + + await store.append("conv1", [makeChunk(1, "first"), makeChunk(2, "b")]); + await store.append("conv1", [makeChunk(1, "first"), makeChunk(3, "c")]); + + const loaded = await store.load("conv1"); + expect(loaded).toHaveLength(3); + expect(loaded.map((c) => c.seq)).toEqual([1, 2, 3]); + expect(loaded[0]?.chunk).toEqual(textChunk("first")); + }); + + it("load returns [] for an absent conversation", async () => { + const store = createIdbChunkStore({ indexedDB: new IDBFactory() }); + + const loaded = await store.load("nonexistent"); + expect(loaded).toEqual([]); + }); + + it("delete removes a conversation", async () => { + const store = createIdbChunkStore({ indexedDB: new IDBFactory() }); + + await store.append("conv1", [makeChunk(1, "a")]); + await store.append("conv2", [makeChunk(1, "b")]); + + await store.delete("conv1"); + + expect(await store.load("conv1")).toEqual([]); + const conv2 = await store.load("conv2"); + expect(conv2).toHaveLength(1); + expect(conv2[0]?.chunk).toEqual(textChunk("b")); + }); + + it("index aggregates chunkCount and maxSeq", async () => { + const store = createIdbChunkStore({ indexedDB: new IDBFactory() }); + + await store.append("conv1", [makeChunk(1, "a"), makeChunk(2, "b"), makeChunk(3, "c")]); + await store.append("conv2", [makeChunk(1, "x")]); + + const idx = await store.index(); + expect(idx).toHaveLength(2); + + const c1 = idx.find((e) => e.conversationId === "conv1"); + const c2 = idx.find((e) => e.conversationId === "conv2"); + + expect(c1?.chunkCount).toBe(3); + expect(c1?.maxSeq).toBe(3); + expect(c2?.chunkCount).toBe(1); + expect(c2?.maxSeq).toBe(1); + }); + + it("index reports lastAccess after load", async () => { + const store = createIdbChunkStore({ indexedDB: new IDBFactory() }); + + await store.append("conv1", [makeChunk(1, "a")]); + const idx = await store.index(); + + const entry = idx.find((e) => e.conversationId === "conv1"); + expect(entry?.lastAccess).toBeTypeOf("number"); + expect(entry?.lastAccess).toBeGreaterThan(0); + }); + + it("separate conversations are isolated", async () => { + const store = createIdbChunkStore({ indexedDB: new IDBFactory() }); + + await store.append("conv1", [makeChunk(1, "a1"), makeChunk(2, "a2")]); + await store.append("conv2", [makeChunk(1, "b1")]); + + const loaded1 = await store.load("conv1"); + const loaded2 = await store.load("conv2"); + + expect(loaded1).toHaveLength(2); + expect(loaded2).toHaveLength(1); + expect(loaded1[0]?.chunk).toEqual(textChunk("a1")); + expect(loaded2[0]?.chunk).toEqual(textChunk("b1")); + }); +}); diff --git a/src/adapters/idb/index.ts b/src/adapters/idb/index.ts new file mode 100644 index 0000000..302edb5 --- /dev/null +++ b/src/adapters/idb/index.ts @@ -0,0 +1,181 @@ +import type { StoredChunk } from "@dispatch/wire"; +import type { + ConversationCacheIndexEntry, + ConversationChunkStore, +} from "../../features/conversation-cache"; + +const DEFAULT_DB_NAME = "dispatch-chunk-cache"; +const DB_VERSION = 1; +const CHUNKS_STORE = "chunks"; +const META_STORE = "meta"; + +interface ChunkRecord { + conversationId: string; + seq: number; + role: StoredChunk["role"]; + chunk: StoredChunk["chunk"]; +} + +interface MetaRecord { + conversationId: string; + lastAccess: number; +} + +export interface CreateIdbChunkStoreOptions { + indexedDB?: IDBFactory; + dbName?: string; +} + +function requestToPromise<T>(req: IDBRequest<T>): Promise<T> { + return new Promise<T>((resolve, reject) => { + req.onsuccess = () => resolve(req.result); + req.onerror = () => reject(req.error); + }); +} + +function txComplete(tx: IDBTransaction): Promise<void> { + return new Promise<void>((resolve, reject) => { + tx.oncomplete = () => resolve(); + tx.onerror = () => reject(tx.error); + tx.onabort = () => reject(tx.error); + }); +} + +function openDb(idb: IDBFactory, dbName: string): Promise<IDBDatabase> { + return new Promise<IDBDatabase>((resolve, reject) => { + const req = idb.open(dbName, DB_VERSION); + + req.onupgradeneeded = () => { + const db = req.result; + if (!db.objectStoreNames.contains(CHUNKS_STORE)) { + const store = db.createObjectStore(CHUNKS_STORE, { + keyPath: ["conversationId", "seq"], + }); + store.createIndex("byConversation", "conversationId"); + } + if (!db.objectStoreNames.contains(META_STORE)) { + db.createObjectStore(META_STORE, { keyPath: "conversationId" }); + } + }; + + req.onsuccess = () => resolve(req.result); + req.onerror = () => reject(req.error); + }); +} + +function keyRangeFor(conversationId: string): IDBKeyRange { + const lower: [string, number] = [conversationId, 0]; + const upper: [string, number] = [conversationId, Number.POSITIVE_INFINITY]; + return IDBKeyRange.bound(lower, upper); +} + +function chunksToStoredChunks(records: ChunkRecord[]): StoredChunk[] { + return records.map((r) => ({ seq: r.seq, role: r.role, chunk: r.chunk })); +} + +export function createIdbChunkStore(opts?: CreateIdbChunkStoreOptions): ConversationChunkStore { + const idb = opts?.indexedDB ?? globalThis.indexedDB; + const dbName = opts?.dbName ?? DEFAULT_DB_NAME; + + let dbPromise: Promise<IDBDatabase> | null = null; + + function getDb(): Promise<IDBDatabase> { + if (dbPromise === null) { + dbPromise = openDb(idb, dbName); + } + return dbPromise; + } + + return { + async load(conversationId: string): Promise<readonly StoredChunk[]> { + const db = await getDb(); + const tx = db.transaction(CHUNKS_STORE, "readonly"); + const store = tx.objectStore(CHUNKS_STORE); + const range = keyRangeFor(conversationId); + const records = await requestToPromise<ChunkRecord[]>(store.getAll(range)); + await txComplete(tx); + + records.sort((a, b) => a.seq - b.seq); + return chunksToStoredChunks(records); + }, + + async append(conversationId: string, chunks: readonly StoredChunk[]): Promise<void> { + if (chunks.length === 0) return; + + const db = await getDb(); + const tx = db.transaction([CHUNKS_STORE, META_STORE], "readwrite"); + const chunkStore = tx.objectStore(CHUNKS_STORE); + const metaStore = tx.objectStore(META_STORE); + + for (const c of chunks) { + chunkStore.put({ + conversationId, + seq: c.seq, + role: c.role, + chunk: c.chunk, + } satisfies ChunkRecord); + } + + metaStore.put({ + conversationId, + lastAccess: Date.now(), + } satisfies MetaRecord); + + await txComplete(tx); + }, + + async delete(conversationId: string): Promise<void> { + const db = await getDb(); + const tx = db.transaction([CHUNKS_STORE, META_STORE], "readwrite"); + const chunkStore = tx.objectStore(CHUNKS_STORE); + const metaStore = tx.objectStore(META_STORE); + + chunkStore.delete(keyRangeFor(conversationId)); + metaStore.delete(conversationId); + + await txComplete(tx); + }, + + async index(): Promise<readonly ConversationCacheIndexEntry[]> { + const db = await getDb(); + const tx = db.transaction([CHUNKS_STORE, META_STORE], "readonly"); + const chunkStore = tx.objectStore(CHUNKS_STORE); + const metaStore = tx.objectStore(META_STORE); + + const allChunks = await requestToPromise<ChunkRecord[]>(chunkStore.getAll()); + const allMeta = await requestToPromise<MetaRecord[]>(metaStore.getAll()); + await txComplete(tx); + + const metaMap = new Map<string, number>(); + for (const m of allMeta) { + metaMap.set(m.conversationId, m.lastAccess); + } + + const grouped = new Map<string, { chunkCount: number; maxSeq: number }>(); + for (const r of allChunks) { + const existing = grouped.get(r.conversationId); + if (existing === undefined) { + grouped.set(r.conversationId, { chunkCount: 1, maxSeq: r.seq }); + } else { + existing.chunkCount++; + if (r.seq > existing.maxSeq) { + existing.maxSeq = r.seq; + } + } + } + + const result: ConversationCacheIndexEntry[] = []; + for (const [conversationId, stats] of grouped) { + const lastAccess = metaMap.get(conversationId); + result.push({ + conversationId, + chunkCount: stats.chunkCount, + maxSeq: stats.maxSeq, + ...(lastAccess !== undefined ? { lastAccess } : {}), + }); + } + + return result; + }, + }; +} |
