summary refs log tree commit diff homepage
path: root/packages/observability-collector/src/main.ts
blob: 828b8376d7f2314a1a7195954ebfa2bea636e4e9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import { createTraceStore, DEFAULT_RETENTION } from "@dispatch/trace-store";
import type { Logger } from "./collector.js";
import { drainOnce, readOffset, shouldPrune, writeOffset } from "./collector.js";

// --- Argv parsing ---

/** Command-line options for the collector process, as produced by `parseArgs`. */
interface CliArgs {
	/** Value of `--journal`. Required: `parseArgs` prints usage and exits when it is empty. */
	readonly journal: string;
	/** Value of `--db`, handed to `createTraceStore`; defaults to "./.dispatch-data/traces.db". */
	readonly db: string;
	/** Value of `--interval`: milliseconds slept between drain passes; defaults to 250. */
	readonly interval: number;
	/** Value of `--prune-interval-ms`, passed to `shouldPrune`; defaults to 60_000. */
	readonly pruneIntervalMs: number;
}

/**
 * Turns the raw argument vector into a {@link CliArgs} record.
 *
 * Recognised flags are `--journal`, `--db`, `--interval` and
 * `--prune-interval-ms`; each one consumes the token that follows it. A flag
 * in the last position (no value after it) and any unrecognised token are
 * skipped. Numeric flags keep their default unless the value is a finite
 * number greater than zero.
 *
 * @param argv Arguments without the node/script prefix.
 * @returns The parsed options with defaults filled in.
 *
 * Side effect: when no journal path was supplied, prints the usage line to
 * stderr and terminates the process with exit code 1.
 */
function parseArgs(argv: string[]): CliArgs {
	const parsed = {
		journal: "",
		db: "./.dispatch-data/traces.db",
		interval: 250,
		pruneIntervalMs: 60_000,
	};

	// Yields the number only when it is finite and strictly positive.
	const positive = (raw: string | undefined): number | undefined => {
		const n = Number(raw);
		return Number.isFinite(n) && n > 0 ? n : undefined;
	};

	let cursor = 0;
	while (cursor < argv.length) {
		const flag = argv[cursor];
		const value = argv[cursor + 1];
		const hasValue = cursor + 1 < argv.length;
		cursor += 1;

		// A flag only counts when a value token follows it.
		if (!hasValue) continue;

		switch (flag) {
			case "--journal":
				parsed.journal = value ?? "";
				cursor += 1;
				break;
			case "--db":
				parsed.db = value ?? parsed.db;
				cursor += 1;
				break;
			case "--interval":
				// The value token is consumed even when it is rejected.
				parsed.interval = positive(value) ?? parsed.interval;
				cursor += 1;
				break;
			case "--prune-interval-ms":
				parsed.pruneIntervalMs = positive(value) ?? parsed.pruneIntervalMs;
				cursor += 1;
				break;
			default:
				break;
		}
	}

	if (parsed.journal === "") {
		console.error(
			"Usage: observability-collector --journal <path> [--db <path>] [--interval <ms>] [--prune-interval-ms <ms>]",
		);
		process.exit(1);
	}

	return parsed;
}

// --- Logger ---

/**
 * Console-backed logger: every message is prefixed with the collector tag.
 * `info` writes via `console.log`, `debug` via `console.debug`.
 */
const logger: Logger = {
	info(...parts: readonly unknown[]): void {
		console.log("[observability-collector]", ...parts);
	},
	debug(...parts: readonly unknown[]): void {
		console.debug("[observability-collector]", ...parts);
	},
};

// --- Main loop ---

/**
 * Collector entry point.
 *
 * Parses the CLI arguments, opens the trace store, and then repeatedly:
 *   1. drains the journal from the last known offset into the store,
 *      persisting the new offset to the `<journal>.collector-offset` sidecar
 *      whenever it changes;
 *   2. prunes the store with `DEFAULT_RETENTION` whenever `shouldPrune` says
 *      so (prune failures are logged and do not stop the loop);
 *   3. sleeps for `args.interval` milliseconds.
 *
 * SIGINT/SIGTERM only set a flag; the loop notices it after the current
 * sleep, performs one final drain, closes the store and exits with code 0.
 *
 * If draining or offset persistence throws, the store is closed on a
 * best-effort basis before the error is rethrown to the caller, so the store
 * handle is not abandoned on the failure path.
 */
async function main(): Promise<void> {
	const args = parseArgs(process.argv.slice(2));
	// The resume offset lives in a sidecar file next to the journal.
	const sidecarPath = `${args.journal}.collector-offset`;
	const store = createTraceStore({ path: args.db });

	let shuttingDown = false;

	// Signals request a cooperative stop; the loop below does the actual teardown.
	function onSignal(): void {
		if (shuttingDown) return;
		shuttingDown = true;
	}

	process.on("SIGINT", onSignal);
	process.on("SIGTERM", onSignal);

	try {
		let offset = readOffset(sidecarPath);
		// The prune clock starts at process start-up.
		let lastPruneAt = Date.now();

		while (!shuttingDown) {
			const result = drainOnce({ journalPath: args.journal, offset, store });
			// Only touch the sidecar when the drain actually advanced.
			if (result.newOffset !== offset) {
				offset = result.newOffset;
				writeOffset(sidecarPath, offset);
			}

			const now = Date.now();
			if (shouldPrune(now, lastPruneAt, args.pruneIntervalMs)) {
				// Advance the clock before pruning so a failing prune is not retried every pass.
				lastPruneAt = now;
				try {
					const summary = store.prune(DEFAULT_RETENTION);
					logger.debug("prune completed", summary);
				} catch (err) {
					logger.info("prune failed (non-fatal)", err);
				}
			}

			await sleep(args.interval);
		}

		// Final drain on shutdown
		const finalResult = drainOnce({ journalPath: args.journal, offset, store });
		if (finalResult.newOffset !== offset) {
			writeOffset(sidecarPath, finalResult.newOffset);
		}
	} catch (err) {
		// Release the store before propagating; a failure to close must not
		// mask the error that actually brought the collector down.
		try {
			store.close();
		} catch (closeErr) {
			logger.info("store close failed during error shutdown", closeErr);
		}
		throw err;
	}

	store.close();
	process.exit(0);
}

/**
 * Resolves after roughly `ms` milliseconds, using a single `setTimeout`.
 *
 * @param ms Delay in milliseconds.
 * @returns A promise that resolves (with no value) once the timer fires.
 */
function sleep(ms: number): Promise<void> {
	return new Promise<void>((done) => {
		setTimeout(done, ms);
	});
}

/** Last-resort handler: report whatever escaped `main` and exit with code 1. */
function reportFatal(cause: unknown): never {
	console.error("[observability-collector] fatal:", cause);
	process.exit(1);
}

main().catch(reportFatal);