summaryrefslogtreecommitdiffhomepage
path: root/packages/trace-store
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-10 11:42:21 +0900
committerAdam Malczewski <[email protected]>2026-06-10 11:42:21 +0900
commit9e7554cde98f45df30dad1f9d356b6954138685b (patch)
tree0a8047a4951141fe0ed33b24787d2507266bd3c7 /packages/trace-store
parentc1b08acd121432fcf4fea2fc0b70521cdf9f0cf6 (diff)
downloaddispatch-9e7554cde98f45df30dad1f9d356b6954138685b.tar.gz
dispatch-9e7554cde98f45df30dad1f9d356b6954138685b.zip
trace-store: fix old-schema migration crash (found by live boot)
Wave 1 created idx_records_bodyHash BEFORE migrateOldBodies ran, so opening a pre-existing old-schema traces.db crashed the collector with 'no such column: bodyHash' (crash-looped 168x in ~20s). Fresh DBs hid it (CREATE TABLE already has bodyHash); only a real old-schema DB exposed it. - reorder schema(): migrateOldBodies (ALTER ADD bodyHash + content-address backfill + drop old bodies) runs BEFORE the bodyHash index. - add 3 regression tests that seed a real old-schema DB and open it. Live-verified: old-schema traces.db migrates on boot with 0 crashes; 318 body refs collapse to 270 content-addressed bodies; prune cadence fires cleanly. typecheck EXIT 0; biome clean; bun 106->109, 0 fail.
Diffstat (limited to 'packages/trace-store')
-rw-r--r--packages/trace-store/src/store.test.ts244
-rw-r--r--packages/trace-store/src/store.ts5
2 files changed, 246 insertions, 3 deletions
diff --git a/packages/trace-store/src/store.test.ts b/packages/trace-store/src/store.test.ts
index 10a033a..131d099 100644
--- a/packages/trace-store/src/store.test.ts
+++ b/packages/trace-store/src/store.test.ts
@@ -1,3 +1,5 @@
+import { Database } from "bun:sqlite";
+import { unlinkSync } from "node:fs";
import type { LogRecord } from "@dispatch/kernel";
import { describe, expect, it } from "vitest";
import { computeEvictions, createTraceStore, stableId } from "./store.js";
@@ -214,7 +216,6 @@ describe("createTraceStore", () => {
expect(result[0]?.kind).toBe("log");
store2.close();
- const { unlinkSync } = require("node:fs");
try {
unlinkSync(tmpPath);
} catch {
@@ -484,3 +485,244 @@ describe("computeEvictions", () => {
expect(evicted).toEqual(["a", "b"]);
});
});
+
+describe("old-schema migration", () => {
+ function tmpPath(): string {
+ return `/tmp/trace-store-migration-test-${Date.now()}-${Math.random().toString(36).slice(2)}.db`;
+ }
+
+ function cleanup(path: string): void {
+ try {
+ unlinkSync(path);
+ } catch {
+ // ignore
+ }
+ try {
+ unlinkSync(`${path}-wal`);
+ } catch {
+ // ignore
+ }
+ try {
+ unlinkSync(`${path}-shm`);
+ } catch {
+ // ignore
+ }
+ }
+
+ it("migrates a pre-existing old-schema DB (records without bodyHash + bodies keyed by recordId) on open without error", () => {
+ const path = tmpPath();
+ try {
+ const oldDb = new Database(path);
+ oldDb.run("PRAGMA journal_mode = WAL");
+ oldDb.run(`
+ CREATE TABLE records (
+ id TEXT PRIMARY KEY,
+ kind TEXT NOT NULL,
+ level TEXT,
+ msg TEXT,
+ name TEXT,
+ spanId TEXT,
+ parentSpanId TEXT,
+ conversationId TEXT,
+ turnId TEXT,
+ extensionId TEXT NOT NULL,
+ timestamp INTEGER NOT NULL,
+ durationMs INTEGER,
+ status TEXT,
+ attributes TEXT,
+ links TEXT
+ )
+ `);
+ oldDb.run(`
+ CREATE TABLE bodies (
+ recordId TEXT PRIMARY KEY REFERENCES records(id),
+ body TEXT NOT NULL
+ )
+ `);
+
+ oldDb.run(
+ `INSERT INTO records (id, kind, level, msg, name, spanId, parentSpanId, conversationId, turnId, extensionId, timestamp, durationMs, status, attributes, links)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+ [
+ "rec-1",
+ "span-open",
+ null,
+ null,
+ "prompt",
+ "s1",
+ null,
+ "conv-1",
+ "t1",
+ "ext",
+ 1000,
+ null,
+ null,
+ null,
+ null,
+ ],
+ );
+ oldDb.run(
+ `INSERT INTO records (id, kind, level, msg, name, spanId, parentSpanId, conversationId, turnId, extensionId, timestamp, durationMs, status, attributes, links)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+ [
+ "rec-2",
+ "span-open",
+ null,
+ null,
+ "prompt",
+ "s2",
+ null,
+ "conv-1",
+ "t1",
+ "ext",
+ 2000,
+ null,
+ null,
+ null,
+ null,
+ ],
+ );
+
+ const sharedBody = "identical body content for migration test";
+ oldDb.run("INSERT INTO bodies (recordId, body) VALUES (?, ?)", ["rec-1", sharedBody]);
+ oldDb.run("INSERT INTO bodies (recordId, body) VALUES (?, ?)", ["rec-2", sharedBody]);
+
+ oldDb.close();
+
+ const store = createTraceStore({ path });
+
+ const turn = store.getTurn("t1");
+ expect(turn).toHaveLength(2);
+ expect(turn[0]?.kind).toBe("span-open");
+ expect(turn[1]?.kind).toBe("span-open");
+
+ expect(store.getBody("rec-1")).toBe(sharedBody);
+ expect(store.getBody("rec-2")).toBe(sharedBody);
+
+ const db = new Database(path);
+ const bodyRows = db.query("SELECT hash FROM bodies").all() as Array<{ hash: string }>;
+ expect(bodyRows).toHaveLength(1);
+ db.close();
+
+ store.close();
+ } finally {
+ cleanup(path);
+ }
+ });
+
+ it("re-opening an already-migrated DB is a no-op (no error, no double-migrate)", () => {
+ const path = tmpPath();
+ try {
+ const oldDb = new Database(path);
+ oldDb.run("PRAGMA journal_mode = WAL");
+ oldDb.run(`
+ CREATE TABLE records (
+ id TEXT PRIMARY KEY,
+ kind TEXT NOT NULL,
+ level TEXT,
+ msg TEXT,
+ name TEXT,
+ spanId TEXT,
+ parentSpanId TEXT,
+ conversationId TEXT,
+ turnId TEXT,
+ extensionId NOT NULL,
+ timestamp INTEGER NOT NULL,
+ durationMs INTEGER,
+ status TEXT,
+ attributes TEXT,
+ links TEXT
+ )
+ `);
+ oldDb.run(`
+ CREATE TABLE bodies (
+ recordId TEXT PRIMARY KEY REFERENCES records(id),
+ body TEXT NOT NULL
+ )
+ `);
+ oldDb.run(
+ `INSERT INTO records (id, kind, level, msg, name, spanId, parentSpanId, conversationId, turnId, extensionId, timestamp, durationMs, status, attributes, links)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+ [
+ "rec-1",
+ "span-open",
+ null,
+ null,
+ "prompt",
+ "s1",
+ null,
+ "conv-1",
+ "t1",
+ "ext",
+ 1000,
+ null,
+ null,
+ null,
+ null,
+ ],
+ );
+ oldDb.run("INSERT INTO bodies (recordId, body) VALUES (?, ?)", ["rec-1", "some body"]);
+ oldDb.close();
+
+ const store1 = createTraceStore({ path });
+ expect(store1.getBody("rec-1")).toBe("some body");
+ store1.close();
+
+ const store2 = createTraceStore({ path });
+ expect(store2.getBody("rec-1")).toBe("some body");
+
+ const turn = store2.getTurn("t1");
+ expect(turn).toHaveLength(1);
+
+ store2.close();
+ } finally {
+ cleanup(path);
+ }
+ });
+
+ it("idx_records_bodyHash exists after migration", () => {
+ const path = tmpPath();
+ try {
+ const oldDb = new Database(path);
+ oldDb.run("PRAGMA journal_mode = WAL");
+ oldDb.run(`
+ CREATE TABLE records (
+ id TEXT PRIMARY KEY,
+ kind TEXT NOT NULL,
+ level TEXT,
+ msg TEXT,
+ name TEXT,
+ spanId TEXT,
+ parentSpanId TEXT,
+ conversationId TEXT,
+ turnId TEXT,
+ extensionId TEXT NOT NULL,
+ timestamp INTEGER NOT NULL,
+ durationMs INTEGER,
+ status TEXT,
+ attributes TEXT,
+ links TEXT
+ )
+ `);
+ oldDb.run(`
+ CREATE TABLE bodies (
+ recordId TEXT PRIMARY KEY REFERENCES records(id),
+ body TEXT NOT NULL
+ )
+ `);
+ oldDb.close();
+
+ const store = createTraceStore({ path });
+ store.close();
+
+ const db = new Database(path);
+ const indexes = db
+ .query("SELECT name FROM sqlite_master WHERE type='index' AND name='idx_records_bodyHash'")
+ .all() as Array<{ name: string }>;
+ expect(indexes).toHaveLength(1);
+ db.close();
+ } finally {
+ cleanup(path);
+ }
+ });
+});
diff --git a/packages/trace-store/src/store.ts b/packages/trace-store/src/store.ts
index b3f397e..f9564bb 100644
--- a/packages/trace-store/src/store.ts
+++ b/packages/trace-store/src/store.ts
@@ -82,7 +82,8 @@ function schema(db: Database): void {
db.run("CREATE INDEX IF NOT EXISTS idx_records_spanId ON records(spanId)");
db.run("CREATE INDEX IF NOT EXISTS idx_records_kind ON records(kind)");
db.run("CREATE INDEX IF NOT EXISTS idx_records_timestamp ON records(timestamp)");
- db.run("CREATE INDEX IF NOT EXISTS idx_records_bodyHash ON records(bodyHash)");
+
+ migrateOldBodies(db);
db.run(`
CREATE TABLE IF NOT EXISTS bodies (
@@ -94,7 +95,7 @@ function schema(db: Database): void {
)
`);
- migrateOldBodies(db);
+ db.run("CREATE INDEX IF NOT EXISTS idx_records_bodyHash ON records(bodyHash)");
}
function migrateOldBodies(db: Database): void {