1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
import { createTraceStore, DEFAULT_RETENTION } from "@dispatch/trace-store";
import type { Logger } from "./collector.js";
import { drainOnce, readOffset, shouldPrune, writeOffset } from "./collector.js";
// --- Argv parsing ---
interface CliArgs {
readonly journal: string;
readonly db: string;
readonly interval: number;
readonly pruneIntervalMs: number;
}
function parseArgs(argv: string[]): CliArgs {
let journal = "";
let db = "./.dispatch-data/traces.db";
let interval = 250;
let pruneIntervalMs = 60_000;
for (let i = 0; i < argv.length; i++) {
const arg = argv[i];
if (arg === "--journal" && i + 1 < argv.length) {
journal = argv[i + 1] ?? "";
i++;
} else if (arg === "--db" && i + 1 < argv.length) {
db = argv[i + 1] ?? db;
i++;
} else if (arg === "--interval" && i + 1 < argv.length) {
const val = Number(argv[i + 1]);
if (Number.isFinite(val) && val > 0) interval = val;
i++;
} else if (arg === "--prune-interval-ms" && i + 1 < argv.length) {
const val = Number(argv[i + 1]);
if (Number.isFinite(val) && val > 0) pruneIntervalMs = val;
i++;
}
}
if (!journal) {
console.error(
"Usage: observability-collector --journal <path> [--db <path>] [--interval <ms>] [--prune-interval-ms <ms>]",
);
process.exit(1);
}
return { journal, db, interval, pruneIntervalMs };
}
// --- Logger ---
const logger: Logger = {
info: (...args: readonly unknown[]) => console.log("[observability-collector]", ...args),
debug: (...args: readonly unknown[]) => console.debug("[observability-collector]", ...args),
};
// --- Main loop ---
async function main(): Promise<void> {
const args = parseArgs(process.argv.slice(2));
const sidecarPath = `${args.journal}.collector-offset`;
const store = createTraceStore({ path: args.db });
let offset = readOffset(sidecarPath);
let lastPruneAt = Date.now();
let shuttingDown = false;
function onSignal(): void {
if (shuttingDown) return;
shuttingDown = true;
}
process.on("SIGINT", onSignal);
process.on("SIGTERM", onSignal);
while (!shuttingDown) {
const result = drainOnce({ journalPath: args.journal, offset, store });
if (result.newOffset !== offset) {
offset = result.newOffset;
writeOffset(sidecarPath, offset);
}
const now = Date.now();
if (shouldPrune(now, lastPruneAt, args.pruneIntervalMs)) {
lastPruneAt = now;
try {
const summary = store.prune(DEFAULT_RETENTION);
logger.debug("prune completed", summary);
} catch (err) {
logger.info("prune failed (non-fatal)", err);
}
}
await sleep(args.interval);
}
// Final drain on shutdown
const finalResult = drainOnce({ journalPath: args.journal, offset, store });
if (finalResult.newOffset !== offset) {
writeOffset(sidecarPath, finalResult.newOffset);
}
store.close();
process.exit(0);
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
main().catch((err) => {
console.error("[observability-collector] fatal:", err);
process.exit(1);
});
|