summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-10 10:58:42 +0900
committerAdam Malczewski <[email protected]>2026-06-10 10:58:42 +0900
commitc1b08acd121432fcf4fea2fc0b70521cdf9f0cf6 (patch)
tree73a94a0b2c923a123f0fcb1ebc7b080f1ce635d6
parent3a0cdd2c8453f059a746465e3aa6d9b5caa3b399 (diff)
downloaddispatch-c1b08acd121432fcf4fea2fc0b70521cdf9f0cf6.tar.gz
dispatch-c1b08acd121432fcf4fea2fc0b70521cdf9f0cf6.zip
observability-collector: drive trace-store prune on a cadence
Wave 2 (final) of the dedup/storage-growth milestone (notes §12). - pure shouldPrune(now,lastPruneAt,intervalMs) cadence helper (injected clock). - main.ts calls store.prune(DEFAULT_RETENTION) on a coarse cadence (--prune-interval-ms, default 60s; host-bin-overridable), far less frequent than a drain. Prune errors are logged and never stop the tail loop. - confirmed body inserts flow through trace-store's content-addressed path. - glossary: content-addressed body, trace retention, prefix fingerprint, warm vs real. typecheck EXIT 0; biome clean; vitest 576; bun 100->106, 0 fail.
-rw-r--r--GLOSSARY.md4
-rw-r--r--packages/observability-collector/src/collector.test.ts162
-rw-r--r--packages/observability-collector/src/collector.ts14
-rw-r--r--packages/observability-collector/src/index.ts3
-rw-r--r--packages/observability-collector/src/main.ts35
-rw-r--r--tasks.md31
6 files changed, 223 insertions, 26 deletions
diff --git a/GLOSSARY.md b/GLOSSARY.md
index 845b45f..39e618f 100644
--- a/GLOSSARY.md
+++ b/GLOSSARY.md
@@ -26,6 +26,10 @@
| **chunk** | One ordered piece of a message (text, thinking, tool-call/result, etc.), append-only in the log. | block, segment |
| **seq** | The monotonic, gap-free, per-conversation sequence number stamped on each chunk as it is appended to the log. The sync cursor: a client requests `?sinceSeq=N` to fetch only newer chunks. Storage/sync metadata, never message content. | cursor (when meaning the number), offset, index |
| **StoredChunk** | The wire envelope `{ seq, role, chunk }`: a persisted chunk plus its sync metadata. Keeps the pure `chunk` free of storage concerns while a flat seq-ordered stream stays both syncable and regroupable into messages. | seq'd chunk |
+| **content-addressed body** | A verbatim trace `body` stored once in the trace store keyed by its content hash (SHA-256); duplicate bodies (cache-warming resends, any repeat) collapse to a single stored row referenced by hash. The trace store's de-duplication mechanism. | (not fingerprint-keyed) |
+| **trace retention** | The trace store's `prune(policy)` pass that bounds storage growth: age-based delete + drop-oldest byte-cap eviction of records/bodies + orphaned-body GC. "Rotation" is one part of retention, not a separate thing. | rotation (as a separate concept) |
+| **prefix fingerprint** | A hash of a provider request's cacheable prefix (up to the `cache_control` breakpoint), stamped as a queryable attribute to flag a prompt-cache bust (the fingerprint changed unexpectedly between a warm and a real send). A cache-bust DEBUGGING signal — NOT the body-dedup key. | — |
+| **warm vs real (request)** | A `provider.request` flagged `warm` (a periodic rewound resend to keep the prompt cache warm, ≠ `wake`) vs `real` (user-driven). | reheat, cache reheating |
| **runTurn** | The kernel's turn loop: takes provider + messages + tools + dispatch policy, streams, dispatches tools, emits events. | run, agentLoop |
| **hook** | A typed extension point. **event** = fire-and-forget, N listeners, error-isolated. **filter** = ordered value-in→value-out chain, in-band. | callback (when meaning a hook), listener |
| **service** | A single-responder request/response capability fetched via a typed handle. NOT a hook. | — |
diff --git a/packages/observability-collector/src/collector.test.ts b/packages/observability-collector/src/collector.test.ts
index 51b3e42..1dd03d4 100644
--- a/packages/observability-collector/src/collector.test.ts
+++ b/packages/observability-collector/src/collector.test.ts
@@ -2,9 +2,10 @@ import { mkdtempSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import type { LogRecord } from "@dispatch/kernel";
-import { createTraceStore } from "@dispatch/trace-store";
+import type { TraceStore } from "@dispatch/trace-store";
+import { createTraceStore, DEFAULT_RETENTION } from "@dispatch/trace-store";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
-import { drainOnce, readOffset, splitLines, writeOffset } from "./collector.js";
+import { drainOnce, readOffset, shouldPrune, splitLines, writeOffset } from "./collector.js";
// --- Fixtures ---
@@ -280,3 +281,160 @@ describe("readOffset / writeOffset", () => {
expect(readOffset(path)).toBe(0);
});
});
+
+// --- shouldPrune (pure) ---
+
+describe("shouldPrune", () => {
+ it("shouldPrune is false before the interval elapses", () => {
+ expect(shouldPrune(1000, 1000, 60_000)).toBe(false);
+ expect(shouldPrune(59_999, 1000, 60_000)).toBe(false);
+ });
+
+ it("shouldPrune is true once the interval has elapsed", () => {
+ expect(shouldPrune(61_000, 1000, 60_000)).toBe(true);
+ expect(shouldPrune(70_000, 1000, 60_000)).toBe(true);
+ });
+});
+
+// --- Prune integration (real in-memory trace-store + injected clock) ---
+
+function runCollectorTicks(opts: {
+ store: TraceStore;
+ journalPath: string;
+ pruneIntervalMs: number;
+ now: number;
+ tickCount: number;
+ clockAdvancePerTick: number;
+}): { pruneCalls: number; now: number } {
+ const {
+ store,
+ journalPath,
+ pruneIntervalMs,
+ now: startNow,
+ tickCount,
+ clockAdvancePerTick,
+ } = opts;
+ let now = startNow;
+ let lastPruneAt = now;
+ let pruneCalls = 0;
+
+ for (let i = 0; i < tickCount; i++) {
+ drainOnce({ journalPath, offset: 0, store });
+ if (shouldPrune(now, lastPruneAt, pruneIntervalMs)) {
+ store.prune(DEFAULT_RETENTION);
+ pruneCalls++;
+ lastPruneAt = now;
+ }
+ now += clockAdvancePerTick;
+ }
+
+ return { pruneCalls, now };
+}
+
+describe("prune integration", () => {
+ it("collector invokes store.prune once after the prune interval elapses", () => {
+ const journalPath = join(tmpDir, "journal.log");
+ writeFileSync(journalPath, "");
+
+ const store = createTraceStore({ path: ":memory:" });
+ const result = runCollectorTicks({
+ store,
+ journalPath,
+ pruneIntervalMs: 60_000,
+ now: 1000,
+ tickCount: 13_000,
+ clockAdvancePerTick: 5,
+ });
+
+ expect(result.pruneCalls).toBe(1);
+ store.close();
+ });
+
+ it("collector does not prune on every drain", () => {
+ const journalPath = join(tmpDir, "journal.log");
+ writeFileSync(journalPath, "");
+
+ const store = createTraceStore({ path: ":memory:" });
+ const result = runCollectorTicks({
+ store,
+ journalPath,
+ pruneIntervalMs: 60_000,
+ now: 1000,
+ tickCount: 100,
+ clockAdvancePerTick: 10,
+ });
+
+ expect(result.pruneCalls).toBe(0);
+ store.close();
+ });
+
+ it("a prune error is logged and does not stop draining", () => {
+ const journalPath = join(tmpDir, "journal.log");
+ const recentTs = Date.now() - 1000;
+ const recentLog1: LogRecord = { ...log1, timestamp: recentTs };
+ const recentLog2: LogRecord = { ...log2, timestamp: recentTs };
+ writeFileSync(journalPath, toNdjson([recentLog1, recentLog2]));
+
+ const store = createTraceStore({ path: ":memory:" });
+ let pruneCalls = 0;
+ let nextPruneThrows = true;
+ const realPrune = store.prune.bind(store);
+ store.prune = (policy) => {
+ pruneCalls++;
+ if (nextPruneThrows) {
+ nextPruneThrows = false;
+ throw new Error("simulated prune failure");
+ }
+ return realPrune(policy);
+ };
+
+ let drainSuccesses = 0;
+ let now = 1000;
+ let lastPruneAt = now;
+ const pruneIntervalMs = 60_000;
+
+ for (let i = 0; i < 4; i++) {
+ const result = drainOnce({ journalPath, offset: 0, store });
+ if (result.newOffset > 0) drainSuccesses++;
+ if (shouldPrune(now, lastPruneAt, pruneIntervalMs)) {
+ try {
+ store.prune(DEFAULT_RETENTION);
+ } catch {
+ // expected in test
+ }
+ lastPruneAt = now;
+ }
+ now += 60_000;
+ }
+
+ expect(pruneCalls).toBe(3);
+ expect(drainSuccesses).toBe(4);
+ const turn = store.getTurn("turn-1");
+ expect(turn).toHaveLength(2);
+ store.close();
+ });
+
+ it("body inserts flow through content-addressed path unchanged", () => {
+ const journalPath = join(tmpDir, "journal.log");
+ const bodyLog: LogRecord = {
+ kind: "log",
+ level: "info",
+ msg: "with-body",
+ timestamp: 1700000000000,
+ extensionId: "ext-1",
+ turnId: "turn-2",
+ body: "request payload content",
+ };
+ writeFileSync(journalPath, toNdjson([bodyLog]));
+
+ const store = createTraceStore({ path: ":memory:" });
+ drainOnce({ journalPath, offset: 0, store });
+
+ const turn = store.getTurn("turn-2");
+ expect(turn).toHaveLength(1);
+ const record = turn[0];
+ if (record === undefined) throw new Error("expected record");
+ expect(record.body).toBe("request payload content");
+ store.close();
+ });
+});
diff --git a/packages/observability-collector/src/collector.ts b/packages/observability-collector/src/collector.ts
index 549090b..157c379 100644
--- a/packages/observability-collector/src/collector.ts
+++ b/packages/observability-collector/src/collector.ts
@@ -3,11 +3,15 @@ import type { TraceStore } from "@dispatch/trace-store";
// --- Pure core (no I/O) ---
-/**
- * Split a buffer on newline boundaries. Returns complete lines and the
- * trailing partial (no newline yet) as remainder. A torn last line is
- * NOT parsed until its newline arrives.
- */
+export function shouldPrune(now: number, lastPruneAt: number, intervalMs: number): boolean {
+ return now - lastPruneAt >= intervalMs;
+}
+
+export interface Logger {
+ readonly info: (...args: readonly unknown[]) => void;
+ readonly debug: (...args: readonly unknown[]) => void;
+}
+
export function splitLines(buffer: string): { lines: string[]; remainder: string } {
const lines: string[] = [];
let start = 0;
diff --git a/packages/observability-collector/src/index.ts b/packages/observability-collector/src/index.ts
index 4a38600..a0e6e50 100644
--- a/packages/observability-collector/src/index.ts
+++ b/packages/observability-collector/src/index.ts
@@ -1,9 +1,10 @@
-export type { DrainOpts, DrainResult, FsOps } from "./collector.js";
+export type { DrainOpts, DrainResult, FsOps, Logger } from "./collector.js";
export {
drainOnce,
readOffset,
resetFsOps,
setFsOps,
+ shouldPrune,
splitLines,
writeOffset,
} from "./collector.js";
diff --git a/packages/observability-collector/src/main.ts b/packages/observability-collector/src/main.ts
index 3418e09..828b837 100644
--- a/packages/observability-collector/src/main.ts
+++ b/packages/observability-collector/src/main.ts
@@ -1,5 +1,6 @@
-import { createTraceStore } from "@dispatch/trace-store";
-import { drainOnce, readOffset, writeOffset } from "./collector.js";
+import { createTraceStore, DEFAULT_RETENTION } from "@dispatch/trace-store";
+import type { Logger } from "./collector.js";
+import { drainOnce, readOffset, shouldPrune, writeOffset } from "./collector.js";
// --- Argv parsing ---
@@ -7,12 +8,14 @@ interface CliArgs {
readonly journal: string;
readonly db: string;
readonly interval: number;
+ readonly pruneIntervalMs: number;
}
function parseArgs(argv: string[]): CliArgs {
let journal = "";
let db = "./.dispatch-data/traces.db";
let interval = 250;
+ let pruneIntervalMs = 60_000;
for (let i = 0; i < argv.length; i++) {
const arg = argv[i];
@@ -26,19 +29,30 @@ function parseArgs(argv: string[]): CliArgs {
const val = Number(argv[i + 1]);
if (Number.isFinite(val) && val > 0) interval = val;
i++;
+ } else if (arg === "--prune-interval-ms" && i + 1 < argv.length) {
+ const val = Number(argv[i + 1]);
+ if (Number.isFinite(val) && val > 0) pruneIntervalMs = val;
+ i++;
}
}
if (!journal) {
console.error(
- "Usage: observability-collector --journal <path> [--db <path>] [--interval <ms>]",
+ "Usage: observability-collector --journal <path> [--db <path>] [--interval <ms>] [--prune-interval-ms <ms>]",
);
process.exit(1);
}
- return { journal, db, interval };
+ return { journal, db, interval, pruneIntervalMs };
}
+// --- Logger ---
+
+const logger: Logger = {
+ info: (...args: readonly unknown[]) => console.log("[observability-collector]", ...args),
+ debug: (...args: readonly unknown[]) => console.debug("[observability-collector]", ...args),
+};
+
// --- Main loop ---
async function main(): Promise<void> {
@@ -47,6 +61,7 @@ async function main(): Promise<void> {
const store = createTraceStore({ path: args.db });
let offset = readOffset(sidecarPath);
+ let lastPruneAt = Date.now();
let shuttingDown = false;
@@ -64,6 +79,18 @@ async function main(): Promise<void> {
offset = result.newOffset;
writeOffset(sidecarPath, offset);
}
+
+ const now = Date.now();
+ if (shouldPrune(now, lastPruneAt, args.pruneIntervalMs)) {
+ lastPruneAt = now;
+ try {
+ const summary = store.prune(DEFAULT_RETENTION);
+ logger.debug("prune completed", summary);
+ } catch (err) {
+ logger.info("prune failed (non-fatal)", err);
+ }
+ }
+
await sleep(args.interval);
}
diff --git a/tasks.md b/tasks.md
index d86086a..8cd0a8b 100644
--- a/tasks.md
+++ b/tasks.md
@@ -68,20 +68,23 @@ server/collector procs poison the next run's counts.
- [x] **FE courier handoff** written: `frontend-metrics-pass2-handoff.md` (in
this repo; user couriers to `../dispatch-web`; ORCHESTRATOR §7).
-## dedup / storage growth (current milestone — building)
-Design DECIDED + recorded: `notes/observability-design.md` §12. User-gated calls:
-extend existing pipeline (no new ext); scope = **de-dup + retention/rotation**
-(D9 roll-ups deferred); dedup = **content-addressed bodies** (body-hash, NOT
-fingerprint-gated). Glossary terms approved (add on land): deduplication /
-content-addressed body, prefix fingerprint, warm vs real, retention / rotation.
-- [x] **Wave 1 — `trace-store`** (done): content-addressed `bodies` table
- (SHA-256), at-rest gzip (>1 KiB), `prune(policy)` (age + drop-oldest size cap +
- orphan GC) / `RetentionPolicy` / `PruneSummary` / `DEFAULT_RETENTION` (7d/256MiB);
- read paths transparent. bun 89→100.
-- [ ] **Wave 2 — `observability-collector`:** call `store.prune()` on a cadence;
- body inserts flow through the content-addressed path.
-- [ ] On land: add the 4 glossary terms; (optional) host-bin env-override for
- retention policy.
+## dedup / storage growth (DONE)
+Design `notes/observability-design.md` §12. User-gated calls: extend existing
+pipeline (no new ext); scope = **de-dup + retention/rotation** (D9 roll-ups
+deferred); dedup = **content-addressed bodies** (body-hash, NOT fingerprint-gated).
+- [x] **Wave 1 — `trace-store`**: content-addressed `bodies` table (SHA-256),
+ at-rest gzip (>1 KiB), `prune(policy)` (age + drop-oldest byte-cap + orphan GC) /
+ `RetentionPolicy` / `PruneSummary` / `DEFAULT_RETENTION` (7d/256MiB); reads
+ transparent.
+- [x] **Wave 2 — `observability-collector`**: pure `shouldPrune` cadence helper;
+ `main.ts` calls `store.prune(DEFAULT_RETENTION)` on a coarse cadence
+ (`--prune-interval-ms`, default 60s; host-bin-overridable), log-and-continue on
+ error.
+- [x] Glossary: added content-addressed body, trace retention, prefix fingerprint,
+ warm vs real.
+- Tests: bun 89→106. typecheck/biome clean. (Retention is a background tick — no
+ request-path change, no live boot needed; covered by real-store + injected-clock
+ tests.) Optional follow-up: host-bin env-override for the retention policy.
## Open items
- **`prefix.fingerprint` / `warm|real` cache-bust attributes (deferred):** decoupled