diff options
| -rw-r--r-- | packages/host-bin/src/collector-supervisor.test.ts | 301 | ||||
| -rw-r--r-- | packages/host-bin/src/collector-supervisor.ts | 138 | ||||
| -rw-r--r-- | packages/host-bin/src/main.ts | 28 |
3 files changed, 467 insertions, 0 deletions
diff --git a/packages/host-bin/src/collector-supervisor.test.ts b/packages/host-bin/src/collector-supervisor.test.ts new file mode 100644 index 0000000..8c1f104 --- /dev/null +++ b/packages/host-bin/src/collector-supervisor.test.ts @@ -0,0 +1,301 @@ +import { describe, expect, it } from "vitest"; +import { type ChildHandle, createCollectorSupervisor } from "./collector-supervisor.js"; + +interface FakeChild { + readonly handle: ChildHandle; + resolveExit: (code: number) => void; + readonly signals: string[]; +} + +function createFakeChild(code = 0): FakeChild { + let resolveExit!: (code: number) => void; + const exited = new Promise<number>((r) => { + resolveExit = r; + }); + const signals: string[] = []; + const handle: ChildHandle = { + kill: (signal?: string) => { + signals.push(signal ?? "SIGTERM"); + if (signal === "SIGKILL") resolveExit(code); + }, + exited, + }; + return { handle, resolveExit, signals }; +} + +function createFakeLogger() { + const msgs: Array<{ level: string; msg: string }> = []; + return { + msgs, + debug: () => {}, + info: (msg: string) => msgs.push({ level: "info", msg }), + warn: (msg: string) => msgs.push({ level: "warn", msg }), + error: () => {}, + child: () => createFakeLogger(), + span: () => ({ + id: "s", + log: createFakeLogger(), + setAttributes: () => {}, + addLink: () => {}, + child: () => ({}) as never, + end: () => {}, + }), + }; +} + +describe("createCollectorSupervisor", () => { + const DEFAULTS = { + journalPath: "/tmp/journal.ndjson", + dbPath: "/tmp/traces.db", + }; + + it("start() spawns with the correct command and args", () => { + let capturedCmd: string[] = []; + const children: FakeChild[] = []; + const spawn = (cmd: string[]) => { + capturedCmd = cmd; + const child = createFakeChild(); + children.push(child); + return child.handle; + }; + + const supervisor = createCollectorSupervisor({ + ...DEFAULTS, + spawn, + logger: createFakeLogger() as never, + }); + supervisor.start(); + + expect(capturedCmd).toEqual([ + "bun", + "packages/observability-collector/src/main.ts", + "--journal", + "/tmp/journal.ndjson", + "--db", + "/tmp/traces.db", + ]); + }); + + it("start() passes --interval when provided", () => { + let capturedCmd: string[] = []; + const spawn = (cmd: string[]) => { + capturedCmd = cmd; + return createFakeChild().handle; + }; + + const supervisor = createCollectorSupervisor({ + ...DEFAULTS, + interval: 500, + spawn, + logger: createFakeLogger() as never, + }); + supervisor.start(); + + expect(capturedCmd).toContain("--interval"); + expect(capturedCmd).toContain("500"); + }); + + it("unexpected child exit respawns the collector", async () => { + const children: FakeChild[] = []; + let spawnCount = 0; + const spawn = () => { + spawnCount++; + const child = createFakeChild(); + children.push(child); + return child.handle; + }; + + const time = 0; + const now = () => time; + const delayResolvers: Array<() => void> = []; + const delay = (_ms: number) => + new Promise<void>((r) => { + delayResolvers.push(r); + }); + + const supervisor = createCollectorSupervisor({ + ...DEFAULTS, + spawn, + logger: createFakeLogger() as never, + now, + delay, + }); + supervisor.start(); + + expect(spawnCount).toBe(1); + + // Simulate unexpected exit + children[0]?.resolveExit(1); + await Promise.resolve(); + await Promise.resolve(); + + // Trigger the backoff delay resolver + expect(delayResolvers.length).toBe(1); + delayResolvers[0]?.(); + await Promise.resolve(); + await Promise.resolve(); + + expect(spawnCount).toBe(2); + }); + + it("restart guard caps respawns in a tight loop", async () => { + const children: FakeChild[] = []; + let spawnCount = 0; + const spawn = () => { + spawnCount++; + const child = createFakeChild(); + children.push(child); + return child.handle; + }; + + const time = 0; + const now = () => time; + const delayResolvers: Array<() => void> = []; + const delay = (_ms: number) => + new Promise<void>((r) => { + delayResolvers.push(r); + }); + + const logger = createFakeLogger(); + const supervisor = createCollectorSupervisor({ + ...DEFAULTS, + spawn, + logger: logger as never, + now, + delay, + }); + supervisor.start(); + + // Simulate rapid crashes (within the restart window) + for (let i = 0; i < 5; i++) { + children[i]?.resolveExit(1); + await Promise.resolve(); + await Promise.resolve(); + if (delayResolvers.length > i) { + delayResolvers[i]?.(); + await Promise.resolve(); + await Promise.resolve(); + } + } + + // Should have spawned 6 times (1 initial + 5 restarts) + expect(spawnCount).toBe(6); + + // 6th child also dies — should NOT respawn (cap reached) + children[5]?.resolveExit(1); + await Promise.resolve(); + await Promise.resolve(); + + // spawnCount should still be 6 + expect(spawnCount).toBe(6); + expect(logger.msgs.some((m) => m.msg === "Collector restart cap reached; giving up")).toBe( + true, + ); + }); + + it("stop() sends SIGTERM and does not respawn", async () => { + const child = createFakeChild(); + const spawn = () => child.handle; + + const supervisor = createCollectorSupervisor({ + ...DEFAULTS, + spawn, + logger: createFakeLogger() as never, + }); + supervisor.start(); + + // Resolve exit after SIGTERM (simulating graceful shutdown) + const stopPromise = supervisor.stop(); + child.resolveExit(0); + await stopPromise; + + expect(child.signals).toContain("SIGTERM"); + }); + + it("stop() sends SIGKILL when child does not exit in time", async () => { + const child = createFakeChild(); + const spawn = () => child.handle; + + const time = 0; + const now = () => time; + const delayResolvers: Array<() => void> = []; + const delay = (_ms: number) => + new Promise<void>((r) => { + delayResolvers.push(r); + }); + + const supervisor = createCollectorSupervisor({ + ...DEFAULTS, + spawn, + logger: createFakeLogger() as never, + now, + delay, + }); + supervisor.start(); + + const stopPromise = supervisor.stop(); + + // Don't resolve exit — simulate hung child + // Resolve the timeout delay instead + expect(delayResolvers.length).toBe(1); + delayResolvers[0]?.(); + await stopPromise; + + expect(child.signals).toContain("SIGTERM"); + expect(child.signals).toContain("SIGKILL"); + }); + + it("stop() does not respawn after unexpected exit during stop", async () => { + const children: FakeChild[] = []; + let spawnCount = 0; + const spawn = () => { + spawnCount++; + const child = createFakeChild(); + children.push(child); + return child.handle; + }; + + const supervisor = createCollectorSupervisor({ + ...DEFAULTS, + spawn, + logger: createFakeLogger() as never, + }); + supervisor.start(); + expect(spawnCount).toBe(1); + + // Child exits during stop — supervisor is already stopping + const stopPromise = supervisor.stop(); + children[0]?.resolveExit(1); + await stopPromise; + + // Should NOT have respawned + expect(spawnCount).toBe(1); + }); + + it("spawn throwing does not throw to caller", () => { + const spawn = () => { + throw new Error("spawn failed"); + }; + + const logger = createFakeLogger(); + const supervisor = createCollectorSupervisor({ + ...DEFAULTS, + spawn, + logger: logger as never, + }); + + expect(() => supervisor.start()).not.toThrow(); + expect(logger.msgs.some((m) => m.msg === "Failed to spawn collector")).toBe(true); + }); + + it("stop() is safe to call when no child was started", async () => { + const spawn = () => createFakeChild().handle; + const supervisor = createCollectorSupervisor({ + ...DEFAULTS, + spawn, + logger: createFakeLogger() as never, + }); + + await expect(supervisor.stop()).resolves.toBeUndefined(); + }); +}); diff --git a/packages/host-bin/src/collector-supervisor.ts b/packages/host-bin/src/collector-supervisor.ts new file mode 100644 index 0000000..7b893b9 --- /dev/null +++ b/packages/host-bin/src/collector-supervisor.ts @@ -0,0 +1,138 @@ +import type { Logger } from "@dispatch/kernel"; + +export interface ChildHandle { + readonly kill: (signal?: string) => void; + readonly exited: Promise<number>; +} + +export interface SupervisorDeps { + readonly spawn: (cmd: string[]) => ChildHandle; + readonly journalPath: string; + readonly dbPath: string; + readonly interval?: number; + readonly logger: Logger; + readonly now?: () => number; + readonly delay?: (ms: number) => Promise<void>; +} + +const RESTART_WINDOW_MS = 10_000; +const MAX_RESTARTS_IN_WINDOW = 5; +const BACKOFF_BASE_MS = 500; +const STOP_TIMEOUT_MS = 5_000; + +export function createCollectorSupervisor(deps: SupervisorDeps): { + start: () => void; + stop: () => Promise<void>; +} { + const { + spawn, + journalPath, + dbPath, + interval, + logger, + now = () => Date.now(), + delay = (ms) => new Promise((r) => setTimeout(r, ms)), + } = deps; + + let child: ChildHandle | null = null; + let stopping = false; + const restartTimestamps: number[] = []; + + function buildCmd(): string[] { + const cmd = [ + "bun", + "packages/observability-collector/src/main.ts", + "--journal", + journalPath, + "--db", + dbPath, + ]; + if (interval !== undefined) { + cmd.push("--interval", String(interval)); + } + return cmd; + } + + function pruneOldRestarts(): void { + const cutoff = now() - RESTART_WINDOW_MS; + while (restartTimestamps.length > 0) { + const oldest = restartTimestamps[0]; + if (oldest === undefined || oldest > cutoff) break; + restartTimestamps.shift(); + } + } + + function shouldRestart(): boolean { + pruneOldRestarts(); + return restartTimestamps.length < MAX_RESTARTS_IN_WINDOW; + } + + function getBackoffMs(): number { + return BACKOFF_BASE_MS * 2 ** restartTimestamps.length; + } + + function onChildExit(code: number): void { + child = null; + if (stopping) return; + + logger.warn("Collector exited unexpectedly", { code } as never); + if (!shouldRestart()) { + logger.warn("Collector restart cap reached; giving up", { + restarts: restartTimestamps.length, + windowMs: RESTART_WINDOW_MS, + } as never); + return; + } + + restartTimestamps.push(now()); + const backoff = getBackoffMs(); + logger.info("Restarting collector after backoff", { backoffMs: backoff } as never); + delay(backoff) + .then(() => { + if (!stopping) spawnChild(); + }) + .catch(() => {}); + } + + function spawnChild(): void { + try { + const handle = spawn(buildCmd()); + child = handle; + logger.info("Collector started"); + handle.exited.then( + (code) => onChildExit(code), + () => {}, + ); + } catch (err) { + logger.warn("Failed to spawn collector", { err } as never); + } + } + + function start(): void { + spawnChild(); + } + + async function stop(): Promise<void> { + stopping = true; + if (!child) return; + + const handle = child; + handle.kill("SIGTERM"); + + let resolved = false; + const exitedPromise = handle.exited.then(() => { + resolved = true; + }); + + const timeoutPromise = delay(STOP_TIMEOUT_MS).then(() => { + if (!resolved) { + handle.kill("SIGKILL"); + return handle.exited; + } + }); + + await Promise.race([exitedPromise, timeoutPromise]); + } + + return { start, stop }; +} diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts index 0c795b8..4e4b3e4 100644 --- a/packages/host-bin/src/main.ts +++ b/packages/host-bin/src/main.ts @@ -22,6 +22,8 @@ import { extension as sessionOrchestratorExt } from "@dispatch/session-orchestra import { createSqliteStorage, extension as storageSqliteExt } from "@dispatch/storage-sqlite"; import { extension as toolReadFileExt } from "@dispatch/tool-read-file"; import { createServer, extension as transportHttpExt } from "@dispatch/transport-http"; +import type { ChildHandle } from "./collector-supervisor.js"; +import { createCollectorSupervisor } from "./collector-supervisor.js"; import { configMapToAccess, envToConfigMap } from "./config.js"; function createEmptySecrets(): SecretsAccess { @@ -63,6 +65,23 @@ async function boot(): Promise<void> { const logDeps: LogDeps = { now: () => Date.now(), newId: () => crypto.randomUUID() }; const logger = createLogger({ extensionId: "host-bin" }, logSink, logDeps); + const traceDbPath = process.env.DISPATCH_TRACE_DB ?? "./.dispatch-data/traces.db"; + + const supervisor = createCollectorSupervisor({ + spawn: (cmd: string[]) => { + const proc = Bun.spawn(cmd, { stdout: "inherit", stderr: "inherit" }); + const handle: ChildHandle = { + kill: (signal?: string) => proc.kill(signal as NodeJS.Signals), + exited: proc.exited, + }; + return handle; + }, + journalPath, + dbPath: traceDbPath, + logger: logger.child({ extensionId: "collector-supervisor" }), + }); + supervisor.start(); + const dbPath = process.env.DISPATCH_DB ?? "./.dispatch-data/dispatch.db"; mkdirSync(dirname(dbPath), { recursive: true }); const sqliteBackend = createSqliteStorage({ path: dbPath }); @@ -100,6 +119,15 @@ async function boot(): Promise<void> { // Port precedence: BACKEND_PORT (the rewrite's assigned port) → PORT → default. const port = Number(process.env.BACKEND_PORT) || Number(process.env.PORT) || 24203; const server = Bun.serve({ fetch: app.fetch, port }); + + const shutdown = async () => { + logger.info("Shutting down — draining collector"); + await supervisor.stop(); + process.exit(0); + }; + process.on("SIGINT", shutdown); + process.on("SIGTERM", shutdown); + logger.info(`Dispatch listening on http://localhost:${server.port}`); console.info(`Dispatch listening on http://localhost:${server.port}`); } |
