summaryrefslogtreecommitdiffhomepage
path: root/packages/host-bin/src
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-05 15:52:42 +0900
committerAdam Malczewski <[email protected]>2026-06-05 15:52:42 +0900
commitdded4cc5570f47167edf3ea36f232ad4a32bcbc4 (patch)
treedbcb57a7daad05b6daa6e24d0e192c47e036890a /packages/host-bin/src
parent2bf8f9ec9714cbd33a0ecfb7b40dd7a335180fb7 (diff)
downloaddispatch-dded4cc5570f47167edf3ea36f232ad4a32bcbc4.tar.gz
dispatch-dded4cc5570f47167edf3ea36f232ad4a32bcbc4.zip
feat(observability): host-bin supervises the collector (spawn-first / drain-last / restart) — 288 tests
host-bin spawns the out-of-process collector before serving (real Bun.spawn adapted to a ChildHandle), restarts on unexpected exit (backoff + restart-guard cap), drains on SIGINT/SIGTERM (collector final-drain, SIGKILL fallback on timeout). createCollectorSupervisor takes an injected spawn so the lifecycle is unit-tested with a fake (no real subprocess). Collector failures never crash the app (D3 subordinate/fail-safe). New env DISPATCH_TRACE_DB (default ./.dispatch-data/traces.db). Verified: tsc -b clean, 288 tests (279 + 9 supervisor), biome 0/0. Live (clean single run): 1 collector during, trace DB auto-populated (nested easy-view), 0 collectors after shutdown.
Diffstat (limited to 'packages/host-bin/src')
-rw-r--r--packages/host-bin/src/collector-supervisor.test.ts301
-rw-r--r--packages/host-bin/src/collector-supervisor.ts138
-rw-r--r--packages/host-bin/src/main.ts28
3 files changed, 467 insertions, 0 deletions
diff --git a/packages/host-bin/src/collector-supervisor.test.ts b/packages/host-bin/src/collector-supervisor.test.ts
new file mode 100644
index 0000000..8c1f104
--- /dev/null
+++ b/packages/host-bin/src/collector-supervisor.test.ts
@@ -0,0 +1,301 @@
+import { describe, expect, it } from "vitest";
+import { type ChildHandle, createCollectorSupervisor } from "./collector-supervisor.js";
+
+interface FakeChild {
+ readonly handle: ChildHandle;
+ resolveExit: (code: number) => void;
+ readonly signals: string[];
+}
+
+function createFakeChild(code = 0): FakeChild {
+ let resolveExit!: (code: number) => void;
+ const exited = new Promise<number>((r) => {
+ resolveExit = r;
+ });
+ const signals: string[] = [];
+ const handle: ChildHandle = {
+ kill: (signal?: string) => {
+ signals.push(signal ?? "SIGTERM");
+ if (signal === "SIGKILL") resolveExit(code);
+ },
+ exited,
+ };
+ return { handle, resolveExit, signals };
+}
+
+function createFakeLogger() {
+ const msgs: Array<{ level: string; msg: string }> = [];
+ return {
+ msgs,
+ debug: () => {},
+ info: (msg: string) => msgs.push({ level: "info", msg }),
+ warn: (msg: string) => msgs.push({ level: "warn", msg }),
+ error: () => {},
+ child: () => createFakeLogger(),
+ span: () => ({
+ id: "s",
+ log: createFakeLogger(),
+ setAttributes: () => {},
+ addLink: () => {},
+ child: () => ({}) as never,
+ end: () => {},
+ }),
+ };
+}
+
+describe("createCollectorSupervisor", () => {
+ const DEFAULTS = {
+ journalPath: "/tmp/journal.ndjson",
+ dbPath: "/tmp/traces.db",
+ };
+
+ it("start() spawns with the correct command and args", () => {
+ let capturedCmd: string[] = [];
+ const children: FakeChild[] = [];
+ const spawn = (cmd: string[]) => {
+ capturedCmd = cmd;
+ const child = createFakeChild();
+ children.push(child);
+ return child.handle;
+ };
+
+ const supervisor = createCollectorSupervisor({
+ ...DEFAULTS,
+ spawn,
+ logger: createFakeLogger() as never,
+ });
+ supervisor.start();
+
+ expect(capturedCmd).toEqual([
+ "bun",
+ "packages/observability-collector/src/main.ts",
+ "--journal",
+ "/tmp/journal.ndjson",
+ "--db",
+ "/tmp/traces.db",
+ ]);
+ });
+
+ it("start() passes --interval when provided", () => {
+ let capturedCmd: string[] = [];
+ const spawn = (cmd: string[]) => {
+ capturedCmd = cmd;
+ return createFakeChild().handle;
+ };
+
+ const supervisor = createCollectorSupervisor({
+ ...DEFAULTS,
+ interval: 500,
+ spawn,
+ logger: createFakeLogger() as never,
+ });
+ supervisor.start();
+
+ expect(capturedCmd).toContain("--interval");
+ expect(capturedCmd).toContain("500");
+ });
+
+ it("unexpected child exit respawns the collector", async () => {
+ const children: FakeChild[] = [];
+ let spawnCount = 0;
+ const spawn = () => {
+ spawnCount++;
+ const child = createFakeChild();
+ children.push(child);
+ return child.handle;
+ };
+
+ const time = 0;
+ const now = () => time;
+ const delayResolvers: Array<() => void> = [];
+ const delay = (_ms: number) =>
+ new Promise<void>((r) => {
+ delayResolvers.push(r);
+ });
+
+ const supervisor = createCollectorSupervisor({
+ ...DEFAULTS,
+ spawn,
+ logger: createFakeLogger() as never,
+ now,
+ delay,
+ });
+ supervisor.start();
+
+ expect(spawnCount).toBe(1);
+
+ // Simulate unexpected exit
+ children[0]?.resolveExit(1);
+ await Promise.resolve();
+ await Promise.resolve();
+
+ // Trigger the backoff delay resolver
+ expect(delayResolvers.length).toBe(1);
+ delayResolvers[0]?.();
+ await Promise.resolve();
+ await Promise.resolve();
+
+ expect(spawnCount).toBe(2);
+ });
+
+ it("restart guard caps respawns in a tight loop", async () => {
+ const children: FakeChild[] = [];
+ let spawnCount = 0;
+ const spawn = () => {
+ spawnCount++;
+ const child = createFakeChild();
+ children.push(child);
+ return child.handle;
+ };
+
+ const time = 0;
+ const now = () => time;
+ const delayResolvers: Array<() => void> = [];
+ const delay = (_ms: number) =>
+ new Promise<void>((r) => {
+ delayResolvers.push(r);
+ });
+
+ const logger = createFakeLogger();
+ const supervisor = createCollectorSupervisor({
+ ...DEFAULTS,
+ spawn,
+ logger: logger as never,
+ now,
+ delay,
+ });
+ supervisor.start();
+
+ // Simulate rapid crashes (within the restart window)
+ for (let i = 0; i < 5; i++) {
+ children[i]?.resolveExit(1);
+ await Promise.resolve();
+ await Promise.resolve();
+ if (delayResolvers.length > i) {
+ delayResolvers[i]?.();
+ await Promise.resolve();
+ await Promise.resolve();
+ }
+ }
+
+ // Should have spawned 6 times (1 initial + 5 restarts)
+ expect(spawnCount).toBe(6);
+
+ // 6th child also dies — should NOT respawn (cap reached)
+ children[5]?.resolveExit(1);
+ await Promise.resolve();
+ await Promise.resolve();
+
+ // spawnCount should still be 6
+ expect(spawnCount).toBe(6);
+ expect(logger.msgs.some((m) => m.msg === "Collector restart cap reached; giving up")).toBe(
+ true,
+ );
+ });
+
+ it("stop() sends SIGTERM and does not respawn", async () => {
+ const child = createFakeChild();
+ const spawn = () => child.handle;
+
+ const supervisor = createCollectorSupervisor({
+ ...DEFAULTS,
+ spawn,
+ logger: createFakeLogger() as never,
+ });
+ supervisor.start();
+
+ // Resolve exit after SIGTERM (simulating graceful shutdown)
+ const stopPromise = supervisor.stop();
+ child.resolveExit(0);
+ await stopPromise;
+
+ expect(child.signals).toContain("SIGTERM");
+ });
+
+ it("stop() sends SIGKILL when child does not exit in time", async () => {
+ const child = createFakeChild();
+ const spawn = () => child.handle;
+
+ const time = 0;
+ const now = () => time;
+ const delayResolvers: Array<() => void> = [];
+ const delay = (_ms: number) =>
+ new Promise<void>((r) => {
+ delayResolvers.push(r);
+ });
+
+ const supervisor = createCollectorSupervisor({
+ ...DEFAULTS,
+ spawn,
+ logger: createFakeLogger() as never,
+ now,
+ delay,
+ });
+ supervisor.start();
+
+ const stopPromise = supervisor.stop();
+
+ // Don't resolve exit — simulate hung child
+ // Resolve the timeout delay instead
+ expect(delayResolvers.length).toBe(1);
+ delayResolvers[0]?.();
+ await stopPromise;
+
+ expect(child.signals).toContain("SIGTERM");
+ expect(child.signals).toContain("SIGKILL");
+ });
+
+ it("stop() does not respawn after unexpected exit during stop", async () => {
+ const children: FakeChild[] = [];
+ let spawnCount = 0;
+ const spawn = () => {
+ spawnCount++;
+ const child = createFakeChild();
+ children.push(child);
+ return child.handle;
+ };
+
+ const supervisor = createCollectorSupervisor({
+ ...DEFAULTS,
+ spawn,
+ logger: createFakeLogger() as never,
+ });
+ supervisor.start();
+ expect(spawnCount).toBe(1);
+
+ // Child exits during stop — supervisor is already stopping
+ const stopPromise = supervisor.stop();
+ children[0]?.resolveExit(1);
+ await stopPromise;
+
+ // Should NOT have respawned
+ expect(spawnCount).toBe(1);
+ });
+
+ it("spawn throwing does not throw to caller", () => {
+ const spawn = () => {
+ throw new Error("spawn failed");
+ };
+
+ const logger = createFakeLogger();
+ const supervisor = createCollectorSupervisor({
+ ...DEFAULTS,
+ spawn,
+ logger: logger as never,
+ });
+
+ expect(() => supervisor.start()).not.toThrow();
+ expect(logger.msgs.some((m) => m.msg === "Failed to spawn collector")).toBe(true);
+ });
+
+ it("stop() is safe to call when no child was started", async () => {
+ const spawn = () => createFakeChild().handle;
+ const supervisor = createCollectorSupervisor({
+ ...DEFAULTS,
+ spawn,
+ logger: createFakeLogger() as never,
+ });
+
+ await expect(supervisor.stop()).resolves.toBeUndefined();
+ });
+});
diff --git a/packages/host-bin/src/collector-supervisor.ts b/packages/host-bin/src/collector-supervisor.ts
new file mode 100644
index 0000000..7b893b9
--- /dev/null
+++ b/packages/host-bin/src/collector-supervisor.ts
@@ -0,0 +1,138 @@
+import type { Logger } from "@dispatch/kernel";
+
+export interface ChildHandle {
+ readonly kill: (signal?: string) => void;
+ readonly exited: Promise<number>;
+}
+
+export interface SupervisorDeps {
+ readonly spawn: (cmd: string[]) => ChildHandle;
+ readonly journalPath: string;
+ readonly dbPath: string;
+ readonly interval?: number;
+ readonly logger: Logger;
+ readonly now?: () => number;
+ readonly delay?: (ms: number) => Promise<void>;
+}
+
+const RESTART_WINDOW_MS = 10_000;
+const MAX_RESTARTS_IN_WINDOW = 5;
+const BACKOFF_BASE_MS = 500;
+const STOP_TIMEOUT_MS = 5_000;
+
+export function createCollectorSupervisor(deps: SupervisorDeps): {
+ start: () => void;
+ stop: () => Promise<void>;
+} {
+ const {
+ spawn,
+ journalPath,
+ dbPath,
+ interval,
+ logger,
+ now = () => Date.now(),
+ delay = (ms) => new Promise((r) => setTimeout(r, ms)),
+ } = deps;
+
+ let child: ChildHandle | null = null;
+ let stopping = false;
+ const restartTimestamps: number[] = [];
+
+ function buildCmd(): string[] {
+ const cmd = [
+ "bun",
+ "packages/observability-collector/src/main.ts",
+ "--journal",
+ journalPath,
+ "--db",
+ dbPath,
+ ];
+ if (interval !== undefined) {
+ cmd.push("--interval", String(interval));
+ }
+ return cmd;
+ }
+
+ function pruneOldRestarts(): void {
+ const cutoff = now() - RESTART_WINDOW_MS;
+ while (restartTimestamps.length > 0) {
+ const oldest = restartTimestamps[0];
+ if (oldest === undefined || oldest > cutoff) break;
+ restartTimestamps.shift();
+ }
+ }
+
+ function shouldRestart(): boolean {
+ pruneOldRestarts();
+ return restartTimestamps.length < MAX_RESTARTS_IN_WINDOW;
+ }
+
+ function getBackoffMs(): number {
+ return BACKOFF_BASE_MS * 2 ** restartTimestamps.length;
+ }
+
+ function onChildExit(code: number): void {
+ child = null;
+ if (stopping) return;
+
+ logger.warn("Collector exited unexpectedly", { code } as never);
+ if (!shouldRestart()) {
+ logger.warn("Collector restart cap reached; giving up", {
+ restarts: restartTimestamps.length,
+ windowMs: RESTART_WINDOW_MS,
+ } as never);
+ return;
+ }
+
+ restartTimestamps.push(now());
+ const backoff = getBackoffMs();
+ logger.info("Restarting collector after backoff", { backoffMs: backoff } as never);
+ delay(backoff)
+ .then(() => {
+ if (!stopping) spawnChild();
+ })
+ .catch(() => {});
+ }
+
+ function spawnChild(): void {
+ try {
+ const handle = spawn(buildCmd());
+ child = handle;
+ logger.info("Collector started");
+ handle.exited.then(
+ (code) => onChildExit(code),
+ () => {},
+ );
+ } catch (err) {
+ logger.warn("Failed to spawn collector", { err } as never);
+ }
+ }
+
+ function start(): void {
+ spawnChild();
+ }
+
+ async function stop(): Promise<void> {
+ stopping = true;
+ if (!child) return;
+
+ const handle = child;
+ handle.kill("SIGTERM");
+
+ let resolved = false;
+ const exitedPromise = handle.exited.then(() => {
+ resolved = true;
+ });
+
+ const timeoutPromise = delay(STOP_TIMEOUT_MS).then(() => {
+ if (!resolved) {
+ handle.kill("SIGKILL");
+ return handle.exited;
+ }
+ });
+
+ await Promise.race([exitedPromise, timeoutPromise]);
+ }
+
+ return { start, stop };
+}
diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts
index 0c795b8..4e4b3e4 100644
--- a/packages/host-bin/src/main.ts
+++ b/packages/host-bin/src/main.ts
@@ -22,6 +22,8 @@ import { extension as sessionOrchestratorExt } from "@dispatch/session-orchestra
import { createSqliteStorage, extension as storageSqliteExt } from "@dispatch/storage-sqlite";
import { extension as toolReadFileExt } from "@dispatch/tool-read-file";
import { createServer, extension as transportHttpExt } from "@dispatch/transport-http";
+import type { ChildHandle } from "./collector-supervisor.js";
+import { createCollectorSupervisor } from "./collector-supervisor.js";
import { configMapToAccess, envToConfigMap } from "./config.js";
function createEmptySecrets(): SecretsAccess {
@@ -63,6 +65,23 @@ async function boot(): Promise<void> {
const logDeps: LogDeps = { now: () => Date.now(), newId: () => crypto.randomUUID() };
const logger = createLogger({ extensionId: "host-bin" }, logSink, logDeps);
+ const traceDbPath = process.env.DISPATCH_TRACE_DB ?? "./.dispatch-data/traces.db";
+
+ const supervisor = createCollectorSupervisor({
+ spawn: (cmd: string[]) => {
+ const proc = Bun.spawn(cmd, { stdout: "inherit", stderr: "inherit" });
+ const handle: ChildHandle = {
+ kill: (signal?: string) => proc.kill(signal as NodeJS.Signals),
+ exited: proc.exited,
+ };
+ return handle;
+ },
+ journalPath,
+ dbPath: traceDbPath,
+ logger: logger.child({ extensionId: "collector-supervisor" }),
+ });
+ supervisor.start();
+
const dbPath = process.env.DISPATCH_DB ?? "./.dispatch-data/dispatch.db";
mkdirSync(dirname(dbPath), { recursive: true });
const sqliteBackend = createSqliteStorage({ path: dbPath });
@@ -100,6 +119,15 @@ async function boot(): Promise<void> {
// Port precedence: BACKEND_PORT (the rewrite's assigned port) → PORT → default.
const port = Number(process.env.BACKEND_PORT) || Number(process.env.PORT) || 24203;
const server = Bun.serve({ fetch: app.fetch, port });
+
+ const shutdown = async () => {
+ logger.info("Shutting down — draining collector");
+ await supervisor.stop();
+ process.exit(0);
+ };
+ process.on("SIGINT", shutdown);
+ process.on("SIGTERM", shutdown);
+
logger.info(`Dispatch listening on http://localhost:${server.port}`);
console.info(`Dispatch listening on http://localhost:${server.port}`);
}