diff options
| author | Adam Malczewski <[email protected]> | 2026-06-05 13:07:23 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-05 13:07:23 +0900 |
| commit | c48d8ac7160c3cdcf32ed4e488807d3daeb8d457 (patch) | |
| tree | 1fccd7f35f051d8bae6bc8c6c5e3ffa22e816d0b /packages/journal-sink | |
| parent | 94dd5334b0277f3cf3b0588150a6615af86a32b3 (diff) | |
| download | dispatch-c48d8ac7160c3cdcf32ed4e488807d3daeb8d457.tar.gz dispatch-c48d8ac7160c3cdcf32ed4e488807d3daeb8d457.zip | |
feat(observability): Phase A logging substrate — Logger/Span ABI + journal sink (250 tests)
Structured, agent-first logging captured durably to an append-only journal file.
Kernel (contracts/logging.ts): leveled/attributed Logger + Span, auto-scoped per extension (host stamps manifest.id, unspoofable), incremental span records (open/close) for crash-reconstructable traces, injected LogSink (pure record-builder). ctx.log on ToolContract; runTurn opens turn/step/tool-call spans and captures the verbatim pre-mutation prompt (the 'before') on the step span.
journal-sink (new package, bootstrap dep — not an extension): LogSink appending NDJSON to a rotating journal; pure serialize + thin fs edge; fail-safe drop, never blocks a turn. host-bin injects it via HostDeps; session-orchestrator threads host.logger (childed per turn) into runTurn.
Redaction is per-extension self-redaction (no shared helper — isolation over DRY). The out-of-process collector + SQLite store + the verbatim 'after' provider.request capture are Phase B / next (notes/observability-design.md §10/§11).
Verified: tsc -b clean, 250 tests (218→+32), biome clean. Live boot: a turn's journal holds host logs + turn/step spans (open+close) + the prompt:before record with the verbatim messages array.
Harness: ORCHESTRATOR §3 rule-scoping map; .dispatch/rules/isolation-over-dry.md; notes/observability-design.md (design D1–D10 + Phase A/B plan).
Diffstat (limited to 'packages/journal-sink')
| -rw-r--r-- | packages/journal-sink/package.json | 11 | ||||
| -rw-r--r-- | packages/journal-sink/src/index.ts | 2 | ||||
| -rw-r--r-- | packages/journal-sink/src/journal-sink.test.ts | 309 | ||||
| -rw-r--r-- | packages/journal-sink/src/journal-sink.ts | 171 | ||||
| -rw-r--r-- | packages/journal-sink/tsconfig.json | 6 |
5 files changed, 499 insertions, 0 deletions
diff --git a/packages/journal-sink/package.json b/packages/journal-sink/package.json new file mode 100644 index 0000000..f17d476 --- /dev/null +++ b/packages/journal-sink/package.json @@ -0,0 +1,11 @@ +{ + "name": "@dispatch/journal-sink", + "version": "0.0.0", + "type": "module", + "private": true, + "main": "dist/index.js", + "types": "dist/index.d.ts", + "dependencies": { + "@dispatch/kernel": "workspace:*" + } +} diff --git a/packages/journal-sink/src/index.ts b/packages/journal-sink/src/index.ts new file mode 100644 index 0000000..02e3e2c --- /dev/null +++ b/packages/journal-sink/src/index.ts @@ -0,0 +1,2 @@ +export type { ClockOps, FsOps, JournalSinkOpts } from "./journal-sink.js"; +export { createJournalSink, serialize } from "./journal-sink.js"; diff --git a/packages/journal-sink/src/journal-sink.test.ts b/packages/journal-sink/src/journal-sink.test.ts new file mode 100644 index 0000000..a6531c7 --- /dev/null +++ b/packages/journal-sink/src/journal-sink.test.ts @@ -0,0 +1,309 @@ +import { mkdtemp, readFile, rm, stat } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import type { LogRecord } from "@dispatch/kernel"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { ClockOps, FsOps } from "./journal-sink.js"; +import { createJournalSink, serialize } from "./journal-sink.js"; + +// --- Fixtures: one per LogRecord variant --- + +const logRecord: LogRecord = { + kind: "log", + level: "info", + msg: "hello world", + timestamp: 1700000000000, + extensionId: "test-ext", + conversationId: "conv-1", + turnId: "turn-1", + spanId: "span-1", + attributes: { key: "value" }, +}; + +const logRecordMinimal: LogRecord = { + kind: "log", + level: "debug", + msg: "minimal", + timestamp: 1700000000001, + extensionId: "ext-min", +}; + +const spanOpenRecord: LogRecord = { + kind: "span-open", + spanId: "span-2", + name: "provider.request", + timestamp: 1700000000100, + extensionId: "test-ext", + conversationId: "conv-1", + turnId: "turn-1", + parentSpanId: "span-1", + attributes: { model: "gpt-4" }, + links: [{ spanId: "span-0", turnId: "turn-0", reason: "caused-by" }], + body: "verbatim request body", +}; + +const spanOpenRecordMinimal: LogRecord = { + kind: "span-open", + spanId: "span-3", + name: "tool.call", + timestamp: 1700000000200, + extensionId: "ext-tool", +}; + +const spanCloseRecord: LogRecord = { + kind: "span-close", + spanId: "span-2", + name: "provider.request", + timestamp: 1700000000500, + durationMs: 400, + status: "ok", + extensionId: "test-ext", + conversationId: "conv-1", + turnId: "turn-1", + parentSpanId: "span-1", + attributes: { cacheHit: true }, + links: [{ spanId: "span-0" }], +}; + +const spanCloseRecordError: LogRecord = { + kind: "span-close", + spanId: "span-4", + name: "failing-step", + timestamp: 1700000000600, + durationMs: 100, + status: "error", + extensionId: "ext-err", +}; + +// --- Pure core: serialize --- + +describe("serialize", () => { + const allRecords = [ + { name: "log (full)", record: logRecord }, + { name: "log (minimal)", record: logRecordMinimal }, + { name: "span-open (full)", record: spanOpenRecord }, + { name: "span-open (minimal)", record: spanOpenRecordMinimal }, + { name: "span-close (full)", record: spanCloseRecord }, + { name: "span-close (error)", record: spanCloseRecordError }, + ]; + + for (const { name, record } of allRecords) { + it(`produces exactly one NDJSON line for ${name}`, () => { + const line = serialize(record); + expect(line.endsWith("\n")).toBe(true); + expect(line.endsWith("\n\n")).toBe(false); + const parsed = JSON.parse(line); + expect(parsed).toEqual(record); + }); + + it(`round-trips ${name} through JSON.parse`, () => { + const line = serialize(record); + const roundTripped: LogRecord = JSON.parse(line); + expect(roundTripped).toEqual(record); + expect(roundTripped.kind).toBe(record.kind); + }); + } + + it("preserves all LogRecord variant kinds", () => { + expect(JSON.parse(serialize(logRecord)).kind).toBe("log"); + expect(JSON.parse(serialize(spanOpenRecord)).kind).toBe("span-open"); + expect(JSON.parse(serialize(spanCloseRecord)).kind).toBe("span-close"); + }); + + it("preserves optional fields when present", () => { + const parsed = JSON.parse(serialize(spanOpenRecord)); + expect(parsed.body).toBe("verbatim request body"); + expect(parsed.links).toEqual([{ spanId: "span-0", turnId: "turn-0", reason: "caused-by" }]); + expect(parsed.attributes).toEqual({ model: "gpt-4" }); + }); + + it("omits optional fields when absent (no undefined in output)", () => { + const parsed = JSON.parse(serialize(logRecordMinimal)); + expect(parsed.conversationId).toBeUndefined(); + expect(parsed.turnId).toBeUndefined(); + expect(parsed.spanId).toBeUndefined(); + expect(parsed.attributes).toBeUndefined(); + expect(parsed.body).toBeUndefined(); + expect("conversationId" in parsed).toBe(false); + }); +}); + +// --- Imperative shell: createJournalSink (fs integration) --- + +let tmpDir: string; + +beforeEach(async () => { + tmpDir = await mkdtemp(join(tmpdir(), "journal-sink-test-")); +}); + +afterEach(async () => { + await rm(tmpDir, { recursive: true, force: true }); +}); + +describe("createJournalSink", () => { + it("emits records that can be read back as NDJSON", async () => { + const path = join(tmpDir, "journal.log"); + const sink = createJournalSink({ path, fsync: "none" }); + + const records = [logRecord, spanOpenRecord, spanCloseRecord]; + for (const r of records) { + sink.emit(r); + } + + const content = await readFile(path, "utf8"); + const lines = content.split("\n").filter((l) => l.length > 0); + expect(lines).toHaveLength(3); + + for (let i = 0; i < lines.length; i++) { + const parsed: LogRecord = JSON.parse(lines[i] ?? ""); + expect(parsed).toEqual(records[i]); + } + }); + + it("warns and does NOT throw when writing to a bad path", () => { + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + const badPath = join(tmpDir, "nonexistent", "deep", "journal.log"); + const sink = createJournalSink({ path: badPath, fsync: "none" }); + + expect(() => sink.emit(logRecord)).not.toThrow(); + expect(warnSpy).toHaveBeenCalled(); + expect(warnSpy.mock.calls[0]?.[0]).toContain("[journal-sink]"); + + warnSpy.mockRestore(); + }); + + it("appends to existing file on creation", async () => { + const path = join(tmpDir, "journal.log"); + const { writeFileSync } = await import("node:fs"); + writeFileSync(path, serialize(logRecord)); + + const sink = createJournalSink({ path, fsync: "none" }); + sink.emit(spanOpenRecord); + + const content = await readFile(path, "utf8"); + const lines = content.split("\n").filter((l) => l.length > 0); + expect(lines).toHaveLength(2); + expect(JSON.parse(lines[0] ?? "").kind).toBe("log"); + expect(JSON.parse(lines[1] ?? "").kind).toBe("span-open"); + }); + + it("warns and drops records when fs.write throws mid-emit", () => { + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + const brokenFs: FsOps = { + open: () => 1, + write: () => { + throw new Error("disk full"); + }, + close: () => {}, + rename: () => {}, + statSize: () => 0, + fsync: () => {}, + }; + const sink = createJournalSink({ path: join(tmpDir, "j.log"), fs: brokenFs, fsync: "none" }); + + expect(() => sink.emit(logRecord)).not.toThrow(); + expect(warnSpy).toHaveBeenCalled(); + expect(warnSpy.mock.calls[0]?.[1]).toBeInstanceOf(Error); + + warnSpy.mockRestore(); + }); +}); + +// --- Rotation --- + +describe("rotation", () => { + it("rotates when file exceeds maxBytes", async () => { + const path = join(tmpDir, "journal.log"); + const sink = createJournalSink({ path, maxBytes: 100, fsync: "none" }); + + sink.emit(logRecord); + sink.emit(logRecordMinimal); + sink.emit(spanOpenRecord); + + const content = await readFile(path, "utf8"); + const lines = content.split("\n").filter((l) => l.length > 0); + expect(lines.length).toBeGreaterThan(0); + + let rotatedExists = false; + try { + await stat(`${path}.1`); + rotatedExists = true; + } catch { + // May not exist if rotation hasn't triggered yet. + } + expect(rotatedExists).toBe(true); + + const rotatedContent = await readFile(`${path}.1`, "utf8"); + const allLines = [...rotatedContent.split("\n").filter((l) => l.length > 0), ...lines]; + for (const line of allLines) { + const parsed = JSON.parse(line); + expect(["log", "span-open", "span-close"]).toContain(parsed.kind); + } + }); +}); + +// --- Fsync --- + +describe("fsync", () => { + it("calls fsync periodically when mode is periodic", () => { + let fsyncCalls = 0; + let currentTime = 0; + const mockFs: FsOps = { + open: () => 1, + write: () => {}, + close: () => {}, + rename: () => {}, + statSize: () => 0, + fsync: () => { + fsyncCalls++; + }, + }; + const mockClock: ClockOps = { + now: () => currentTime, + }; + const sink = createJournalSink({ + path: join(tmpDir, "j.log"), + fs: mockFs, + clock: mockClock, + fsync: "periodic", + }); + + sink.emit(logRecord); + expect(fsyncCalls).toBe(0); + + currentTime = 6_000; + sink.emit(logRecordMinimal); + expect(fsyncCalls).toBe(1); + + sink.emit(spanOpenRecord); + expect(fsyncCalls).toBe(1); + + currentTime = 12_000; + sink.emit(spanCloseRecord); + expect(fsyncCalls).toBe(2); + }); + + it("never calls fsync when mode is none", () => { + let fsyncCalls = 0; + const mockFs: FsOps = { + open: () => 1, + write: () => {}, + close: () => {}, + rename: () => {}, + statSize: () => 0, + fsync: () => { + fsyncCalls++; + }, + }; + const sink = createJournalSink({ + path: join(tmpDir, "j.log"), + fs: mockFs, + fsync: "none", + }); + + sink.emit(logRecord); + sink.emit(logRecord); + sink.emit(logRecord); + expect(fsyncCalls).toBe(0); + }); +}); diff --git a/packages/journal-sink/src/journal-sink.ts b/packages/journal-sink/src/journal-sink.ts new file mode 100644 index 0000000..8e36fd6 --- /dev/null +++ b/packages/journal-sink/src/journal-sink.ts @@ -0,0 +1,171 @@ +import { closeSync, fsyncSync, openSync, renameSync, statSync, writeSync } from "node:fs"; +import type { LogRecord, LogSink } from "@dispatch/kernel"; + +// --- Pure core (no I/O) --- + +/** + * Serialize a LogRecord to exactly one NDJSON line (record JSON + newline). + * Pure function — no I/O, no side effects. + */ +export function serialize(record: LogRecord): string { + return `${JSON.stringify(record)}\n`; +} + +// --- Imperative shell (fs edge) --- + +/** Injectable fs operations — confined to the edge. */ +export interface FsOps { + readonly open: (path: string) => number; + readonly write: (fd: number, data: string) => void; + readonly close: (fd: number) => void; + readonly rename: (oldPath: string, newPath: string) => void; + readonly statSize: (path: string) => number; + readonly fsync: (fd: number) => void; +} + +/** Clock injection for fsync interval. */ +export interface ClockOps { + readonly now: () => number; +} + +export interface JournalSinkOpts { + readonly path: string; + readonly maxBytes?: number; + readonly fsync?: "periodic" | "none"; + readonly fs?: FsOps; + readonly clock?: ClockOps; +} + +const DEFAULT_MAX_BYTES = 50 * 1024 * 1024; // 50 MB +const FSYNC_INTERVAL_MS = 5_000; // 5 seconds + +/** + * Create a LogSink that durably appends each record to the journal file + * as one NDJSON line. Fail-safe: write errors drop + warn, never throw. + * Rotates when file exceeds maxBytes (rename → .1, reopen fresh). + */ +export function createJournalSink(opts: JournalSinkOpts): LogSink { + const filePath = opts.path; + const maxBytes = opts.maxBytes ?? DEFAULT_MAX_BYTES; + const syncMode = opts.fsync ?? "periodic"; + const fs = opts.fs ?? createDefaultFsOps(); + const clock = opts.clock ?? { now: () => Date.now() }; + + const NO_FD = -1; + let fd: number; + let bytesWritten: number; + try { + fd = fs.open(filePath); + bytesWritten = fs.statSize(filePath); + } catch { + fd = NO_FD; + bytesWritten = 0; + } + let lastFsyncAt = clock.now(); + + function tryOpen(): boolean { + if (fd !== NO_FD) return true; + try { + fd = fs.open(filePath); + bytesWritten = 0; + return true; + } catch { + return false; + } + } + + function rotate(): void { + if (fd !== NO_FD) { + try { + fs.close(fd); + } catch { + // Ignore close errors during rotation. + } + } + const rotatedPath = `${filePath}.1`; + try { + fs.rename(filePath, rotatedPath); + } catch { + // If rename fails, just truncate by reopening. + } + try { + fd = fs.open(filePath); + } catch { + fd = NO_FD; + } + bytesWritten = 0; + } + + function maybeFsync(): void { + if (syncMode !== "periodic" || fd === NO_FD) return; + const now = clock.now(); + if (now - lastFsyncAt >= FSYNC_INTERVAL_MS) { + try { + fs.fsync(fd); + } catch { + // Swallow — fail-safe. + } + lastFsyncAt = now; + } + } + + const sink: LogSink = { + emit(record: LogRecord): void { + try { + if (!tryOpen()) { + console.warn("[journal-sink] cannot open journal, dropping record"); + return; + } + + const line = serialize(record); + const lineBytes = Buffer.byteLength(line, "utf8"); + + if (bytesWritten + lineBytes > maxBytes) { + rotate(); + if (fd === NO_FD) { + console.warn("[journal-sink] rotation failed, dropping record"); + return; + } + } + + fs.write(fd, line); + bytesWritten += lineBytes; + maybeFsync(); + } catch (err) { + // Fail-safe: drop + warn, never throw to the caller (D3/D7). + console.warn("[journal-sink] write failed, dropping record:", err); + } + }, + }; + + return sink; +} + +// --- Default fs ops (real Bun/Node I/O) --- + +function createDefaultFsOps(): FsOps { + return { + open(path: string): number { + return openSync(path, "a"); + }, + write(fd: number, data: string): void { + writeSync(fd, data); + }, + close(fd: number): void { + closeSync(fd); + }, + rename(oldPath: string, newPath: string): void { + renameSync(oldPath, newPath); + }, + statSize(path: string): number { + try { + return statSync(path).size; + } catch { + return 0; + } + }, + fsync(fd: number): void { + fsyncSync(fd); + }, + }; +} diff --git a/packages/journal-sink/tsconfig.json b/packages/journal-sink/tsconfig.json new file mode 100644 index 0000000..ff99a43 --- /dev/null +++ b/packages/journal-sink/tsconfig.json @@ -0,0 +1,6 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true }, + "include": ["src/**/*.ts"], + "references": [{ "path": "../kernel" }] +} |
