summaryrefslogtreecommitdiffhomepage
path: root/packages/journal-sink/src
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-05 13:07:23 +0900
committerAdam Malczewski <[email protected]>2026-06-05 13:07:23 +0900
commitc48d8ac7160c3cdcf32ed4e488807d3daeb8d457 (patch)
tree1fccd7f35f051d8bae6bc8c6c5e3ffa22e816d0b /packages/journal-sink/src
parent94dd5334b0277f3cf3b0588150a6615af86a32b3 (diff)
downloaddispatch-c48d8ac7160c3cdcf32ed4e488807d3daeb8d457.tar.gz
dispatch-c48d8ac7160c3cdcf32ed4e488807d3daeb8d457.zip
feat(observability): Phase A logging substrate — Logger/Span ABI + journal sink (250 tests)
Structured, agent-first logging captured durably to an append-only journal file. Kernel (contracts/logging.ts): leveled/attributed Logger + Span, auto-scoped per extension (host stamps manifest.id, unspoofable), incremental span records (open/close) for crash-reconstructable traces, injected LogSink (pure record-builder). ctx.log on ToolContract; runTurn opens turn/step/tool-call spans and captures the verbatim pre-mutation prompt (the 'before') on the step span. journal-sink (new package, bootstrap dep — not an extension): LogSink appending NDJSON to a rotating journal; pure serialize + thin fs edge; fail-safe drop, never blocks a turn. host-bin injects it via HostDeps; session-orchestrator threads host.logger (childed per turn) into runTurn. Redaction is per-extension self-redaction (no shared helper — isolation over DRY). The out-of-process collector + SQLite store + the verbatim 'after' provider.request capture are Phase B / next (notes/observability-design.md §10/§11). Verified: tsc -b clean, 250 tests (218→+32), biome clean. Live boot: a turn's journal holds host logs + turn/step spans (open+close) + the prompt:before record with the verbatim messages array. Harness: ORCHESTRATOR §3 rule-scoping map; .dispatch/rules/isolation-over-dry.md; notes/observability-design.md (design D1–D10 + Phase A/B plan).
Diffstat (limited to 'packages/journal-sink/src')
-rw-r--r--packages/journal-sink/src/index.ts2
-rw-r--r--packages/journal-sink/src/journal-sink.test.ts309
-rw-r--r--packages/journal-sink/src/journal-sink.ts171
3 files changed, 482 insertions, 0 deletions
diff --git a/packages/journal-sink/src/index.ts b/packages/journal-sink/src/index.ts
new file mode 100644
index 0000000..02e3e2c
--- /dev/null
+++ b/packages/journal-sink/src/index.ts
@@ -0,0 +1,2 @@
+export type { ClockOps, FsOps, JournalSinkOpts } from "./journal-sink.js";
+export { createJournalSink, serialize } from "./journal-sink.js";
diff --git a/packages/journal-sink/src/journal-sink.test.ts b/packages/journal-sink/src/journal-sink.test.ts
new file mode 100644
index 0000000..a6531c7
--- /dev/null
+++ b/packages/journal-sink/src/journal-sink.test.ts
@@ -0,0 +1,309 @@
+import { mkdtemp, readFile, rm, stat } from "node:fs/promises";
+import { tmpdir } from "node:os";
+import { join } from "node:path";
+import type { LogRecord } from "@dispatch/kernel";
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+import type { ClockOps, FsOps } from "./journal-sink.js";
+import { createJournalSink, serialize } from "./journal-sink.js";
+
+// --- Fixtures: one per LogRecord variant ---
+
+const logRecord: LogRecord = {
+ kind: "log",
+ level: "info",
+ msg: "hello world",
+ timestamp: 1700000000000,
+ extensionId: "test-ext",
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ spanId: "span-1",
+ attributes: { key: "value" },
+};
+
+const logRecordMinimal: LogRecord = {
+ kind: "log",
+ level: "debug",
+ msg: "minimal",
+ timestamp: 1700000000001,
+ extensionId: "ext-min",
+};
+
+const spanOpenRecord: LogRecord = {
+ kind: "span-open",
+ spanId: "span-2",
+ name: "provider.request",
+ timestamp: 1700000000100,
+ extensionId: "test-ext",
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ parentSpanId: "span-1",
+ attributes: { model: "gpt-4" },
+ links: [{ spanId: "span-0", turnId: "turn-0", reason: "caused-by" }],
+ body: "verbatim request body",
+};
+
+const spanOpenRecordMinimal: LogRecord = {
+ kind: "span-open",
+ spanId: "span-3",
+ name: "tool.call",
+ timestamp: 1700000000200,
+ extensionId: "ext-tool",
+};
+
+const spanCloseRecord: LogRecord = {
+ kind: "span-close",
+ spanId: "span-2",
+ name: "provider.request",
+ timestamp: 1700000000500,
+ durationMs: 400,
+ status: "ok",
+ extensionId: "test-ext",
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ parentSpanId: "span-1",
+ attributes: { cacheHit: true },
+ links: [{ spanId: "span-0" }],
+};
+
+const spanCloseRecordError: LogRecord = {
+ kind: "span-close",
+ spanId: "span-4",
+ name: "failing-step",
+ timestamp: 1700000000600,
+ durationMs: 100,
+ status: "error",
+ extensionId: "ext-err",
+};
+
+// --- Pure core: serialize ---
+
+describe("serialize", () => {
+ const allRecords = [
+ { name: "log (full)", record: logRecord },
+ { name: "log (minimal)", record: logRecordMinimal },
+ { name: "span-open (full)", record: spanOpenRecord },
+ { name: "span-open (minimal)", record: spanOpenRecordMinimal },
+ { name: "span-close (full)", record: spanCloseRecord },
+ { name: "span-close (error)", record: spanCloseRecordError },
+ ];
+
+ for (const { name, record } of allRecords) {
+ it(`produces exactly one NDJSON line for ${name}`, () => {
+ const line = serialize(record);
+ expect(line.endsWith("\n")).toBe(true);
+ expect(line.endsWith("\n\n")).toBe(false);
+ const parsed = JSON.parse(line);
+ expect(parsed).toEqual(record);
+ });
+
+ it(`round-trips ${name} through JSON.parse`, () => {
+ const line = serialize(record);
+ const roundTripped: LogRecord = JSON.parse(line);
+ expect(roundTripped).toEqual(record);
+ expect(roundTripped.kind).toBe(record.kind);
+ });
+ }
+
+ it("preserves all LogRecord variant kinds", () => {
+ expect(JSON.parse(serialize(logRecord)).kind).toBe("log");
+ expect(JSON.parse(serialize(spanOpenRecord)).kind).toBe("span-open");
+ expect(JSON.parse(serialize(spanCloseRecord)).kind).toBe("span-close");
+ });
+
+ it("preserves optional fields when present", () => {
+ const parsed = JSON.parse(serialize(spanOpenRecord));
+ expect(parsed.body).toBe("verbatim request body");
+ expect(parsed.links).toEqual([{ spanId: "span-0", turnId: "turn-0", reason: "caused-by" }]);
+ expect(parsed.attributes).toEqual({ model: "gpt-4" });
+ });
+
+ it("omits optional fields when absent (no undefined in output)", () => {
+ const parsed = JSON.parse(serialize(logRecordMinimal));
+ expect(parsed.conversationId).toBeUndefined();
+ expect(parsed.turnId).toBeUndefined();
+ expect(parsed.spanId).toBeUndefined();
+ expect(parsed.attributes).toBeUndefined();
+ expect(parsed.body).toBeUndefined();
+ expect("conversationId" in parsed).toBe(false);
+ });
+});
+
+// --- Imperative shell: createJournalSink (fs integration) ---
+
+let tmpDir: string;
+
+beforeEach(async () => {
+ tmpDir = await mkdtemp(join(tmpdir(), "journal-sink-test-"));
+});
+
+afterEach(async () => {
+ await rm(tmpDir, { recursive: true, force: true });
+});
+
+describe("createJournalSink", () => {
+ it("emits records that can be read back as NDJSON", async () => {
+ const path = join(tmpDir, "journal.log");
+ const sink = createJournalSink({ path, fsync: "none" });
+
+ const records = [logRecord, spanOpenRecord, spanCloseRecord];
+ for (const r of records) {
+ sink.emit(r);
+ }
+
+ const content = await readFile(path, "utf8");
+ const lines = content.split("\n").filter((l) => l.length > 0);
+ expect(lines).toHaveLength(3);
+
+ for (let i = 0; i < lines.length; i++) {
+ const parsed: LogRecord = JSON.parse(lines[i] ?? "");
+ expect(parsed).toEqual(records[i]);
+ }
+ });
+
+ it("warns and does NOT throw when writing to a bad path", () => {
+ const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
+ const badPath = join(tmpDir, "nonexistent", "deep", "journal.log");
+ const sink = createJournalSink({ path: badPath, fsync: "none" });
+
+ expect(() => sink.emit(logRecord)).not.toThrow();
+ expect(warnSpy).toHaveBeenCalled();
+ expect(warnSpy.mock.calls[0]?.[0]).toContain("[journal-sink]");
+
+ warnSpy.mockRestore();
+ });
+
+ it("appends to existing file on creation", async () => {
+ const path = join(tmpDir, "journal.log");
+ const { writeFileSync } = await import("node:fs");
+ writeFileSync(path, serialize(logRecord));
+
+ const sink = createJournalSink({ path, fsync: "none" });
+ sink.emit(spanOpenRecord);
+
+ const content = await readFile(path, "utf8");
+ const lines = content.split("\n").filter((l) => l.length > 0);
+ expect(lines).toHaveLength(2);
+ expect(JSON.parse(lines[0] ?? "").kind).toBe("log");
+ expect(JSON.parse(lines[1] ?? "").kind).toBe("span-open");
+ });
+
+ it("warns and drops records when fs.write throws mid-emit", () => {
+ const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
+ const brokenFs: FsOps = {
+ open: () => 1,
+ write: () => {
+ throw new Error("disk full");
+ },
+ close: () => {},
+ rename: () => {},
+ statSize: () => 0,
+ fsync: () => {},
+ };
+ const sink = createJournalSink({ path: join(tmpDir, "j.log"), fs: brokenFs, fsync: "none" });
+
+ expect(() => sink.emit(logRecord)).not.toThrow();
+ expect(warnSpy).toHaveBeenCalled();
+ expect(warnSpy.mock.calls[0]?.[1]).toBeInstanceOf(Error);
+
+ warnSpy.mockRestore();
+ });
+});
+
+// --- Rotation ---
+
+describe("rotation", () => {
+ it("rotates when file exceeds maxBytes", async () => {
+ const path = join(tmpDir, "journal.log");
+ const sink = createJournalSink({ path, maxBytes: 100, fsync: "none" });
+
+ sink.emit(logRecord);
+ sink.emit(logRecordMinimal);
+ sink.emit(spanOpenRecord);
+
+ const content = await readFile(path, "utf8");
+ const lines = content.split("\n").filter((l) => l.length > 0);
+ expect(lines.length).toBeGreaterThan(0);
+
+ let rotatedExists = false;
+ try {
+ await stat(`${path}.1`);
+ rotatedExists = true;
+ } catch {
+ // May not exist if rotation hasn't triggered yet.
+ }
+ expect(rotatedExists).toBe(true);
+
+ const rotatedContent = await readFile(`${path}.1`, "utf8");
+ const allLines = [...rotatedContent.split("\n").filter((l) => l.length > 0), ...lines];
+ for (const line of allLines) {
+ const parsed = JSON.parse(line);
+ expect(["log", "span-open", "span-close"]).toContain(parsed.kind);
+ }
+ });
+});
+
+// --- Fsync ---
+
+describe("fsync", () => {
+ it("calls fsync periodically when mode is periodic", () => {
+ let fsyncCalls = 0;
+ let currentTime = 0;
+ const mockFs: FsOps = {
+ open: () => 1,
+ write: () => {},
+ close: () => {},
+ rename: () => {},
+ statSize: () => 0,
+ fsync: () => {
+ fsyncCalls++;
+ },
+ };
+ const mockClock: ClockOps = {
+ now: () => currentTime,
+ };
+ const sink = createJournalSink({
+ path: join(tmpDir, "j.log"),
+ fs: mockFs,
+ clock: mockClock,
+ fsync: "periodic",
+ });
+
+ sink.emit(logRecord);
+ expect(fsyncCalls).toBe(0);
+
+ currentTime = 6_000;
+ sink.emit(logRecordMinimal);
+ expect(fsyncCalls).toBe(1);
+
+ sink.emit(spanOpenRecord);
+ expect(fsyncCalls).toBe(1);
+
+ currentTime = 12_000;
+ sink.emit(spanCloseRecord);
+ expect(fsyncCalls).toBe(2);
+ });
+
+ it("never calls fsync when mode is none", () => {
+ let fsyncCalls = 0;
+ const mockFs: FsOps = {
+ open: () => 1,
+ write: () => {},
+ close: () => {},
+ rename: () => {},
+ statSize: () => 0,
+ fsync: () => {
+ fsyncCalls++;
+ },
+ };
+ const sink = createJournalSink({
+ path: join(tmpDir, "j.log"),
+ fs: mockFs,
+ fsync: "none",
+ });
+
+ sink.emit(logRecord);
+ sink.emit(logRecord);
+ sink.emit(logRecord);
+ expect(fsyncCalls).toBe(0);
+ });
+});
diff --git a/packages/journal-sink/src/journal-sink.ts b/packages/journal-sink/src/journal-sink.ts
new file mode 100644
index 0000000..8e36fd6
--- /dev/null
+++ b/packages/journal-sink/src/journal-sink.ts
@@ -0,0 +1,171 @@
+import { closeSync, fsyncSync, openSync, renameSync, statSync, writeSync } from "node:fs";
+import type { LogRecord, LogSink } from "@dispatch/kernel";
+
+// --- Pure core (no I/O) ---
+
+/**
+ * Serialize a LogRecord to exactly one NDJSON line (record JSON + newline).
+ * Pure function — no I/O, no side effects.
+ */
+export function serialize(record: LogRecord): string {
+ return `${JSON.stringify(record)}\n`;
+}
+
+// --- Imperative shell (fs edge) ---
+
+/** Injectable fs operations — confined to the edge. */
+export interface FsOps {
+ readonly open: (path: string) => number;
+ readonly write: (fd: number, data: string) => void;
+ readonly close: (fd: number) => void;
+ readonly rename: (oldPath: string, newPath: string) => void;
+ readonly statSize: (path: string) => number;
+ readonly fsync: (fd: number) => void;
+}
+
+/** Clock injection for fsync interval. */
+export interface ClockOps {
+ readonly now: () => number;
+}
+
+export interface JournalSinkOpts {
+ readonly path: string;
+ readonly maxBytes?: number;
+ readonly fsync?: "periodic" | "none";
+ readonly fs?: FsOps;
+ readonly clock?: ClockOps;
+}
+
+const DEFAULT_MAX_BYTES = 50 * 1024 * 1024; // 50 MB
+const FSYNC_INTERVAL_MS = 5_000; // 5 seconds
+
+/**
+ * Create a LogSink that durably appends each record to the journal file
+ * as one NDJSON line. Fail-safe: write errors drop + warn, never throw.
+ * Rotates when file exceeds maxBytes (rename → .1, reopen fresh).
+ */
+export function createJournalSink(opts: JournalSinkOpts): LogSink {
+ const filePath = opts.path;
+ const maxBytes = opts.maxBytes ?? DEFAULT_MAX_BYTES;
+ const syncMode = opts.fsync ?? "periodic";
+ const fs = opts.fs ?? createDefaultFsOps();
+ const clock = opts.clock ?? { now: () => Date.now() };
+
+ const NO_FD = -1;
+ let fd: number;
+ let bytesWritten: number;
+ try {
+ fd = fs.open(filePath);
+ bytesWritten = fs.statSize(filePath);
+ } catch {
+ fd = NO_FD;
+ bytesWritten = 0;
+ }
+ let lastFsyncAt = clock.now();
+
+ function tryOpen(): boolean {
+ if (fd !== NO_FD) return true;
+ try {
+ fd = fs.open(filePath);
+ bytesWritten = 0;
+ return true;
+ } catch {
+ return false;
+ }
+ }
+
+ function rotate(): void {
+ if (fd !== NO_FD) {
+ try {
+ fs.close(fd);
+ } catch {
+ // Ignore close errors during rotation.
+ }
+ }
+ const rotatedPath = `${filePath}.1`;
+ try {
+ fs.rename(filePath, rotatedPath);
+ } catch {
+ // If rename fails, just truncate by reopening.
+ }
+ try {
+ fd = fs.open(filePath);
+ } catch {
+ fd = NO_FD;
+ }
+ bytesWritten = 0;
+ }
+
+ function maybeFsync(): void {
+ if (syncMode !== "periodic" || fd === NO_FD) return;
+ const now = clock.now();
+ if (now - lastFsyncAt >= FSYNC_INTERVAL_MS) {
+ try {
+ fs.fsync(fd);
+ } catch {
+ // Swallow — fail-safe.
+ }
+ lastFsyncAt = now;
+ }
+ }
+
+ const sink: LogSink = {
+ emit(record: LogRecord): void {
+ try {
+ if (!tryOpen()) {
+ console.warn("[journal-sink] cannot open journal, dropping record");
+ return;
+ }
+
+ const line = serialize(record);
+ const lineBytes = Buffer.byteLength(line, "utf8");
+
+ if (bytesWritten + lineBytes > maxBytes) {
+ rotate();
+ if (fd === NO_FD) {
+ console.warn("[journal-sink] rotation failed, dropping record");
+ return;
+ }
+ }
+
+ fs.write(fd, line);
+ bytesWritten += lineBytes;
+ maybeFsync();
+ } catch (err) {
+ // Fail-safe: drop + warn, never throw to the caller (D3/D7).
+ console.warn("[journal-sink] write failed, dropping record:", err);
+ }
+ },
+ };
+
+ return sink;
+}
+
+// --- Default fs ops (real Bun/Node I/O) ---
+
+function createDefaultFsOps(): FsOps {
+ return {
+ open(path: string): number {
+ return openSync(path, "a");
+ },
+ write(fd: number, data: string): void {
+ writeSync(fd, data);
+ },
+ close(fd: number): void {
+ closeSync(fd);
+ },
+ rename(oldPath: string, newPath: string): void {
+ renameSync(oldPath, newPath);
+ },
+ statSize(path: string): number {
+ try {
+ return statSync(path).size;
+ } catch {
+ return 0;
+ }
+ },
+ fsync(fd: number): void {
+ fsyncSync(fd);
+ },
+ };
+}