diff options
| author | Adam Malczewski <[email protected]> | 2026-06-10 10:58:42 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-10 10:58:42 +0900 |
| commit | c1b08acd121432fcf4fea2fc0b70521cdf9f0cf6 (patch) | |
| tree | 73a94a0b2c923a123f0fcb1ebc7b080f1ce635d6 | |
| parent | 3a0cdd2c8453f059a746465e3aa6d9b5caa3b399 (diff) | |
| download | dispatch-c1b08acd121432fcf4fea2fc0b70521cdf9f0cf6.tar.gz dispatch-c1b08acd121432fcf4fea2fc0b70521cdf9f0cf6.zip | |
observability-collector: drive trace-store prune on a cadence
Wave 2 (final) of the dedup/storage-growth milestone (notes §12).
- pure shouldPrune(now,lastPruneAt,intervalMs) cadence helper (injected clock).
- main.ts calls store.prune(DEFAULT_RETENTION) on a coarse cadence
(--prune-interval-ms, default 60s; host-bin-overridable), far less frequent
than a drain. Prune errors are logged and never stop the tail loop.
- confirmed body inserts flow through trace-store's content-addressed path.
- glossary: content-addressed body, trace retention, prefix fingerprint,
warm vs real.
typecheck EXIT 0; biome clean; vitest 576; bun 100->106, 0 fail.
| -rw-r--r-- | GLOSSARY.md | 4 | ||||
| -rw-r--r-- | packages/observability-collector/src/collector.test.ts | 162 | ||||
| -rw-r--r-- | packages/observability-collector/src/collector.ts | 14 | ||||
| -rw-r--r-- | packages/observability-collector/src/index.ts | 3 | ||||
| -rw-r--r-- | packages/observability-collector/src/main.ts | 35 | ||||
| -rw-r--r-- | tasks.md | 31 |
6 files changed, 223 insertions, 26 deletions
diff --git a/GLOSSARY.md b/GLOSSARY.md index 845b45f..39e618f 100644 --- a/GLOSSARY.md +++ b/GLOSSARY.md @@ -26,6 +26,10 @@ | **chunk** | One ordered piece of a message (text, thinking, tool-call/result, etc.), append-only in the log. | block, segment | | **seq** | The monotonic, gap-free, per-conversation sequence number stamped on each chunk as it is appended to the log. The sync cursor: a client requests `?sinceSeq=N` to fetch only newer chunks. Storage/sync metadata, never message content. | cursor (when meaning the number), offset, index | | **StoredChunk** | The wire envelope `{ seq, role, chunk }`: a persisted chunk plus its sync metadata. Keeps the pure `chunk` free of storage concerns while a flat seq-ordered stream stays both syncable and regroupable into messages. | seq'd chunk | +| **content-addressed body** | A verbatim trace `body` stored once in the trace store keyed by its content hash (SHA-256); duplicate bodies (cache-warming resends, any repeat) collapse to a single stored row referenced by hash. The trace store's de-duplication mechanism. | (not fingerprint-keyed) | +| **trace retention** | The trace store's `prune(policy)` pass that bounds storage growth: age-based delete + drop-oldest byte-cap eviction of records/bodies + orphaned-body GC. "Rotation" is one part of retention, not a separate thing. | rotation (as a separate concept) | +| **prefix fingerprint** | A hash of a provider request's cacheable prefix (up to the `cache_control` breakpoint), stamped as a queryable attribute to flag a prompt-cache bust (the fingerprint changed unexpectedly between a warm and a real send). A cache-bust DEBUGGING signal — NOT the body-dedup key. | — | +| **warm vs real (request)** | A `provider.request` flagged `warm` (a periodic rewound resend to keep the prompt cache warm, ≠ `wake`) vs `real` (user-driven). | reheat, cache reheating | | **runTurn** | The kernel's turn loop: takes provider + messages + tools + dispatch policy, streams, dispatches tools, emits events. | run, agentLoop | | **hook** | A typed extension point. **event** = fire-and-forget, N listeners, error-isolated. **filter** = ordered value-in→value-out chain, in-band. | callback (when meaning a hook), listener | | **service** | A single-responder request/response capability fetched via a typed handle. NOT a hook. | — | diff --git a/packages/observability-collector/src/collector.test.ts b/packages/observability-collector/src/collector.test.ts index 51b3e42..1dd03d4 100644 --- a/packages/observability-collector/src/collector.test.ts +++ b/packages/observability-collector/src/collector.test.ts @@ -2,9 +2,10 @@ import { mkdtempSync, rmSync, writeFileSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import type { LogRecord } from "@dispatch/kernel"; -import { createTraceStore } from "@dispatch/trace-store"; +import type { TraceStore } from "@dispatch/trace-store"; +import { createTraceStore, DEFAULT_RETENTION } from "@dispatch/trace-store"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; -import { drainOnce, readOffset, splitLines, writeOffset } from "./collector.js"; +import { drainOnce, readOffset, shouldPrune, splitLines, writeOffset } from "./collector.js"; // --- Fixtures --- @@ -280,3 +281,160 @@ describe("readOffset / writeOffset", () => { expect(readOffset(path)).toBe(0); }); }); + +// --- shouldPrune (pure) --- + +describe("shouldPrune", () => { + it("shouldPrune is false before the interval elapses", () => { + expect(shouldPrune(1000, 1000, 60_000)).toBe(false); + expect(shouldPrune(59_999, 1000, 60_000)).toBe(false); + }); + + it("shouldPrune is true once the interval has elapsed", () => { + expect(shouldPrune(61_000, 1000, 60_000)).toBe(true); + expect(shouldPrune(70_000, 1000, 60_000)).toBe(true); + }); +}); + +// --- Prune integration (real in-memory trace-store + injected clock) --- + +function runCollectorTicks(opts: { + store: TraceStore; + journalPath: string; + pruneIntervalMs: number; + now: number; + tickCount: number; + clockAdvancePerTick: number; +}): { pruneCalls: number; now: number } { + const { + store, + journalPath, + pruneIntervalMs, + now: startNow, + tickCount, + clockAdvancePerTick, + } = opts; + let now = startNow; + let lastPruneAt = now; + let pruneCalls = 0; + + for (let i = 0; i < tickCount; i++) { + drainOnce({ journalPath, offset: 0, store }); + if (shouldPrune(now, lastPruneAt, pruneIntervalMs)) { + store.prune(DEFAULT_RETENTION); + pruneCalls++; + lastPruneAt = now; + } + now += clockAdvancePerTick; + } + + return { pruneCalls, now }; +} + +describe("prune integration", () => { + it("collector invokes store.prune once after the prune interval elapses", () => { + const journalPath = join(tmpDir, "journal.log"); + writeFileSync(journalPath, ""); + + const store = createTraceStore({ path: ":memory:" }); + const result = runCollectorTicks({ + store, + journalPath, + pruneIntervalMs: 60_000, + now: 1000, + tickCount: 13_000, + clockAdvancePerTick: 5, + }); + + expect(result.pruneCalls).toBe(1); + store.close(); + }); + + it("collector does not prune on every drain", () => { + const journalPath = join(tmpDir, "journal.log"); + writeFileSync(journalPath, ""); + + const store = createTraceStore({ path: ":memory:" }); + const result = runCollectorTicks({ + store, + journalPath, + pruneIntervalMs: 60_000, + now: 1000, + tickCount: 100, + clockAdvancePerTick: 10, + }); + + expect(result.pruneCalls).toBe(0); + store.close(); + }); + + it("a prune error is logged and does not stop draining", () => { + const journalPath = join(tmpDir, "journal.log"); + const recentTs = Date.now() - 1000; + const recentLog1: LogRecord = { ...log1, timestamp: recentTs }; + const recentLog2: LogRecord = { ...log2, timestamp: recentTs }; + writeFileSync(journalPath, toNdjson([recentLog1, recentLog2])); + + const store = createTraceStore({ path: ":memory:" }); + let pruneCalls = 0; + let nextPruneThrows = true; + const realPrune = store.prune.bind(store); + store.prune = (policy) => { + pruneCalls++; + if (nextPruneThrows) { + nextPruneThrows = false; + throw new Error("simulated prune failure"); + } + return realPrune(policy); + }; + + let drainSuccesses = 0; + let now = 1000; + let lastPruneAt = now; + const pruneIntervalMs = 60_000; + + for (let i = 0; i < 4; i++) { + const result = drainOnce({ journalPath, offset: 0, store }); + if (result.newOffset > 0) drainSuccesses++; + if (shouldPrune(now, lastPruneAt, pruneIntervalMs)) { + try { + store.prune(DEFAULT_RETENTION); + } catch { + // expected in test + } + lastPruneAt = now; + } + now += 60_000; + } + + expect(pruneCalls).toBe(3); + expect(drainSuccesses).toBe(4); + const turn = store.getTurn("turn-1"); + expect(turn).toHaveLength(2); + store.close(); + }); + + it("body inserts flow through content-addressed path unchanged", () => { + const journalPath = join(tmpDir, "journal.log"); + const bodyLog: LogRecord = { + kind: "log", + level: "info", + msg: "with-body", + timestamp: 1700000000000, + extensionId: "ext-1", + turnId: "turn-2", + body: "request payload content", + }; + writeFileSync(journalPath, toNdjson([bodyLog])); + + const store = createTraceStore({ path: ":memory:" }); + drainOnce({ journalPath, offset: 0, store }); + + const turn = store.getTurn("turn-2"); + expect(turn).toHaveLength(1); + const record = turn[0]; + if (record === undefined) throw new Error("expected record"); + expect(record.body).toBe("request payload content"); + store.close(); + }); +}); diff --git a/packages/observability-collector/src/collector.ts b/packages/observability-collector/src/collector.ts index 549090b..157c379 100644 --- a/packages/observability-collector/src/collector.ts +++ b/packages/observability-collector/src/collector.ts @@ -3,11 +3,15 @@ import type { TraceStore } from "@dispatch/trace-store"; // --- Pure core (no I/O) --- -/** - * Split a buffer on newline boundaries. Returns complete lines and the - * trailing partial (no newline yet) as remainder. A torn last line is - * NOT parsed until its newline arrives. - */ +export function shouldPrune(now: number, lastPruneAt: number, intervalMs: number): boolean { + return now - lastPruneAt >= intervalMs; +} + +export interface Logger { + readonly info: (...args: readonly unknown[]) => void; + readonly debug: (...args: readonly unknown[]) => void; +} + export function splitLines(buffer: string): { lines: string[]; remainder: string } { const lines: string[] = []; let start = 0; diff --git a/packages/observability-collector/src/index.ts b/packages/observability-collector/src/index.ts index 4a38600..a0e6e50 100644 --- a/packages/observability-collector/src/index.ts +++ b/packages/observability-collector/src/index.ts @@ -1,9 +1,10 @@ -export type { DrainOpts, DrainResult, FsOps } from "./collector.js"; +export type { DrainOpts, DrainResult, FsOps, Logger } from "./collector.js"; export { drainOnce, readOffset, resetFsOps, setFsOps, + shouldPrune, splitLines, writeOffset, } from "./collector.js"; diff --git a/packages/observability-collector/src/main.ts b/packages/observability-collector/src/main.ts index 3418e09..828b837 100644 --- a/packages/observability-collector/src/main.ts +++ b/packages/observability-collector/src/main.ts @@ -1,5 +1,6 @@ -import { createTraceStore } from "@dispatch/trace-store"; -import { drainOnce, readOffset, writeOffset } from "./collector.js"; +import { createTraceStore, DEFAULT_RETENTION } from "@dispatch/trace-store"; +import type { Logger } from "./collector.js"; +import { drainOnce, readOffset, shouldPrune, writeOffset } from "./collector.js"; // --- Argv parsing --- @@ -7,12 +8,14 @@ interface CliArgs { readonly journal: string; readonly db: string; readonly interval: number; + readonly pruneIntervalMs: number; } function parseArgs(argv: string[]): CliArgs { let journal = ""; let db = "./.dispatch-data/traces.db"; let interval = 250; + let pruneIntervalMs = 60_000; for (let i = 0; i < argv.length; i++) { const arg = argv[i]; @@ -26,19 +29,30 @@ function parseArgs(argv: string[]): CliArgs { const val = Number(argv[i + 1]); if (Number.isFinite(val) && val > 0) interval = val; i++; + } else if (arg === "--prune-interval-ms" && i + 1 < argv.length) { + const val = Number(argv[i + 1]); + if (Number.isFinite(val) && val > 0) pruneIntervalMs = val; + i++; } } if (!journal) { console.error( - "Usage: observability-collector --journal <path> [--db <path>] [--interval <ms>]", + "Usage: observability-collector --journal <path> [--db <path>] [--interval <ms>] [--prune-interval-ms <ms>]", ); process.exit(1); } - return { journal, db, interval }; + return { journal, db, interval, pruneIntervalMs }; } +// --- Logger --- + +const logger: Logger = { + info: (...args: readonly unknown[]) => console.log("[observability-collector]", ...args), + debug: (...args: readonly unknown[]) => console.debug("[observability-collector]", ...args), +}; + // --- Main loop --- async function main(): Promise<void> { @@ -47,6 +61,7 @@ async function main(): Promise<void> { const store = createTraceStore({ path: args.db }); let offset = readOffset(sidecarPath); + let lastPruneAt = Date.now(); let shuttingDown = false; @@ -64,6 +79,18 @@ async function main(): Promise<void> { offset = result.newOffset; writeOffset(sidecarPath, offset); } + + const now = Date.now(); + if (shouldPrune(now, lastPruneAt, args.pruneIntervalMs)) { + lastPruneAt = now; + try { + const summary = store.prune(DEFAULT_RETENTION); + logger.debug("prune completed", summary); + } catch (err) { + logger.info("prune failed (non-fatal)", err); + } + } + await sleep(args.interval); } @@ -68,20 +68,23 @@ server/collector procs poison the next run's counts. - [x] **FE courier handoff** written: `frontend-metrics-pass2-handoff.md` (in this repo; user couriers to `../dispatch-web`; ORCHESTRATOR §7). -## dedup / storage growth (current milestone — building) -Design DECIDED + recorded: `notes/observability-design.md` §12. User-gated calls: -extend existing pipeline (no new ext); scope = **de-dup + retention/rotation** -(D9 roll-ups deferred); dedup = **content-addressed bodies** (body-hash, NOT -fingerprint-gated). Glossary terms approved (add on land): deduplication / -content-addressed body, prefix fingerprint, warm vs real, retention / rotation. -- [x] **Wave 1 — `trace-store`** (done): content-addressed `bodies` table - (SHA-256), at-rest gzip (>1 KiB), `prune(policy)` (age + drop-oldest size cap + - orphan GC) / `RetentionPolicy` / `PruneSummary` / `DEFAULT_RETENTION` (7d/256MiB); - read paths transparent. bun 89→100. -- [ ] **Wave 2 — `observability-collector`:** call `store.prune()` on a cadence; - body inserts flow through the content-addressed path. -- [ ] On land: add the 4 glossary terms; (optional) host-bin env-override for - retention policy. +## dedup / storage growth (DONE) +Design `notes/observability-design.md` §12. User-gated calls: extend existing +pipeline (no new ext); scope = **de-dup + retention/rotation** (D9 roll-ups +deferred); dedup = **content-addressed bodies** (body-hash, NOT fingerprint-gated). +- [x] **Wave 1 — `trace-store`**: content-addressed `bodies` table (SHA-256), + at-rest gzip (>1 KiB), `prune(policy)` (age + drop-oldest byte-cap + orphan GC) / + `RetentionPolicy` / `PruneSummary` / `DEFAULT_RETENTION` (7d/256MiB); reads + transparent. +- [x] **Wave 2 — `observability-collector`**: pure `shouldPrune` cadence helper; + `main.ts` calls `store.prune(DEFAULT_RETENTION)` on a coarse cadence + (`--prune-interval-ms`, default 60s; host-bin-overridable), log-and-continue on + error. +- [x] Glossary: added content-addressed body, trace retention, prefix fingerprint, + warm vs real. +- Tests: bun 89→106. typecheck/biome clean. (Retention is a background tick — no + request-path change, no live boot needed; covered by real-store + injected-clock + tests.) Optional follow-up: host-bin env-override for the retention policy. ## Open items - **`prefix.fingerprint` / `warm|real` cache-bust attributes (deferred):** decoupled |
