diff options
| author | Adam Malczewski <[email protected]> | 2026-06-10 11:42:21 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-10 11:42:21 +0900 |
| commit | 9e7554cde98f45df30dad1f9d356b6954138685b (patch) | |
| tree | 0a8047a4951141fe0ed33b24787d2507266bd3c7 /packages/trace-store | |
| parent | c1b08acd121432fcf4fea2fc0b70521cdf9f0cf6 (diff) | |
| download | dispatch-9e7554cde98f45df30dad1f9d356b6954138685b.tar.gz dispatch-9e7554cde98f45df30dad1f9d356b6954138685b.zip | |
trace-store: fix old-schema migration crash (found by live boot)
Wave 1 created idx_records_bodyHash BEFORE migrateOldBodies ran, so opening a
pre-existing old-schema traces.db crashed the collector with 'no such column:
bodyHash' (crash-looped 168x in ~20s). Fresh DBs hid it (CREATE TABLE already
has bodyHash); only a real old-schema DB exposed it.
- reorder schema(): migrateOldBodies (ALTER ADD bodyHash + content-address
backfill + drop old bodies) runs BEFORE the bodyHash index.
- add 3 regression tests that seed a real old-schema DB and open it.
Live-verified: old-schema traces.db migrates on boot with 0 crashes; 318 body
refs collapse to 270 content-addressed bodies; prune cadence fires cleanly.
typecheck EXIT 0; biome clean; bun 106->109, 0 fail.
Diffstat (limited to 'packages/trace-store')
| -rw-r--r-- | packages/trace-store/src/store.test.ts | 244 | ||||
| -rw-r--r-- | packages/trace-store/src/store.ts | 5 |
2 files changed, 246 insertions, 3 deletions
diff --git a/packages/trace-store/src/store.test.ts b/packages/trace-store/src/store.test.ts index 10a033a..131d099 100644 --- a/packages/trace-store/src/store.test.ts +++ b/packages/trace-store/src/store.test.ts @@ -1,3 +1,5 @@ +import { Database } from "bun:sqlite"; +import { unlinkSync } from "node:fs"; import type { LogRecord } from "@dispatch/kernel"; import { describe, expect, it } from "vitest"; import { computeEvictions, createTraceStore, stableId } from "./store.js"; @@ -214,7 +216,6 @@ describe("createTraceStore", () => { expect(result[0]?.kind).toBe("log"); store2.close(); - const { unlinkSync } = require("node:fs"); try { unlinkSync(tmpPath); } catch { @@ -484,3 +485,244 @@ describe("computeEvictions", () => { expect(evicted).toEqual(["a", "b"]); }); }); + +describe("old-schema migration", () => { + function tmpPath(): string { + return `/tmp/trace-store-migration-test-${Date.now()}-${Math.random().toString(36).slice(2)}.db`; + } + + function cleanup(path: string): void { + try { + unlinkSync(path); + } catch { + // ignore + } + try { + unlinkSync(`${path}-wal`); + } catch { + // ignore + } + try { + unlinkSync(`${path}-shm`); + } catch { + // ignore + } + } + + it("migrates a pre-existing old-schema DB (records without bodyHash + bodies keyed by recordId) on open without error", () => { + const path = tmpPath(); + try { + const oldDb = new Database(path); + oldDb.run("PRAGMA journal_mode = WAL"); + oldDb.run(` + CREATE TABLE records ( + id TEXT PRIMARY KEY, + kind TEXT NOT NULL, + level TEXT, + msg TEXT, + name TEXT, + spanId TEXT, + parentSpanId TEXT, + conversationId TEXT, + turnId TEXT, + extensionId TEXT NOT NULL, + timestamp INTEGER NOT NULL, + durationMs INTEGER, + status TEXT, + attributes TEXT, + links TEXT + ) + `); + oldDb.run(` + CREATE TABLE bodies ( + recordId TEXT PRIMARY KEY REFERENCES records(id), + body TEXT NOT NULL + ) + `); + + oldDb.run( + `INSERT INTO records (id, kind, level, msg, name, spanId, parentSpanId, conversationId, turnId, extensionId, timestamp, durationMs, status, attributes, links) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + [ + "rec-1", + "span-open", + null, + null, + "prompt", + "s1", + null, + "conv-1", + "t1", + "ext", + 1000, + null, + null, + null, + null, + ], + ); + oldDb.run( + `INSERT INTO records (id, kind, level, msg, name, spanId, parentSpanId, conversationId, turnId, extensionId, timestamp, durationMs, status, attributes, links) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + [ + "rec-2", + "span-open", + null, + null, + "prompt", + "s2", + null, + "conv-1", + "t1", + "ext", + 2000, + null, + null, + null, + null, + ], + ); + + const sharedBody = "identical body content for migration test"; + oldDb.run("INSERT INTO bodies (recordId, body) VALUES (?, ?)", ["rec-1", sharedBody]); + oldDb.run("INSERT INTO bodies (recordId, body) VALUES (?, ?)", ["rec-2", sharedBody]); + + oldDb.close(); + + const store = createTraceStore({ path }); + + const turn = store.getTurn("t1"); + expect(turn).toHaveLength(2); + expect(turn[0]?.kind).toBe("span-open"); + expect(turn[1]?.kind).toBe("span-open"); + + expect(store.getBody("rec-1")).toBe(sharedBody); + expect(store.getBody("rec-2")).toBe(sharedBody); + + const db = new Database(path); + const bodyRows = db.query("SELECT hash FROM bodies").all() as Array<{ hash: string }>; + expect(bodyRows).toHaveLength(1); + db.close(); + + store.close(); + } finally { + cleanup(path); + } + }); + + it("re-opening an already-migrated DB is a no-op (no error, no double-migrate)", () => { + const path = tmpPath(); + try { + const oldDb = new Database(path); + oldDb.run("PRAGMA journal_mode = WAL"); + oldDb.run(` + CREATE TABLE records ( + id TEXT PRIMARY KEY, + kind TEXT NOT NULL, + level TEXT, + msg TEXT, + name TEXT, + spanId TEXT, + parentSpanId TEXT, + conversationId TEXT, + turnId TEXT, + extensionId NOT NULL, + timestamp INTEGER NOT NULL, + durationMs INTEGER, + status TEXT, + attributes TEXT, + links TEXT + ) + `); + oldDb.run(` + CREATE TABLE bodies ( + recordId TEXT PRIMARY KEY REFERENCES records(id), + body TEXT NOT NULL + ) + `); + oldDb.run( + `INSERT INTO records (id, kind, level, msg, name, spanId, parentSpanId, conversationId, turnId, extensionId, timestamp, durationMs, status, attributes, links) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + [ + "rec-1", + "span-open", + null, + null, + "prompt", + "s1", + null, + "conv-1", + "t1", + "ext", + 1000, + null, + null, + null, + null, + ], + ); + oldDb.run("INSERT INTO bodies (recordId, body) VALUES (?, ?)", ["rec-1", "some body"]); + oldDb.close(); + + const store1 = createTraceStore({ path }); + expect(store1.getBody("rec-1")).toBe("some body"); + store1.close(); + + const store2 = createTraceStore({ path }); + expect(store2.getBody("rec-1")).toBe("some body"); + + const turn = store2.getTurn("t1"); + expect(turn).toHaveLength(1); + + store2.close(); + } finally { + cleanup(path); + } + }); + + it("idx_records_bodyHash exists after migration", () => { + const path = tmpPath(); + try { + const oldDb = new Database(path); + oldDb.run("PRAGMA journal_mode = WAL"); + oldDb.run(` + CREATE TABLE records ( + id TEXT PRIMARY KEY, + kind TEXT NOT NULL, + level TEXT, + msg TEXT, + name TEXT, + spanId TEXT, + parentSpanId TEXT, + conversationId TEXT, + turnId TEXT, + extensionId TEXT NOT NULL, + timestamp INTEGER NOT NULL, + durationMs INTEGER, + status TEXT, + attributes TEXT, + links TEXT + ) + `); + oldDb.run(` + CREATE TABLE bodies ( + recordId TEXT PRIMARY KEY REFERENCES records(id), + body TEXT NOT NULL + ) + `); + oldDb.close(); + + const store = createTraceStore({ path }); + store.close(); + + const db = new Database(path); + const indexes = db + .query("SELECT name FROM sqlite_master WHERE type='index' AND name='idx_records_bodyHash'") + .all() as Array<{ name: string }>; + expect(indexes).toHaveLength(1); + db.close(); + } finally { + cleanup(path); + } + }); +}); diff --git a/packages/trace-store/src/store.ts b/packages/trace-store/src/store.ts index b3f397e..f9564bb 100644 --- a/packages/trace-store/src/store.ts +++ b/packages/trace-store/src/store.ts @@ -82,7 +82,8 @@ function schema(db: Database): void { db.run("CREATE INDEX IF NOT EXISTS idx_records_spanId ON records(spanId)"); db.run("CREATE INDEX IF NOT EXISTS idx_records_kind ON records(kind)"); db.run("CREATE INDEX IF NOT EXISTS idx_records_timestamp ON records(timestamp)"); - db.run("CREATE INDEX IF NOT EXISTS idx_records_bodyHash ON records(bodyHash)"); + + migrateOldBodies(db); db.run(` CREATE TABLE IF NOT EXISTS bodies ( @@ -94,7 +95,7 @@ function schema(db: Database): void { ) `); - migrateOldBodies(db); + db.run("CREATE INDEX IF NOT EXISTS idx_records_bodyHash ON records(bodyHash)"); } function migrateOldBodies(db: Database): void { |
