author Adam Malczewski <[email protected]> 2026-06-10 10:43:40 +0900
committer Adam Malczewski <[email protected]> 2026-06-10 10:43:40 +0900
commit 3a0cdd2c8453f059a746465e3aa6d9b5caa3b399
tree b76133d2a593ccd510673a745e5912599f850696
parent 80e14ab59732aabbf06035d13138500f133e921d
trace-store: content-addressed body dedup + retention/prune
Wave 1 of the dedup/storage-growth milestone (notes §12).

- bodies table is now content-addressed (SHA-256 hash key); identical verbatim
  bodies (cache-warming resends, any repeat) collapse to one stored row,
  referenced by hash from records. Transparent to insert/read callers.
- at-rest gzip compression for bodies >1 KiB (node:zlib), decompressed on read.
- prune(policy): age-based delete + drop-oldest byte-cap eviction + orphan-body
  GC. Exports RetentionPolicy/PruneSummary/DEFAULT_RETENTION (7d / 256 MiB).

typecheck EXIT 0; biome clean; vitest 576; bun 89->100, 0 fail.
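
The content-addressing and compression described in the message above can be illustrated with a minimal in-memory sketch. It is not the code from this commit: `BodyStore`, `put`, `get`, and the `Map` standing in for the SQLite `bodies` table are hypothetical names chosen for illustration, and the hash comes from `node:crypto` rather than the hand-rolled routine in `store.ts`. The 1 KiB threshold mirrors the commit message.

```typescript
import { createHash } from "node:crypto";
import { gunzipSync, gzipSync } from "node:zlib";

// Assumption: same ">1 KiB" rule the commit message describes.
const COMPRESS_THRESHOLD_BYTES = 1024;

interface StoredBody {
  bytes: Uint8Array;
  isCompressed: boolean;
}

// Hypothetical in-memory stand-in for the content-addressed `bodies` table.
class BodyStore {
  private readonly bodies = new Map<string, StoredBody>();

  /** Stores a body once per distinct content and returns its SHA-256 hex key. */
  put(body: string): string {
    const hash = createHash("sha256").update(body, "utf8").digest("hex");
    if (this.bodies.has(hash)) {
      return hash; // identical content already stored: nothing to write
    }
    const raw = new TextEncoder().encode(body);
    let bytes: Uint8Array = raw;
    let isCompressed = false;
    if (raw.length > COMPRESS_THRESHOLD_BYTES) {
      const gz = gzipSync(raw);
      // Keep the gzip form only when it is actually smaller.
      if (gz.length < raw.length) {
        bytes = gz;
        isCompressed = true;
      }
    }
    this.bodies.set(hash, { bytes, isCompressed });
    return hash;
  }

  /** Resolves a hash back to the original text, decompressing if needed. */
  get(hash: string): string | undefined {
    const row = this.bodies.get(hash);
    if (row === undefined) {
      return undefined;
    }
    const raw = row.isCompressed ? gunzipSync(row.bytes) : row.bytes;
    return new TextDecoder().decode(raw);
  }

  get size(): number {
    return this.bodies.size;
  }
}

// Two records carrying the same body resolve to one stored copy.
const demo = new BodyStore();
const first = demo.put("identical body content");
const second = demo.put("identical body content");
console.log(first === second, demo.size);
```

Callers hold only the hash, so a record row stays small while repeated bodies cost one stored copy. Read paths stay transparent because `get` hides the compression flag.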
-rw-r--r-- notes/observability-design.md 51
-rw-r--r-- packages/trace-store/src/index.ts 4
-rw-r--r-- packages/trace-store/src/store.test.ts 264
-rw-r--r-- packages/trace-store/src/store.ts 396
-rw-r--r-- tasks.md 28
5 files changed, 720 insertions, 23 deletions
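
As a reading aid for the `prune` changes in the patch below, here is a hedged sketch of age-based retention over content-addressed bodies. It uses plain arrays and a `Map` instead of SQLite, and `Rec`, `pruneByAge`, and the `now` parameter are hypothetical names, not part of the commit. Unlike the patch, which checks reference counts per deleted hash, this sketch sweeps every body that no surviving record references. The outcome is the same for the case shown: a body shared with a still-live record is kept.

```typescript
interface Rec {
  id: string;
  timestamp: number;
  bodyHash: string | null;
}

interface AgePruneResult {
  kept: Rec[];
  recordsDeleted: number;
  bodiesDeleted: number;
  bytesReclaimed: number;
}

// `bodies` maps content hash -> stored size in bytes (illustrative stand-in
// for the `bodies` table). It is mutated in place.
function pruneByAge(
  records: readonly Rec[],
  bodies: Map<string, number>,
  maxAgeMs: number,
  now: number,
): AgePruneResult {
  const cutoff = now - maxAgeMs;
  const kept = records.filter((r) => r.timestamp >= cutoff);

  // Hashes still referenced by at least one surviving record.
  const live = new Set<string>();
  for (const r of kept) {
    if (r.bodyHash !== null) {
      live.add(r.bodyHash);
    }
  }

  let bodiesDeleted = 0;
  let bytesReclaimed = 0;
  // Deleting the current entry while iterating a Map is well-defined in JS.
  for (const [hash, size] of bodies) {
    if (!live.has(hash)) {
      bodies.delete(hash);
      bodiesDeleted += 1;
      bytesReclaimed += size;
    }
  }

  return {
    kept,
    recordsDeleted: records.length - kept.length,
    bodiesDeleted,
    bytesReclaimed,
  };
}
```

Passing `now` explicitly, rather than calling `Date.now()` inside, makes the cutoff deterministic in tests. The patch itself reads the clock inside `prune`.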
diff --git a/notes/observability-design.md b/notes/observability-design.md
index 4a3515a..1b09973 100644
--- a/notes/observability-design.md
+++ b/notes/observability-design.md
@@ -635,3 +635,54 @@ waits. Per tick the collector:
durable; the collector consumes whenever it's up).
- **Open (Phase B):** poll vs fs-watch; interval/batch-size; offset storage (store
metadata vs sidecar); dedup key; + the store schema/indexes (§6).
+
+---
+
+## 12. Phase C — body de-dup + retention (DECIDED; building)
+
+> Resolves the §6 "Retention/rotation sizing" + "dedup key" open threads + the
+> tasks.md "dedup / storage growth" item. **User-gated decisions (this session):**
+> extend the existing pipeline (no new extension); scope = **de-dup + retention/
+> rotation** (D9 analytics roll-ups DEFERRED); dedup = **content-addressed bodies**.
+
+**The problem.** D5 verbatim provider-I/O capture stores large request/response
+bodies; cache-warming resends near-identical bodies every few minutes → the trace
+store grows without bound.
+
+**Decision — content-addressed bodies (supersedes the §3.1/D5 "fingerprint-gated
+persistence" sketch).** Dedup keys on the **body content hash**, NOT on
+`prefix.fingerprint`:
+- The store hashes each verbatim `body`, stores it **once** in a content-addressed
+ bodies table keyed by hash, and references it by hash from the span/record row.
+ Identical bodies (the cache-warming case, and any other repeat) collapse to one
+ stored copy. Robust against ALL duplicate bodies, not just prefix matches; and it
+ is **stateless** (no "prior fingerprint per conversation" bookkeeping).
+- This **decouples `prefix.fingerprint` from storage.** Fingerprint + `warm|real`
+ revert to their original job: queryable **cache-bust debugging** attributes (§3.1).
+ They are NOT needed for dedup/retention and are **deferred** to a later cache-bust-
+ debugging milestone — also because **cache-warming is not built yet**, so a request
+ cannot honestly be flagged `warm` vs `real` today (declare-reality, §extension brief).
+
+**Retention/rotation.** The store exposes `prune(policy)` enforcing a
+`RetentionPolicy` ({ maxAgeMs?, maxTotalBodyBytes? }): delete records/bodies past
+`maxAgeMs`; evict **oldest** bodies (drop-oldest) until under `maxTotalBodyBytes`;
+garbage-collect orphaned bodies (no remaining referencing row). Bodies above a size
+threshold are **compressed** at rest (gzip), transparently decompressed on read.
+Exports a `DEFAULT_RETENTION` constant.
+
+**Who triggers prune.** The **collector** (process 2) calls `store.prune(policy)` on
+a cadence in its existing tick loop — NOT a `scheduledJob` (the scheduler is for
+extensions; trace-store/collector are supporting packages). Retention policy values
+default to `DEFAULT_RETENTION`; host-bin env-override wiring is a deferred follow-up.
+
+**Units / waves.**
+- **Wave 1 — `trace-store`:** content-addressed body storage + compression +
+ `prune`/`RetentionPolicy`/`DEFAULT_RETENTION` on the `TraceStore` surface, read
+ paths transparent. (`bun:sqlite` → `bun test`.)
+- **Wave 2 — `observability-collector`:** call the new `prune` on a cadence;
+ confirm body inserts flow through the content-addressed path. Depends on Wave 1's
+ surface.
+
+**Deferred (still open):** D9 analytics roll-ups (§2 D9 / §6) — rollup table shape,
+`GROUP BY` indexes, retention asymmetry, the periodic rollup job. And the
+`prefix.fingerprint` / `warm|real` cache-bust attributes (above).
diff --git a/packages/trace-store/src/index.ts b/packages/trace-store/src/index.ts
index 7341305..17e206f 100644
--- a/packages/trace-store/src/index.ts
+++ b/packages/trace-store/src/index.ts
@@ -1,3 +1,3 @@
export { formatDuration, renderEasyView } from "./easy-view.js";
-export type { TraceStore } from "./store.js";
-export { createTraceStore, stableId } from "./store.js";
+export type { PruneSummary, RetentionPolicy, TraceStore } from "./store.js";
+export { createTraceStore, DEFAULT_RETENTION, stableId } from "./store.js";
diff --git a/packages/trace-store/src/store.test.ts b/packages/trace-store/src/store.test.ts
index e380147..10a033a 100644
--- a/packages/trace-store/src/store.test.ts
+++ b/packages/trace-store/src/store.test.ts
@@ -1,6 +1,6 @@
import type { LogRecord } from "@dispatch/kernel";
import { describe, expect, it } from "vitest";
-import { createTraceStore, stableId } from "./store.js";
+import { computeEvictions, createTraceStore, stableId } from "./store.js";
const logRecord: LogRecord = {
kind: "log",
@@ -221,4 +221,266 @@ describe("createTraceStore", () => {
// ignore cleanup error
}
});
+
+ it("body round-trips through getTurn", () => {
+ const store = freshStore();
+ store.insertRecords([bodyRecord]);
+ const result = store.getTurn("turn-1");
+ expect(result).toHaveLength(1);
+ expect(result[0]?.kind).toBe("span-open");
+ if (result[0]?.kind === "span-open") {
+ expect(result[0].body).toBe("the full prompt text");
+ }
+ store.close();
+ });
+});
+
+describe("content-addressed body storage", () => {
+ function freshStore() {
+ return createTraceStore({ path: ":memory:" });
+ }
+
+ it("content-addresses two identical bodies to a single stored body row", () => {
+ const store = freshStore();
+ const rec1: LogRecord = {
+ kind: "span-open",
+ spanId: "s1",
+ name: "prompt",
+ timestamp: 1000,
+ extensionId: "ext",
+ turnId: "t1",
+ body: "identical body content",
+ };
+ const rec2: LogRecord = {
+ kind: "span-open",
+ spanId: "s2",
+ name: "prompt",
+ timestamp: 2000,
+ extensionId: "ext",
+ turnId: "t1",
+ body: "identical body content",
+ };
+ store.insertRecords([rec1, rec2]);
+
+ const id1 = stableId(rec1);
+ const id2 = stableId(rec2);
+ expect(store.getBody(id1)).toBe("identical body content");
+ expect(store.getBody(id2)).toBe("identical body content");
+ store.close();
+ });
+
+ it("stores distinct bodies separately", () => {
+ const store = freshStore();
+ const rec1: LogRecord = {
+ kind: "span-open",
+ spanId: "s1",
+ name: "prompt",
+ timestamp: 1000,
+ extensionId: "ext",
+ turnId: "t1",
+ body: "body A",
+ };
+ const rec2: LogRecord = {
+ kind: "span-open",
+ spanId: "s2",
+ name: "prompt",
+ timestamp: 2000,
+ extensionId: "ext",
+ turnId: "t1",
+ body: "body B",
+ };
+ store.insertRecords([rec1, rec2]);
+
+ const id1 = stableId(rec1);
+ const id2 = stableId(rec2);
+ expect(store.getBody(id1)).toBe("body A");
+ expect(store.getBody(id2)).toBe("body B");
+ store.close();
+ });
+
+ it("compresses a body above the threshold and round-trips it on read", () => {
+ const store = freshStore();
+ const largeBody = "x".repeat(2048);
+ const rec: LogRecord = {
+ kind: "span-open",
+ spanId: "s1",
+ name: "prompt",
+ timestamp: 1000,
+ extensionId: "ext",
+ turnId: "t1",
+ body: largeBody,
+ };
+ store.insertRecords([rec]);
+ const id = stableId(rec);
+ expect(store.getBody(id)).toBe(largeBody);
+ store.close();
+ });
+});
+
+describe("prune", () => {
+ function freshStore() {
+ return createTraceStore({ path: ":memory:" });
+ }
+
+ it("prune by maxAgeMs deletes records and their bodies older than the cutoff", () => {
+ const store = freshStore();
+ const oldRec: LogRecord = {
+ kind: "span-open",
+ spanId: "s-old",
+ name: "old-prompt",
+ timestamp: 1000,
+ extensionId: "ext",
+ turnId: "t1",
+ body: "old body content",
+ };
+ const newRec: LogRecord = {
+ kind: "span-open",
+ spanId: "s-new",
+ name: "new-prompt",
+ timestamp: Date.now(),
+ extensionId: "ext",
+ turnId: "t2",
+ body: "new body content",
+ };
+ store.insertRecords([oldRec, newRec]);
+
+ const summary = store.prune({ maxAgeMs: 60000 });
+ expect(summary.recordsDeleted).toBe(1);
+
+ const result = store.getTurn("t1");
+ expect(result).toHaveLength(0);
+ expect(store.getBody(stableId(oldRec))).toBeUndefined();
+
+ const newResult = store.getTurn("t2");
+ expect(newResult).toHaveLength(1);
+ expect(store.getBody(stableId(newRec))).toBe("new body content");
+ store.close();
+ });
+
+ it("prune by maxTotalBodyBytes evicts oldest bodies until under the cap", () => {
+ const store = freshStore();
+ const body1 = "a".repeat(300);
+ const body2 = "b".repeat(300);
+ const body3 = "c".repeat(300);
+ const rec1: LogRecord = {
+ kind: "span-open",
+ spanId: "s1",
+ name: "p",
+ timestamp: 1000,
+ extensionId: "ext",
+ turnId: "t1",
+ body: body1,
+ };
+ const rec2: LogRecord = {
+ kind: "span-open",
+ spanId: "s2",
+ name: "p",
+ timestamp: 2000,
+ extensionId: "ext",
+ turnId: "t2",
+ body: body2,
+ };
+ const rec3: LogRecord = {
+ kind: "span-open",
+ spanId: "s3",
+ name: "p",
+ timestamp: 3000,
+ extensionId: "ext",
+ turnId: "t3",
+ body: body3,
+ };
+ store.insertRecords([rec1, rec2, rec3]);
+
+ const summary = store.prune({ maxTotalBodyBytes: 500 });
+ expect(summary.bodiesDeleted).toBeGreaterThanOrEqual(1);
+
+ expect(store.getBody(stableId(rec1))).toBeUndefined();
+
+ const remaining = store.getTurn("t3");
+ if (remaining.length > 0 && remaining[0]?.kind === "span-open") {
+ expect(remaining[0].body).toBe(body3);
+ }
+ store.close();
+ });
+
+ it("prune garbage-collects an orphaned body with no referencing record", () => {
+ const store = freshStore();
+ const rec: LogRecord = {
+ kind: "span-open",
+ spanId: "s1",
+ name: "p",
+ timestamp: 1000,
+ extensionId: "ext",
+ turnId: "t1",
+ body: "orphan body",
+ };
+ store.insertRecords([rec]);
+
+ const summary = store.prune({ maxAgeMs: 60000 });
+ expect(summary.recordsDeleted).toBe(1);
+ expect(summary.bodiesDeleted).toBe(1);
+ store.close();
+ });
+
+ it("prune keeps a still-referenced body when a duplicate referrer remains", () => {
+ const store = freshStore();
+ const sharedBody = "shared body content";
+ const rec1: LogRecord = {
+ kind: "span-open",
+ spanId: "s1",
+ name: "p",
+ timestamp: 1000,
+ extensionId: "ext",
+ turnId: "t1",
+ body: sharedBody,
+ };
+ const rec2: LogRecord = {
+ kind: "span-open",
+ spanId: "s2",
+ name: "p",
+ timestamp: Date.now(),
+ extensionId: "ext",
+ turnId: "t2",
+ body: sharedBody,
+ };
+ store.insertRecords([rec1, rec2]);
+
+ const summary = store.prune({ maxAgeMs: 60000 });
+ expect(summary.recordsDeleted).toBe(1);
+ expect(summary.bodiesDeleted).toBe(0);
+
+ const id2 = stableId(rec2);
+ expect(store.getBody(id2)).toBe(sharedBody);
+ store.close();
+ });
+});
+
+describe("computeEvictions", () => {
+ it("returns empty when under cap", () => {
+ const bodies = [
+ { hash: "a", storedSize: 100, oldestRecordTimestamp: 1000 },
+ { hash: "b", storedSize: 200, oldestRecordTimestamp: 2000 },
+ ];
+ expect(computeEvictions(bodies, 500)).toEqual([]);
+ });
+
+ it("evicts oldest bodies until under cap", () => {
+ const bodies = [
+ { hash: "a", storedSize: 300, oldestRecordTimestamp: 1000 },
+ { hash: "b", storedSize: 300, oldestRecordTimestamp: 2000 },
+ { hash: "c", storedSize: 300, oldestRecordTimestamp: 3000 },
+ ];
+ const evicted = computeEvictions(bodies, 500);
+ expect(evicted).toEqual(["a", "b"]);
+ });
+
+ it("evicts multiple oldest bodies", () => {
+ const bodies = [
+ { hash: "a", storedSize: 200, oldestRecordTimestamp: 1000 },
+ { hash: "b", storedSize: 200, oldestRecordTimestamp: 2000 },
+ { hash: "c", storedSize: 200, oldestRecordTimestamp: 3000 },
+ ];
+ const evicted = computeEvictions(bodies, 300);
+ expect(evicted).toEqual(["a", "b"]);
+ });
});
diff --git a/packages/trace-store/src/store.ts b/packages/trace-store/src/store.ts
index 53b7390..b3f397e 100644
--- a/packages/trace-store/src/store.ts
+++ b/packages/trace-store/src/store.ts
@@ -1,12 +1,32 @@
import { Database } from "bun:sqlite";
+import { gunzipSync, gzipSync } from "node:zlib";
import type { Attributes, LogRecord, SpanLink } from "@dispatch/kernel";
import { renderEasyView } from "./easy-view.js";
+const COMPRESS_THRESHOLD_BYTES = 1024;
+
+export interface RetentionPolicy {
+ readonly maxAgeMs?: number;
+ readonly maxTotalBodyBytes?: number;
+}
+
+export const DEFAULT_RETENTION: Required<RetentionPolicy> = {
+ maxAgeMs: 7 * 24 * 60 * 60 * 1000,
+ maxTotalBodyBytes: 256 * 1024 * 1024,
+};
+
+export interface PruneSummary {
+ recordsDeleted: number;
+ bodiesDeleted: number;
+ bytesReclaimed: number;
+}
+
export interface TraceStore {
insertRecords(records: readonly LogRecord[]): void;
getTurn(turnId: string): LogRecord[];
getBody(recordId: string): string | undefined;
easyView(turnId: string): string;
+ prune(policy: RetentionPolicy): PruneSummary;
close(): void;
}
@@ -27,6 +47,9 @@ export function createTraceStore(opts: { path: string }): TraceStore {
easyView(turnId) {
return renderEasyView(getTurn(db, turnId));
},
+ prune(policy) {
+ return prune(db, policy);
+ },
close() {
db.close();
},
@@ -50,7 +73,8 @@ function schema(db: Database): void {
durationMs INTEGER,
status TEXT,
attributes TEXT,
- links TEXT
+ links TEXT,
+ bodyHash TEXT
)
`);
db.run("CREATE INDEX IF NOT EXISTS idx_records_turnId ON records(turnId)");
@@ -58,13 +82,239 @@ function schema(db: Database): void {
db.run("CREATE INDEX IF NOT EXISTS idx_records_spanId ON records(spanId)");
db.run("CREATE INDEX IF NOT EXISTS idx_records_kind ON records(kind)");
db.run("CREATE INDEX IF NOT EXISTS idx_records_timestamp ON records(timestamp)");
+ db.run("CREATE INDEX IF NOT EXISTS idx_records_bodyHash ON records(bodyHash)");
db.run(`
CREATE TABLE IF NOT EXISTS bodies (
- recordId TEXT PRIMARY KEY REFERENCES records(id),
- body TEXT NOT NULL
+ hash TEXT PRIMARY KEY,
+ body BLOB NOT NULL,
+ isCompressed INTEGER NOT NULL DEFAULT 0,
+ originalSize INTEGER NOT NULL,
+ storedSize INTEGER NOT NULL
)
`);
+
+ migrateOldBodies(db);
+}
+
+function migrateOldBodies(db: Database): void {
+ const hasOldTable = db
+ .query("SELECT name FROM sqlite_master WHERE type='table' AND name='bodies_old'")
+ .get() as { name: string } | null;
+ if (hasOldTable !== null) {
+ return;
+ }
+
+ const cols = db.query("PRAGMA table_info(bodies)").all() as Array<{
+ name: string;
+ }>;
+ const hasRecordId = cols.some((c) => c.name === "recordId");
+ if (!hasRecordId) {
+ return;
+ }
+
+ const oldRows = db.query("SELECT recordId, body FROM bodies").all() as Array<{
+ recordId: string;
+ body: string;
+ }>;
+
+ db.run("ALTER TABLE bodies RENAME TO bodies_old");
+
+ db.run(`
+ CREATE TABLE IF NOT EXISTS bodies (
+ hash TEXT PRIMARY KEY,
+ body BLOB NOT NULL,
+ isCompressed INTEGER NOT NULL DEFAULT 0,
+ originalSize INTEGER NOT NULL,
+ storedSize INTEGER NOT NULL
+ )
+ `);
+
+ const hasBodyHash = (db.query("PRAGMA table_info(records)").all() as Array<{ name: string }>).some((c) => c.name === "bodyHash");
+ if (!hasBodyHash) {
+ db.run("ALTER TABLE records ADD COLUMN bodyHash TEXT");
+ }
+
+ const upsertBody = db.prepare(`
+ INSERT OR IGNORE INTO bodies (hash, body, isCompressed, originalSize, storedSize)
+ VALUES (?, ?, 0, ?, ?)
+ `);
+ const updateRecord = db.prepare("UPDATE records SET bodyHash = ? WHERE id = ?");
+
+ const migrateTxn = db.transaction(() => {
+ for (const row of oldRows) {
+ const hash = contentHash(row.body);
+ const bodyBytes = new TextEncoder().encode(row.body);
+ upsertBody.run(hash, bodyBytes, bodyBytes.length, bodyBytes.length);
+ updateRecord.run(hash, row.recordId);
+ }
+ db.run("DROP TABLE bodies_old");
+ });
+ migrateTxn();
+}
+
+function sha256Hex(input: string): string {
+ const data = new TextEncoder().encode(input);
+ let h0 = 0x6a09e667;
+ let h1 = 0xbb67ae85;
+ let h2 = 0x3c6ef372;
+ let h3 = 0xa54ff53a;
+ let h4 = 0x510e527f;
+ let h5 = 0x9b05688c;
+ let h6 = 0x1f83d9ab;
+ let h7 = 0x5be0cd19;
+
+ const msgLen = data.length;
+ const bitLen = msgLen * 8;
+ const withOne = msgLen + 1;
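+ const paddedLen = withOne + ((120 - (withOne % 64)) % 64) + 8; // zero-pad to 56 mod 64, then 8 length bytes
+ const padded = new Uint8Array(paddedLen);
+ padded.set(data);
+ padded[msgLen] = 0x80;
+ new DataView(padded.buffer).setUint32(paddedLen - 8, Math.floor(bitLen / 0x100000000)); // big-endian high word of bit length
+ new DataView(padded.buffer).setUint32(paddedLen - 4, bitLen >>> 0); // big-endian low word of bit length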
+
+ const kArr = new Uint32Array(K);
+
+ for (let offset = 0; offset < paddedLen; offset += 64) {
+ const w = new Uint32Array(64);
+ for (let i = 0; i < 16; i++) {
+ const o = offset + i * 4;
+ const b0 = padded[o] ?? 0;
+ const b1 = padded[o + 1] ?? 0;
+ const b2 = padded[o + 2] ?? 0;
+ const b3 = padded[o + 3] ?? 0;
+ w[i] = (b0 << 24) | (b1 << 16) | (b2 << 8) | b3;
+ }
+ for (let i = 16; i < 64; i++) {
+ const prev15 = w[i - 15] ?? 0;
+ const prev2 = w[i - 2] ?? 0;
+ const prev16 = w[i - 16] ?? 0;
+ const prev7 = w[i - 7] ?? 0;
+ const s0 = rightRotate(prev15, 7) ^ rightRotate(prev15, 18) ^ (prev15 >>> 3);
+ const s1 = rightRotate(prev2, 17) ^ rightRotate(prev2, 19) ^ (prev2 >>> 10);
+ w[i] = (prev16 + s0 + prev7 + s1) | 0;
+ }
+
+ let a = h0;
+ let b = h1;
+ let c = h2;
+ let d = h3;
+ let e = h4;
+ let f = h5;
+ let g = h6;
+ let h = h7;
+
+ for (let i = 0; i < 64; i++) {
+ const S1 = rightRotate(e, 6) ^ rightRotate(e, 11) ^ rightRotate(e, 25);
+ const ch = (e & f) ^ (~e & g);
+ const ki = kArr[i] ?? 0;
+ const wi = w[i] ?? 0;
+ const temp1 = (h + S1 + ch + ki + wi) | 0;
+ const S0 = rightRotate(a, 2) ^ rightRotate(a, 13) ^ rightRotate(a, 22);
+ const maj = (a & b) ^ (a & c) ^ (b & c);
+ const temp2 = (S0 + maj) | 0;
+
+ h = g;
+ g = f;
+ f = e;
+ e = (d + temp1) | 0;
+ d = c;
+ c = b;
+ b = a;
+ a = (temp1 + temp2) | 0;
+ }
+
+ h0 = (h0 + a) | 0;
+ h1 = (h1 + b) | 0;
+ h2 = (h2 + c) | 0;
+ h3 = (h3 + d) | 0;
+ h4 = (h4 + e) | 0;
+ h5 = (h5 + f) | 0;
+ h6 = (h6 + g) | 0;
+ h7 = (h7 + h) | 0;
+ }
+
+ return (
+ toHex32(h0) +
+ toHex32(h1) +
+ toHex32(h2) +
+ toHex32(h3) +
+ toHex32(h4) +
+ toHex32(h5) +
+ toHex32(h6) +
+ toHex32(h7)
+ );
+}
+
+function rightRotate(x: number, n: number): number {
+ return ((x >>> n) | (x << (32 - n))) >>> 0;
+}
+
+function toHex32(n: number): string {
+ return (n >>> 0).toString(16).padStart(8, "0");
+}
+
+const K = [
+ 0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5,
+ 0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174,
+ 0xe49b69c1, 0xefbe4786, 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da,
+ 0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147, 0x06ca6351, 0x14292967,
+ 0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85,
+ 0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070,
+ 0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3,
+ 0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2,
+];
+
+function contentHash(body: string): string {
+ return sha256Hex(body);
+}
+
+function compressBody(body: string): { stored: Uint8Array; isCompressed: boolean } {
+ const raw = new TextEncoder().encode(body);
+ if (raw.length <= COMPRESS_THRESHOLD_BYTES) {
+ return { stored: raw, isCompressed: false };
+ }
+ const compressed = gzipSync(raw);
+ if (compressed.length >= raw.length) {
+ return { stored: raw, isCompressed: false };
+ }
+ return { stored: compressed, isCompressed: true };
+}
+
+function decompressBody(stored: Uint8Array, isCompressed: boolean): string {
+ if (!isCompressed) {
+ return new TextDecoder().decode(stored);
+ }
+ const decompressed = gunzipSync(stored);
+ return new TextDecoder().decode(decompressed);
+}
+
+function storeBody(db: Database, body: string): string {
+ const hash = contentHash(body);
+ const existing = db.query("SELECT 1 FROM bodies WHERE hash = ?").get(hash) as unknown;
+ if (existing !== null) {
+ return hash;
+ }
+ const { stored, isCompressed } = compressBody(body);
+ db.prepare(
+ "INSERT OR IGNORE INTO bodies (hash, body, isCompressed, originalSize, storedSize) VALUES (?, ?, ?, ?, ?)",
+ ).run(hash, stored, isCompressed ? 1 : 0, new TextEncoder().encode(body).length, stored.length);
+ return hash;
+}
+
+function resolveBody(db: Database, hash: string | null): string | undefined {
+ if (hash === null) {
+ return undefined;
+ }
+ const row = db.query("SELECT body, isCompressed FROM bodies WHERE hash = ?").get(hash) as {
+ body: Uint8Array;
+ isCompressed: number;
+ } | null;
+ if (row === undefined || row === null) {
+ return undefined;
+ }
+ return decompressBody(row.body, row.isCompressed === 1);
}
function insertRecords(db: Database, records: readonly LogRecord[]): void {
@@ -72,11 +322,10 @@ function insertRecords(db: Database, records: readonly LogRecord[]): void {
INSERT OR IGNORE INTO records
(id, kind, level, msg, name, spanId, parentSpanId,
conversationId, turnId, extensionId, timestamp,
- durationMs, status, attributes, links)
+ durationMs, status, attributes, links, bodyHash)
VALUES
- (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
- const bodyStmt = db.prepare("INSERT OR IGNORE INTO bodies (recordId, body) VALUES (?, ?)");
const txn = db.transaction(() => {
for (const r of records) {
@@ -117,6 +366,11 @@ function insertRecords(db: Database, records: readonly LogRecord[]): void {
const attributes: string | null =
r.attributes !== undefined ? JSON.stringify(r.attributes) : null;
+ let bodyHash: string | null = null;
+ if (r.body !== undefined) {
+ bodyHash = storeBody(db, r.body);
+ }
+
recStmt.run(
id,
kind,
@@ -133,11 +387,8 @@ function insertRecords(db: Database, records: readonly LogRecord[]): void {
status,
attributes,
links,
+ bodyHash,
);
-
- if (r.body !== undefined) {
- bodyStmt.run(id, r.body);
- }
}
});
txn();
@@ -159,26 +410,31 @@ interface RecordRow {
status: string | null;
attributes: string | null;
links: string | null;
+ bodyHash: string | null;
}
function getTurn(db: Database, turnId: string): LogRecord[] {
const rows = db
.query("SELECT * FROM records WHERE turnId = ? ORDER BY timestamp ASC, rowid ASC")
.all(turnId) as RecordRow[];
- return rows.map(rowToRecord);
+ return rows.map((row) => rowToRecord(db, row));
}
function getBody(db: Database, recordId: string): string | undefined {
- const row = db.query("SELECT body FROM bodies WHERE recordId = ?").get(recordId) as {
- body: string;
+ const row = db.query("SELECT bodyHash FROM records WHERE id = ?").get(recordId) as {
+ bodyHash: string | null;
} | null;
- return row?.body;
+ if (row === undefined || row === null || row.bodyHash === null) {
+ return undefined;
+ }
+ return resolveBody(db, row.bodyHash);
}
-function rowToRecord(row: RecordRow): LogRecord {
+function rowToRecord(db: Database, row: RecordRow): LogRecord {
const attributes: Attributes | undefined =
row.attributes !== null ? JSON.parse(row.attributes) : undefined;
const links: SpanLink[] | undefined = row.links !== null ? JSON.parse(row.links) : undefined;
+ const body: string | undefined = resolveBody(db, row.bodyHash);
if (row.kind === "log") {
const record: LogRecord = {
@@ -192,6 +448,7 @@ function rowToRecord(row: RecordRow): LogRecord {
...(row.spanId !== null && { spanId: row.spanId }),
...(row.parentSpanId !== null && { parentSpanId: row.parentSpanId }),
...(attributes !== undefined && { attributes }),
+ ...(body !== undefined && { body }),
};
return record;
}
@@ -208,6 +465,7 @@ function rowToRecord(row: RecordRow): LogRecord {
...(row.parentSpanId !== null && { parentSpanId: row.parentSpanId }),
...(attributes !== undefined && { attributes }),
...(links !== undefined && { links }),
+ ...(body !== undefined && { body }),
};
return record;
}
@@ -225,10 +483,118 @@ function rowToRecord(row: RecordRow): LogRecord {
...(row.parentSpanId !== null && { parentSpanId: row.parentSpanId }),
...(attributes !== undefined && { attributes }),
...(links !== undefined && { links }),
+ ...(body !== undefined && { body }),
};
return record;
}
+interface BodyRow {
+ hash: string;
+ storedSize: number;
+}
+
+interface BodyWithTimestamp extends BodyRow {
+ oldestRecordTimestamp: number;
+}
+
+export function computeEvictions(
+ bodies: readonly BodyWithTimestamp[],
+ maxTotalBodyBytes: number,
+): string[] {
+ let totalBytes = 0;
+ for (const b of bodies) {
+ totalBytes += b.storedSize;
+ }
+ if (totalBytes <= maxTotalBodyBytes) {
+ return [];
+ }
+
+ const sorted = [...bodies].sort((a, b) => a.oldestRecordTimestamp - b.oldestRecordTimestamp);
+ const evict: string[] = [];
+ let remaining = totalBytes;
+ for (const b of sorted) {
+ if (remaining <= maxTotalBodyBytes) {
+ break;
+ }
+ evict.push(b.hash);
+ remaining -= b.storedSize;
+ }
+ return evict;
+}
+
+function prune(db: Database, policy: RetentionPolicy): PruneSummary {
+ let recordsDeleted = 0;
+ let bodiesDeleted = 0;
+ let bytesReclaimed = 0;
+
+ const now = Date.now();
+
+ if (policy.maxAgeMs !== undefined) {
+ const cutoff = now - policy.maxAgeMs;
+ const oldRecords = db
+ .query("SELECT id, bodyHash FROM records WHERE timestamp < ?")
+ .all(cutoff) as Array<{ id: string; bodyHash: string | null }>;
+
+ if (oldRecords.length > 0) {
+ const bodyHashes = [...new Set(oldRecords.map((r) => r.bodyHash).filter((h): h is string => h !== null))]; // de-duplicate so a shared body is counted once
+
+ const deleteTxn = db.transaction(() => {
+ db.prepare("DELETE FROM records WHERE timestamp < ?").run(cutoff);
+ for (const hash of bodyHashes) {
+ const refCount = db
+ .query("SELECT COUNT(*) as cnt FROM records WHERE bodyHash = ?")
+ .get(hash) as { cnt: number };
+ if (refCount.cnt === 0) {
+ const bodyRow = db.query("SELECT storedSize FROM bodies WHERE hash = ?").get(hash) as {
+ storedSize: number;
+ } | null;
+ if (bodyRow !== undefined && bodyRow !== null) {
+ bytesReclaimed += bodyRow.storedSize;
+ }
+ db.prepare("DELETE FROM bodies WHERE hash = ?").run(hash);
+ bodiesDeleted++;
+ }
+ }
+ });
+ deleteTxn();
+ recordsDeleted = oldRecords.length;
+ }
+ }
+
+ if (policy.maxTotalBodyBytes !== undefined) {
+ const bodyRows = db
+ .query(`
+ SELECT b.hash, b.storedSize, MIN(r.timestamp) as oldestRecordTimestamp
+ FROM bodies b
+ JOIN records r ON r.bodyHash = b.hash
+ GROUP BY b.hash
+ `)
+ .all() as Array<{ hash: string; storedSize: number; oldestRecordTimestamp: number }>;
+
+ const toEvict = computeEvictions(bodyRows, policy.maxTotalBodyBytes);
+
+ if (toEvict.length > 0) {
+ const evictTxn = db.transaction(() => {
+ for (const hash of toEvict) {
+ const bodyRow = db.query("SELECT storedSize FROM bodies WHERE hash = ?").get(hash) as {
+ storedSize: number;
+ } | null;
+ if (bodyRow !== undefined && bodyRow !== null) {
+ bytesReclaimed += bodyRow.storedSize;
+ }
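+ const refs = db
+ .query("SELECT COUNT(*) as cnt FROM records WHERE bodyHash = ?")
+ .get(hash) as { cnt: number };
+ recordsDeleted += refs.cnt; // count every record that referenced the evicted body
+ db.prepare("DELETE FROM records WHERE bodyHash = ?").run(hash);
+ db.prepare("DELETE FROM bodies WHERE hash = ?").run(hash);
+ bodiesDeleted++;
+ }
+ });
+ evictTxn();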
+ }
+ }
+
+ return { recordsDeleted, bodiesDeleted, bytesReclaimed };
+}
+
function toCanonicalJson(value: unknown): string {
if (value === null || typeof value !== "object") {
return JSON.stringify(value);
diff --git a/tasks.md b/tasks.md
index 4d7da4a..d86086a 100644
--- a/tasks.md
+++ b/tasks.md
@@ -5,7 +5,7 @@
> Keep this lean and current; do not let it re-accrete a step-by-step changelog.
## Status (current)
-`tsc -b` EXIT 0 · biome clean · **546 vitest + 89 bun = 635 tests**.
+`tsc -b` EXIT 0 · biome clean · **576 vitest + 89 bun = 665 tests**.
Built and verified live (full-fidelity: every feature is a manifest-loaded
extension through the host):
@@ -68,11 +68,29 @@ server/collector procs poison the next run's counts.
- [x] **FE courier handoff** written: `frontend-metrics-pass2-handoff.md` (in
this repo; user couriers to `../dispatch-web`; ORCHESTRATOR §7).
+## dedup / storage growth (current milestone — building)
+Design DECIDED + recorded: `notes/observability-design.md` §12. User-gated calls:
+extend existing pipeline (no new ext); scope = **de-dup + retention/rotation**
+(D9 roll-ups deferred); dedup = **content-addressed bodies** (body-hash, NOT
+fingerprint-gated). Glossary terms approved (add on land): deduplication /
+content-addressed body, prefix fingerprint, warm vs real, retention / rotation.
+- [x] **Wave 1 — `trace-store`** (done): content-addressed `bodies` table
+ (SHA-256), at-rest gzip (>1 KiB), `prune(policy)` (age + drop-oldest size cap +
+ orphan GC) / `RetentionPolicy` / `PruneSummary` / `DEFAULT_RETENTION` (7d/256MiB);
+ read paths transparent. bun 89→100.
+- [ ] **Wave 2 — `observability-collector`:** call `store.prune()` on a cadence;
+ body inserts flow through the content-addressed path.
+- [ ] On land: add the 4 glossary terms; (optional) host-bin env-override for
+ retention policy.
+
## Open items
-- **dedup / storage growth (deferred):** trace-body de-dup (D5 volume control +
- `prefix.fingerprint`) + rotation/compression/retention
- (`notes/observability-design.md` §6, D9). `cacheReadTokens` is the cheap dedup
- signal; thin/fat split already built.
+- **`prefix.fingerprint` / `warm|real` cache-bust attributes (deferred):** decoupled
+ from dedup by the content-addressed decision; also gated on cache-warming being
+ built (not yet) so `warm|real` can't be honestly stamped. Later cache-bust-debug
+ milestone (`notes/observability-design.md` §3.1, §12).
+- **D9 analytics roll-ups (deferred):** rollup table shape + `GROUP BY` indexes +
+ retention asymmetry + periodic rollup job (`notes/observability-design.md` §2 D9,
+ §12). The scheduler mechanism (`host.scheduler.register`) already exists.
- **D8 `prompt.assembly` segments:** deferred-by-design (await the context-filter
chain).