From 3a0cdd2c8453f059a746465e3aa6d9b5caa3b399 Mon Sep 17 00:00:00 2001 From: Adam Malczewski Date: Wed, 10 Jun 2026 10:43:40 +0900 Subject: trace-store: content-addressed body dedup + retention/prune MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wave 1 of the dedup/storage-growth milestone (notes §12). - bodies table is now content-addressed (SHA-256 hash key); identical verbatim bodies (cache-warming resends, any repeat) collapse to one stored row, referenced by hash from records. Transparent to insert/read callers. - at-rest gzip compression for bodies >1 KiB (node:zlib), decompressed on read. - prune(policy): age-based delete + drop-oldest byte-cap eviction + orphan-body GC. Exports RetentionPolicy/PruneSummary/DEFAULT_RETENTION (7d / 256 MiB). typecheck EXIT 0; biome clean; vitest 576; bun 89->100, 0 fail. --- notes/observability-design.md | 51 +++++ packages/trace-store/src/index.ts | 4 +- packages/trace-store/src/store.test.ts | 264 +++++++++++++++++++++- packages/trace-store/src/store.ts | 396 +++++++++++++++++++++++++++++++-- tasks.md | 28 ++- 5 files changed, 720 insertions(+), 23 deletions(-) diff --git a/notes/observability-design.md b/notes/observability-design.md index 4a3515a..1b09973 100644 --- a/notes/observability-design.md +++ b/notes/observability-design.md @@ -635,3 +635,54 @@ waits. Per tick the collector: durable; the collector consumes whenever it's up). - **Open (Phase B):** poll vs fs-watch; interval/batch-size; offset storage (store metadata vs sidecar); dedup key; + the store schema/indexes (§6). + +--- + +## 12. Phase C — body de-dup + retention (DECIDED; building) + +> Resolves the §6 "Retention/rotation sizing" + "dedup key" open threads + the +> tasks.md "dedup / storage growth" item. **User-gated decisions (this session):** +> extend the existing pipeline (no new extension); scope = **de-dup + retention/ +> rotation** (D9 analytics roll-ups DEFERRED); dedup = **content-addressed bodies**. 
+ +**The problem.** D5 verbatim provider-I/O capture stores large request/response +bodies; cache-warming resends near-identical bodies every few minutes → the trace +store grows without bound. + +**Decision — content-addressed bodies (supersedes the §3.1/D5 "fingerprint-gated +persistence" sketch).** Dedup keys on the **body content hash**, NOT on +`prefix.fingerprint`: +- The store hashes each verbatim `body`, stores it **once** in a content-addressed + bodies table keyed by hash, and references it by hash from the span/record row. + Identical bodies (the cache-warming case, and any other repeat) collapse to one + stored copy. Robust against ALL duplicate bodies, not just prefix matches; and it + is **stateless** (no "prior fingerprint per conversation" bookkeeping). +- This **decouples `prefix.fingerprint` from storage.** Fingerprint + `warm|real` + revert to their original job: queryable **cache-bust debugging** attributes (§3.1). + They are NOT needed for dedup/retention and are **deferred** to a later cache-bust- + debugging milestone — also because **cache-warming is not built yet**, so a request + cannot honestly be flagged `warm` vs `real` today (declare-reality, §extension brief). + +**Retention/rotation.** The store exposes `prune(policy)` enforcing a +`RetentionPolicy` ({ maxAgeMs?, maxTotalBodyBytes? }): delete records/bodies past +`maxAgeMs`; evict **oldest** bodies (drop-oldest) until under `maxTotalBodyBytes`; +garbage-collect orphaned bodies (no remaining referencing row). Bodies above a size +threshold are **compressed** at rest (gzip), transparently decompressed on read. +Exports a `DEFAULT_RETENTION` constant. + +**Who triggers prune.** The **collector** (process 2) calls `store.prune(policy)` on +a cadence in its existing tick loop — NOT a `scheduledJob` (the scheduler is for +extensions; trace-store/collector are supporting packages). Retention policy values +default to `DEFAULT_RETENTION`; host-bin env-override wiring is a deferred follow-up. 
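A minimal sketch of the content-addressed idea described above, assuming an in-memory `Map` in place of the SQLite `bodies` table and `node:crypto` for the SHA-256 digest. The names `bodies`, `recordBodyHash`, `insertBody`, and `readBody` are hypothetical and exist only for illustration; they are not the `trace-store` surface.

```typescript
import { createHash } from "node:crypto";

// Hypothetical stand-in for the bodies table: content hash -> body text.
const bodies = new Map<string, string>();
// Hypothetical stand-in for record rows: recordId -> bodyHash reference.
const recordBodyHash = new Map<string, string>();

// The key depends only on the body bytes, so no per-conversation state is needed.
function contentHash(body: string): string {
  return createHash("sha256").update(body, "utf8").digest("hex");
}

// Store the body once per distinct hash; every record keeps only the hash.
function insertBody(recordId: string, body: string): string {
  const hash = contentHash(body);
  if (!bodies.has(hash)) {
    bodies.set(hash, body);
  }
  recordBodyHash.set(recordId, hash);
  return hash;
}

// Reads resolve recordId -> hash -> body, which keeps callers unaware of the dedup.
function readBody(recordId: string): string | undefined {
  const hash = recordBodyHash.get(recordId);
  return hash === undefined ? undefined : bodies.get(hash);
}

insertBody("rec-1", "identical body content");
insertBody("rec-2", "identical body content"); // a resend of the same body
insertBody("rec-3", "a different body");

console.log(bodies.size); // 2
```

Three records were inserted but only two bodies are stored, which is the cache-warming resend case collapsing to one copy.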
+ +**Units / waves.** +- **Wave 1 — `trace-store`:** content-addressed body storage + compression + + `prune`/`RetentionPolicy`/`DEFAULT_RETENTION` on the `TraceStore` surface, read + paths transparent. (`bun:sqlite` → `bun test`.) +- **Wave 2 — `observability-collector`:** call the new `prune` on a cadence; + confirm body inserts flow through the content-addressed path. Depends on Wave 1's + surface. + +**Deferred (still open):** D9 analytics roll-ups (§2 D9 / §6) — rollup table shape, +`GROUP BY` indexes, retention asymmetry, the periodic rollup job. And the +`prefix.fingerprint` / `warm|real` cache-bust attributes (above). diff --git a/packages/trace-store/src/index.ts b/packages/trace-store/src/index.ts index 7341305..17e206f 100644 --- a/packages/trace-store/src/index.ts +++ b/packages/trace-store/src/index.ts @@ -1,3 +1,3 @@ export { formatDuration, renderEasyView } from "./easy-view.js"; -export type { TraceStore } from "./store.js"; -export { createTraceStore, stableId } from "./store.js"; +export type { PruneSummary, RetentionPolicy, TraceStore } from "./store.js"; +export { createTraceStore, DEFAULT_RETENTION, stableId } from "./store.js"; diff --git a/packages/trace-store/src/store.test.ts b/packages/trace-store/src/store.test.ts index e380147..10a033a 100644 --- a/packages/trace-store/src/store.test.ts +++ b/packages/trace-store/src/store.test.ts @@ -1,6 +1,6 @@ import type { LogRecord } from "@dispatch/kernel"; import { describe, expect, it } from "vitest"; -import { createTraceStore, stableId } from "./store.js"; +import { computeEvictions, createTraceStore, stableId } from "./store.js"; const logRecord: LogRecord = { kind: "log", @@ -221,4 +221,266 @@ describe("createTraceStore", () => { // ignore cleanup error } }); + + it("body round-trips through getTurn", () => { + const store = freshStore(); + store.insertRecords([bodyRecord]); + const result = store.getTurn("turn-1"); + expect(result).toHaveLength(1); + 
expect(result[0]?.kind).toBe("span-open"); + if (result[0]?.kind === "span-open") { + expect(result[0].body).toBe("the full prompt text"); + } + store.close(); + }); +}); + +describe("content-addressed body storage", () => { + function freshStore() { + return createTraceStore({ path: ":memory:" }); + } + + it("content-addresses two identical bodies to a single stored body row", () => { + const store = freshStore(); + const rec1: LogRecord = { + kind: "span-open", + spanId: "s1", + name: "prompt", + timestamp: 1000, + extensionId: "ext", + turnId: "t1", + body: "identical body content", + }; + const rec2: LogRecord = { + kind: "span-open", + spanId: "s2", + name: "prompt", + timestamp: 2000, + extensionId: "ext", + turnId: "t1", + body: "identical body content", + }; + store.insertRecords([rec1, rec2]); + + const id1 = stableId(rec1); + const id2 = stableId(rec2); + expect(store.getBody(id1)).toBe("identical body content"); + expect(store.getBody(id2)).toBe("identical body content"); + store.close(); + }); + + it("stores distinct bodies separately", () => { + const store = freshStore(); + const rec1: LogRecord = { + kind: "span-open", + spanId: "s1", + name: "prompt", + timestamp: 1000, + extensionId: "ext", + turnId: "t1", + body: "body A", + }; + const rec2: LogRecord = { + kind: "span-open", + spanId: "s2", + name: "prompt", + timestamp: 2000, + extensionId: "ext", + turnId: "t1", + body: "body B", + }; + store.insertRecords([rec1, rec2]); + + const id1 = stableId(rec1); + const id2 = stableId(rec2); + expect(store.getBody(id1)).toBe("body A"); + expect(store.getBody(id2)).toBe("body B"); + store.close(); + }); + + it("compresses a body above the threshold and round-trips it on read", () => { + const store = freshStore(); + const largeBody = "x".repeat(2048); + const rec: LogRecord = { + kind: "span-open", + spanId: "s1", + name: "prompt", + timestamp: 1000, + extensionId: "ext", + turnId: "t1", + body: largeBody, + }; + store.insertRecords([rec]); + const id = 
stableId(rec); + expect(store.getBody(id)).toBe(largeBody); + store.close(); + }); +}); + +describe("prune", () => { + function freshStore() { + return createTraceStore({ path: ":memory:" }); + } + + it("prune by maxAgeMs deletes records and their bodies older than the cutoff", () => { + const store = freshStore(); + const oldRec: LogRecord = { + kind: "span-open", + spanId: "s-old", + name: "old-prompt", + timestamp: 1000, + extensionId: "ext", + turnId: "t1", + body: "old body content", + }; + const newRec: LogRecord = { + kind: "span-open", + spanId: "s-new", + name: "new-prompt", + timestamp: Date.now(), + extensionId: "ext", + turnId: "t2", + body: "new body content", + }; + store.insertRecords([oldRec, newRec]); + + const summary = store.prune({ maxAgeMs: 60000 }); + expect(summary.recordsDeleted).toBe(1); + + const result = store.getTurn("t1"); + expect(result).toHaveLength(0); + expect(store.getBody(stableId(oldRec))).toBeUndefined(); + + const newResult = store.getTurn("t2"); + expect(newResult).toHaveLength(1); + expect(store.getBody(stableId(newRec))).toBe("new body content"); + store.close(); + }); + + it("prune by maxTotalBodyBytes evicts oldest bodies until under the cap", () => { + const store = freshStore(); + const body1 = "a".repeat(300); + const body2 = "b".repeat(300); + const body3 = "c".repeat(300); + const rec1: LogRecord = { + kind: "span-open", + spanId: "s1", + name: "p", + timestamp: 1000, + extensionId: "ext", + turnId: "t1", + body: body1, + }; + const rec2: LogRecord = { + kind: "span-open", + spanId: "s2", + name: "p", + timestamp: 2000, + extensionId: "ext", + turnId: "t2", + body: body2, + }; + const rec3: LogRecord = { + kind: "span-open", + spanId: "s3", + name: "p", + timestamp: 3000, + extensionId: "ext", + turnId: "t3", + body: body3, + }; + store.insertRecords([rec1, rec2, rec3]); + + const summary = store.prune({ maxTotalBodyBytes: 500 }); + expect(summary.bodiesDeleted).toBeGreaterThanOrEqual(1); + + 
expect(store.getBody(stableId(rec1))).toBeUndefined(); + + const remaining = store.getTurn("t3"); + if (remaining.length > 0 && remaining[0]?.kind === "span-open") { + expect(remaining[0].body).toBe(body3); + } + store.close(); + }); + + it("prune garbage-collects an orphaned body with no referencing record", () => { + const store = freshStore(); + const rec: LogRecord = { + kind: "span-open", + spanId: "s1", + name: "p", + timestamp: 1000, + extensionId: "ext", + turnId: "t1", + body: "orphan body", + }; + store.insertRecords([rec]); + + const summary = store.prune({ maxAgeMs: 60000 }); + expect(summary.recordsDeleted).toBe(1); + expect(summary.bodiesDeleted).toBe(1); + store.close(); + }); + + it("prune keeps a still-referenced body when a duplicate referrer remains", () => { + const store = freshStore(); + const sharedBody = "shared body content"; + const rec1: LogRecord = { + kind: "span-open", + spanId: "s1", + name: "p", + timestamp: 1000, + extensionId: "ext", + turnId: "t1", + body: sharedBody, + }; + const rec2: LogRecord = { + kind: "span-open", + spanId: "s2", + name: "p", + timestamp: Date.now(), + extensionId: "ext", + turnId: "t2", + body: sharedBody, + }; + store.insertRecords([rec1, rec2]); + + const summary = store.prune({ maxAgeMs: 60000 }); + expect(summary.recordsDeleted).toBe(1); + expect(summary.bodiesDeleted).toBe(0); + + const id2 = stableId(rec2); + expect(store.getBody(id2)).toBe(sharedBody); + store.close(); + }); +}); + +describe("computeEvictions", () => { + it("returns empty when under cap", () => { + const bodies = [ + { hash: "a", storedSize: 100, oldestRecordTimestamp: 1000 }, + { hash: "b", storedSize: 200, oldestRecordTimestamp: 2000 }, + ]; + expect(computeEvictions(bodies, 500)).toEqual([]); + }); + + it("evicts oldest bodies until under cap", () => { + const bodies = [ + { hash: "a", storedSize: 300, oldestRecordTimestamp: 1000 }, + { hash: "b", storedSize: 300, oldestRecordTimestamp: 2000 }, + { hash: "c", storedSize: 300, 
oldestRecordTimestamp: 3000 }, + ]; + const evicted = computeEvictions(bodies, 500); + expect(evicted).toEqual(["a", "b"]); + }); + + it("evicts multiple oldest bodies", () => { + const bodies = [ + { hash: "a", storedSize: 200, oldestRecordTimestamp: 1000 }, + { hash: "b", storedSize: 200, oldestRecordTimestamp: 2000 }, + { hash: "c", storedSize: 200, oldestRecordTimestamp: 3000 }, + ]; + const evicted = computeEvictions(bodies, 300); + expect(evicted).toEqual(["a", "b"]); + }); }); diff --git a/packages/trace-store/src/store.ts b/packages/trace-store/src/store.ts index 53b7390..b3f397e 100644 --- a/packages/trace-store/src/store.ts +++ b/packages/trace-store/src/store.ts @@ -1,12 +1,32 @@ import { Database } from "bun:sqlite"; +import { gunzipSync, gzipSync } from "node:zlib"; import type { Attributes, LogRecord, SpanLink } from "@dispatch/kernel"; import { renderEasyView } from "./easy-view.js"; +const COMPRESS_THRESHOLD_BYTES = 1024; + +export interface RetentionPolicy { + readonly maxAgeMs?: number; + readonly maxTotalBodyBytes?: number; +} + +export const DEFAULT_RETENTION: Required<RetentionPolicy> = { + maxAgeMs: 7 * 24 * 60 * 60 * 1000, + maxTotalBodyBytes: 256 * 1024 * 1024, +}; + +export interface PruneSummary { + recordsDeleted: number; + bodiesDeleted: number; + bytesReclaimed: number; +} + export interface TraceStore { insertRecords(records: readonly LogRecord[]): void; getTurn(turnId: string): LogRecord[]; getBody(recordId: string): string | undefined; easyView(turnId: string): string; + prune(policy: RetentionPolicy): PruneSummary; close(): void; } @@ -27,6 +47,9 @@ export function createTraceStore(opts: { path: string }): TraceStore { easyView(turnId) { return renderEasyView(getTurn(db, turnId)); }, + prune(policy) { + return prune(db, policy); + }, close() { db.close(); }, @@ -50,7 +73,8 @@ function schema(db: Database): void { durationMs INTEGER, status TEXT, attributes TEXT, - links TEXT + links TEXT, + bodyHash TEXT ) `); db.run("CREATE INDEX IF NOT EXISTS 
idx_records_turnId ON records(turnId)"); @@ -58,13 +82,239 @@ function schema(db: Database): void { db.run("CREATE INDEX IF NOT EXISTS idx_records_spanId ON records(spanId)"); db.run("CREATE INDEX IF NOT EXISTS idx_records_kind ON records(kind)"); db.run("CREATE INDEX IF NOT EXISTS idx_records_timestamp ON records(timestamp)"); db.run(` CREATE TABLE IF NOT EXISTS bodies ( - recordId TEXT PRIMARY KEY REFERENCES records(id), - body TEXT NOT NULL + hash TEXT PRIMARY KEY, + body BLOB NOT NULL, + isCompressed INTEGER NOT NULL DEFAULT 0, + originalSize INTEGER NOT NULL, + storedSize INTEGER NOT NULL ) `); + + migrateOldBodies(db); + // Index bodyHash only after migration: a pre-existing records table gains the column there. + db.run("CREATE INDEX IF NOT EXISTS idx_records_bodyHash ON records(bodyHash)"); +} + +function migrateOldBodies(db: Database): void { + const hasOldTable = db + .query("SELECT name FROM sqlite_master WHERE type='table' AND name='bodies_old'") + .get() as { name: string } | null; + if (hasOldTable !== null) { + return; + } + + const cols = db.query("PRAGMA table_info(bodies)").all() as Array<{ + name: string; + }>; + const hasRecordId = cols.some((c) => c.name === "recordId"); + if (!hasRecordId) { + return; + } + + const oldRows = db.query("SELECT recordId, body FROM bodies").all() as Array<{ + recordId: string; + body: string; + }>; + + db.run("ALTER TABLE bodies RENAME TO bodies_old"); + + db.run(` + CREATE TABLE IF NOT EXISTS bodies ( + hash TEXT PRIMARY KEY, + body BLOB NOT NULL, + isCompressed INTEGER NOT NULL DEFAULT 0, + originalSize INTEGER NOT NULL, + storedSize INTEGER NOT NULL + ) + `); + + // bodyHash lives on records, so inspect that table rather than the old bodies columns. + const recordCols = db.query("PRAGMA table_info(records)").all() as Array<{ name: string }>; + const hasBodyHash = recordCols.some((c) => c.name === "bodyHash"); + if (!hasBodyHash) { + db.run("ALTER TABLE records ADD COLUMN bodyHash TEXT"); + } + + const upsertBody = db.prepare(` + INSERT OR IGNORE INTO bodies (hash, body, isCompressed, originalSize, storedSize) + VALUES (?, ?, 0, ?, ?) + `); + const updateRecord = db.prepare("UPDATE records SET bodyHash = ? 
WHERE id = ?"); + + const migrateTxn = db.transaction(() => { + for (const row of oldRows) { + const hash = contentHash(row.body); + const bodyBytes = new TextEncoder().encode(row.body); + upsertBody.run(hash, bodyBytes, bodyBytes.length, bodyBytes.length); + updateRecord.run(hash, row.recordId); + } + db.run("DROP TABLE bodies_old"); + }); + migrateTxn(); +} + +function sha256Hex(input: string): string { + const data = new TextEncoder().encode(input); + let h0 = 0x6a09e667; + let h1 = 0xbb67ae85; + let h2 = 0x3c6ef372; + let h3 = 0xa54ff53a; + let h4 = 0x510e527f; + let h5 = 0x9b05688c; + let h6 = 0x1f83d9ab; + let h7 = 0x5be0cd19; + + const msgLen = data.length; + const bitLen = msgLen * 8; + const withOne = msgLen + 1; + // Pad with 0x80, zeros, then the 64-bit big-endian bit length, to a multiple of 64 bytes. + const paddedLen = withOne + ((120 - (withOne % 64)) % 64) + 8; + const padded = new Uint8Array(paddedLen); + padded.set(data); + padded[msgLen] = 0x80; + const lengthView = new DataView(padded.buffer); + lengthView.setUint32(paddedLen - 8, (bitLen / 0x100000000) >>> 0); + lengthView.setUint32(paddedLen - 4, bitLen >>> 0); + + const kArr = new Uint32Array(K); + + for (let offset = 0; offset < paddedLen; offset += 64) { + const w = new Uint32Array(64); + for (let i = 0; i < 16; i++) { + const o = offset + i * 4; + const b0 = padded[o] ?? 0; + const b1 = padded[o + 1] ?? 0; + const b2 = padded[o + 2] ?? 0; + const b3 = padded[o + 3] ?? 0; + w[i] = (b0 << 24) | (b1 << 16) | (b2 << 8) | b3; + } + for (let i = 16; i < 64; i++) { + const prev15 = w[i - 15] ?? 0; + const prev2 = w[i - 2] ?? 0; + const prev16 = w[i - 16] ?? 0; + const prev7 = w[i - 7] ?? 
0; + const s0 = rightRotate(prev15, 7) ^ rightRotate(prev15, 18) ^ (prev15 >>> 3); + const s1 = rightRotate(prev2, 17) ^ rightRotate(prev2, 19) ^ (prev2 >>> 10); + w[i] = (prev16 + s0 + prev7 + s1) | 0; + } + + let a = h0; + let b = h1; + let c = h2; + let d = h3; + let e = h4; + let f = h5; + let g = h6; + let h = h7; + + for (let i = 0; i < 64; i++) { + const S1 = rightRotate(e, 6) ^ rightRotate(e, 11) ^ rightRotate(e, 25); + const ch = (e & f) ^ (~e & g); + const ki = kArr[i] ?? 0; + const wi = w[i] ?? 0; + const temp1 = (h + S1 + ch + ki + wi) | 0; + const S0 = rightRotate(a, 2) ^ rightRotate(a, 13) ^ rightRotate(a, 22); + const maj = (a & b) ^ (a & c) ^ (b & c); + const temp2 = (S0 + maj) | 0; + + h = g; + g = f; + f = e; + e = (d + temp1) | 0; + d = c; + c = b; + b = a; + a = (temp1 + temp2) | 0; + } + + h0 = (h0 + a) | 0; + h1 = (h1 + b) | 0; + h2 = (h2 + c) | 0; + h3 = (h3 + d) | 0; + h4 = (h4 + e) | 0; + h5 = (h5 + f) | 0; + h6 = (h6 + g) | 0; + h7 = (h7 + h) | 0; + } + + return ( + toHex32(h0) + + toHex32(h1) + + toHex32(h2) + + toHex32(h3) + + toHex32(h4) + + toHex32(h5) + + toHex32(h6) + + toHex32(h7) + ); +} + +function rightRotate(x: number, n: number): number { + return ((x >>> n) | (x << (32 - n))) >>> 0; +} + +function toHex32(n: number): string { + return (n >>> 0).toString(16).padStart(8, "0"); +} + +const K = [ + 0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5, + 0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174, + 0xe49b69c1, 0xefbe4786, 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da, + 0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147, 0x06ca6351, 0x14292967, + 0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85, + 0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070, + 0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 
0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3, + 0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2, +]; + +function contentHash(body: string): string { + return sha256Hex(body); +} + +function compressBody(body: string): { stored: Uint8Array; isCompressed: boolean } { + const raw = new TextEncoder().encode(body); + if (raw.length <= COMPRESS_THRESHOLD_BYTES) { + return { stored: raw, isCompressed: false }; + } + const compressed = gzipSync(raw); + if (compressed.length >= raw.length) { + return { stored: raw, isCompressed: false }; + } + return { stored: compressed, isCompressed: true }; +} + +function decompressBody(stored: Uint8Array, isCompressed: boolean): string { + if (!isCompressed) { + return new TextDecoder().decode(stored); + } + const decompressed = gunzipSync(stored); + return new TextDecoder().decode(decompressed); +} + +function storeBody(db: Database, body: string): string { + const hash = contentHash(body); + const existing = db.query("SELECT 1 FROM bodies WHERE hash = ?").get(hash) as unknown; + if (existing !== null) { + return hash; + } + const { stored, isCompressed } = compressBody(body); + db.prepare( + "INSERT OR IGNORE INTO bodies (hash, body, isCompressed, originalSize, storedSize) VALUES (?, ?, ?, ?, ?)", + ).run(hash, stored, isCompressed ? 
1 : 0, new TextEncoder().encode(body).length, stored.length); + return hash; +} + +function resolveBody(db: Database, hash: string | null): string | undefined { + if (hash === null) { + return undefined; + } + const row = db.query("SELECT body, isCompressed FROM bodies WHERE hash = ?").get(hash) as { + body: Uint8Array; + isCompressed: number; + } | null; + if (row === undefined || row === null) { + return undefined; + } + return decompressBody(row.body, row.isCompressed === 1); } function insertRecords(db: Database, records: readonly LogRecord[]): void { @@ -72,11 +322,10 @@ function insertRecords(db: Database, records: readonly LogRecord[]): void { INSERT OR IGNORE INTO records (id, kind, level, msg, name, spanId, parentSpanId, conversationId, turnId, extensionId, timestamp, - durationMs, status, attributes, links) + durationMs, status, attributes, links, bodyHash) VALUES - (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `); - const bodyStmt = db.prepare("INSERT OR IGNORE INTO bodies (recordId, body) VALUES (?, ?)"); const txn = db.transaction(() => { for (const r of records) { @@ -117,6 +366,11 @@ function insertRecords(db: Database, records: readonly LogRecord[]): void { const attributes: string | null = r.attributes !== undefined ? JSON.stringify(r.attributes) : null; + let bodyHash: string | null = null; + if (r.body !== undefined) { + bodyHash = storeBody(db, r.body); + } + recStmt.run( id, kind, @@ -133,11 +387,8 @@ function insertRecords(db: Database, records: readonly LogRecord[]): void { status, attributes, links, + bodyHash, ); - - if (r.body !== undefined) { - bodyStmt.run(id, r.body); - } } }); txn(); @@ -159,26 +410,31 @@ interface RecordRow { status: string | null; attributes: string | null; links: string | null; + bodyHash: string | null; } function getTurn(db: Database, turnId: string): LogRecord[] { const rows = db .query("SELECT * FROM records WHERE turnId = ? 
ORDER BY timestamp ASC, rowid ASC") .all(turnId) as RecordRow[]; - return rows.map(rowToRecord); + return rows.map((row) => rowToRecord(db, row)); } function getBody(db: Database, recordId: string): string | undefined { - const row = db.query("SELECT body FROM bodies WHERE recordId = ?").get(recordId) as { - body: string; + const row = db.query("SELECT bodyHash FROM records WHERE id = ?").get(recordId) as { + bodyHash: string | null; } | null; - return row?.body; + if (row === undefined || row === null || row.bodyHash === null) { + return undefined; + } + return resolveBody(db, row.bodyHash); } -function rowToRecord(row: RecordRow): LogRecord { +function rowToRecord(db: Database, row: RecordRow): LogRecord { const attributes: Attributes | undefined = row.attributes !== null ? JSON.parse(row.attributes) : undefined; const links: SpanLink[] | undefined = row.links !== null ? JSON.parse(row.links) : undefined; + const body: string | undefined = resolveBody(db, row.bodyHash); if (row.kind === "log") { const record: LogRecord = { @@ -192,6 +448,7 @@ function rowToRecord(row: RecordRow): LogRecord { ...(row.spanId !== null && { spanId: row.spanId }), ...(row.parentSpanId !== null && { parentSpanId: row.parentSpanId }), ...(attributes !== undefined && { attributes }), + ...(body !== undefined && { body }), }; return record; } @@ -208,6 +465,7 @@ function rowToRecord(row: RecordRow): LogRecord { ...(row.parentSpanId !== null && { parentSpanId: row.parentSpanId }), ...(attributes !== undefined && { attributes }), ...(links !== undefined && { links }), + ...(body !== undefined && { body }), }; return record; } @@ -225,10 +483,118 @@ function rowToRecord(row: RecordRow): LogRecord { ...(row.parentSpanId !== null && { parentSpanId: row.parentSpanId }), ...(attributes !== undefined && { attributes }), ...(links !== undefined && { links }), + ...(body !== undefined && { body }), }; return record; } +interface BodyRow { + hash: string; + storedSize: number; +} + +interface 
BodyWithTimestamp extends BodyRow { + oldestRecordTimestamp: number; +} + +export function computeEvictions( + bodies: readonly BodyWithTimestamp[], + maxTotalBodyBytes: number, +): string[] { + let totalBytes = 0; + for (const b of bodies) { + totalBytes += b.storedSize; + } + if (totalBytes <= maxTotalBodyBytes) { + return []; + } + + const sorted = [...bodies].sort((a, b) => a.oldestRecordTimestamp - b.oldestRecordTimestamp); + const evict: string[] = []; + let remaining = totalBytes; + for (const b of sorted) { + if (remaining <= maxTotalBodyBytes) { + break; + } + evict.push(b.hash); + remaining -= b.storedSize; + } + return evict; +} + +function prune(db: Database, policy: RetentionPolicy): PruneSummary { + let recordsDeleted = 0; + let bodiesDeleted = 0; + let bytesReclaimed = 0; + + const now = Date.now(); + + if (policy.maxAgeMs !== undefined) { + const cutoff = now - policy.maxAgeMs; + const oldRecords = db + .query("SELECT id, bodyHash FROM records WHERE timestamp < ?") + .all(cutoff) as Array<{ id: string; bodyHash: string | null }>; + + if (oldRecords.length > 0) { + // De-duplicate hashes so a body shared by several expired records is counted once. + const bodyHashes = new Set( + oldRecords.map((r) => r.bodyHash).filter((h): h is string => h !== null), + ); + + const deleteTxn = db.transaction(() => { + db.prepare("DELETE FROM records WHERE timestamp < ?").run(cutoff); + for (const hash of bodyHashes) { + const refCount = db + .query("SELECT COUNT(*) as cnt FROM records WHERE bodyHash = ?") + .get(hash) as { cnt: number }; + if (refCount.cnt === 0) { + const bodyRow = db.query("SELECT storedSize FROM bodies WHERE hash = ?").get(hash) as { + storedSize: number; + } | null; + if (bodyRow !== undefined && bodyRow !== null) { + bytesReclaimed += bodyRow.storedSize; + } + db.prepare("DELETE FROM bodies WHERE hash = ?").run(hash); + bodiesDeleted++; + } + } + }); + deleteTxn(); + recordsDeleted = oldRecords.length; + } + } + + if (policy.maxTotalBodyBytes !== undefined) { + const bodyRows = db + .query(` + SELECT b.hash, b.storedSize, MIN(r.timestamp) as 
oldestRecordTimestamp + FROM bodies b + JOIN records r ON r.bodyHash = b.hash + GROUP BY b.hash + `) + .all() as Array<{ hash: string; storedSize: number; oldestRecordTimestamp: number }>; + + const toEvict = computeEvictions(bodyRows, policy.maxTotalBodyBytes); + + if (toEvict.length > 0) { + const evictTxn = db.transaction(() => { + for (const hash of toEvict) { + const bodyRow = db.query("SELECT storedSize FROM bodies WHERE hash = ?").get(hash) as { + storedSize: number; + } | null; + if (bodyRow !== undefined && bodyRow !== null) { + bytesReclaimed += bodyRow.storedSize; + } + // One evicted body can be referenced by several records; count each deleted record. + const refs = db + .query("SELECT COUNT(*) as cnt FROM records WHERE bodyHash = ?") + .get(hash) as { cnt: number }; + recordsDeleted += refs.cnt; + db.prepare("DELETE FROM records WHERE bodyHash = ?").run(hash); + db.prepare("DELETE FROM bodies WHERE hash = ?").run(hash); + bodiesDeleted++; + } + }); + evictTxn(); + } + } + + return { recordsDeleted, bodiesDeleted, bytesReclaimed }; +} + function toCanonicalJson(value: unknown): string { if (value === null || typeof value !== "object") { return JSON.stringify(value); diff --git a/tasks.md b/tasks.md index 4d7da4a..d86086a 100644 --- a/tasks.md +++ b/tasks.md @@ -5,7 +5,7 @@ > Keep this lean and current; do not let it re-accrete a step-by-step changelog. ## Status (current) -`tsc -b` EXIT 0 · biome clean · **546 vitest + 89 bun = 635 tests**. +`tsc -b` EXIT 0 · biome clean · **576 vitest + 89 bun = 665 tests**. Built and verified live (full-fidelity: every feature is a manifest-loaded extension through the host): @@ -68,11 +68,29 @@ server/collector procs poison the next run's counts. - [x] **FE courier handoff** written: `frontend-metrics-pass2-handoff.md` (in this repo; user couriers to `../dispatch-web`; ORCHESTRATOR §7). +## dedup / storage growth (current milestone — building) +Design DECIDED + recorded: `notes/observability-design.md` §12. User-gated calls: +extend existing pipeline (no new ext); scope = **de-dup + retention/rotation** +(D9 roll-ups deferred); dedup = **content-addressed bodies** (body-hash, NOT +fingerprint-gated). 
Glossary terms approved (add on land): deduplication / +content-addressed body, prefix fingerprint, warm vs real, retention / rotation. +- [x] **Wave 1 — `trace-store`** (done): content-addressed `bodies` table + (SHA-256), at-rest gzip (>1 KiB), `prune(policy)` (age + drop-oldest size cap + + orphan GC) / `RetentionPolicy` / `PruneSummary` / `DEFAULT_RETENTION` (7d/256MiB); + read paths transparent. bun 89→100. +- [ ] **Wave 2 — `observability-collector`:** call `store.prune()` on a cadence; + body inserts flow through the content-addressed path. +- [ ] On land: add the 4 glossary terms; (optional) host-bin env-override for + retention policy. + ## Open items -- **dedup / storage growth (deferred):** trace-body de-dup (D5 volume control + - `prefix.fingerprint`) + rotation/compression/retention - (`notes/observability-design.md` §6, D9). `cacheReadTokens` is the cheap dedup - signal; thin/fat split already built. +- **`prefix.fingerprint` / `warm|real` cache-bust attributes (deferred):** decoupled + from dedup by the content-addressed decision; also gated on cache-warming being + built (not yet) so `warm|real` can't be honestly stamped. Later cache-bust-debug + milestone (`notes/observability-design.md` §3.1, §12). +- **D9 analytics roll-ups (deferred):** rollup table shape + `GROUP BY` indexes + + retention asymmetry + periodic rollup job (`notes/observability-design.md` §2 D9, + §12). The scheduler mechanism (`host.scheduler.register`) already exists. - **D8 `prompt.assembly` segments:** deferred-by-design (await the context-filter chain). -- cgit v1.2.3
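A worked example of the drop-oldest size cap listed under Wave 1, as a minimal sketch. `StoredBody` and `pickEvictions` are hypothetical names chosen for illustration; the field names mirror what the patch's `computeEvictions` tests pass in, and the sketch is not meant to replace that function.

```typescript
// Hypothetical shape for illustration: one stored body plus the timestamp
// of the oldest record that still references it.
interface StoredBody {
  hash: string;
  storedSize: number;
  oldestRecordTimestamp: number;
}

// Evict oldest-first until the running total fits under the byte cap.
function pickEvictions(bodies: readonly StoredBody[], maxTotalBodyBytes: number): string[] {
  let remaining = bodies.reduce((sum, b) => sum + b.storedSize, 0);
  const oldestFirst = [...bodies].sort(
    (a, b) => a.oldestRecordTimestamp - b.oldestRecordTimestamp,
  );
  const evict: string[] = [];
  for (const b of oldestFirst) {
    if (remaining <= maxTotalBodyBytes) {
      break;
    }
    evict.push(b.hash);
    remaining -= b.storedSize;
  }
  return evict;
}

// Three 300-byte bodies (900 bytes total) against a 500-byte cap:
// evicting "a" leaves 600 (still over), evicting "b" leaves 300 (under), so "c" survives.
const sample: StoredBody[] = [
  { hash: "c", storedSize: 300, oldestRecordTimestamp: 3000 },
  { hash: "a", storedSize: 300, oldestRecordTimestamp: 1000 },
  { hash: "b", storedSize: 300, oldestRecordTimestamp: 2000 },
];
const evicted = pickEvictions(sample, 500);

console.log(JSON.stringify(evicted)); // ["a","b"]
```

Input order does not matter because the candidates are sorted by `oldestRecordTimestamp` before the cap is applied.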