diff options
| author | Adam Malczewski <[email protected]> | 2026-06-28 08:43:29 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-28 08:43:29 +0900 |
| commit | 1cd66da48f8c0a35b4208202d07bcd9f20fbc2c2 (patch) | |
| tree | ae452a0ea54b8c8de493875aeb81d4402eab3819 | |
| parent | b3b6eb41c5a436c95602aeaacb93ef63ec6c285f (diff) | |
| download | dispatch-1cd66da48f8c0a35b4208202d07bcd9f20fbc2c2.tar.gz dispatch-1cd66da48f8c0a35b4208202d07bcd9f20fbc2c2.zip | |
feat(observability): periodic memory-usage logging to localize leak source
| -rw-r--r-- | packages/host-bin/src/main.ts | 40 | ||||
| -rw-r--r-- | packages/host-bin/src/mem-telemetry.test.ts | 225 | ||||
| -rw-r--r-- | packages/host-bin/src/mem-telemetry.ts | 160 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/extension.ts | 13 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/index.ts | 3 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.test.ts | 138 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 53 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/pure.test.ts | 71 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/pure.ts | 69 |
9 files changed, 770 insertions, 2 deletions
diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts index aa114d5..f892fe8 100644 --- a/packages/host-bin/src/main.ts +++ b/packages/host-bin/src/main.ts @@ -27,7 +27,11 @@ import { extension as messageQueueExt } from "@dispatch/message-queue"; import { extension as providerConcurrencyExt } from "@dispatch/provider-concurrency"; import { extension as providerOpenaiCompatExt } from "@dispatch/provider-openai-compat"; import { extension as providerUmansExt } from "@dispatch/provider-umans"; -import { extension as sessionOrchestratorExt } from "@dispatch/session-orchestrator"; +import { + type MemorySample, + extension as sessionOrchestratorExt, + sessionOrchestratorHandle, +} from "@dispatch/session-orchestrator"; import { extension as skillsExt } from "@dispatch/skills"; import { extension as sshExt } from "@dispatch/ssh"; import { createSqliteStorage, extension as storageSqliteExt } from "@dispatch/storage-sqlite"; @@ -49,6 +53,7 @@ import type { ChildHandle } from "./collector-supervisor.js"; import { createCollectorSupervisor } from "./collector-supervisor.js"; import { configMapToAccess, envToConfigMap } from "./config.js"; import { loadExternalExtensions } from "./load-external.js"; +import { startMemoryTelemetry } from "./mem-telemetry.js"; function createEmptySecrets(): SecretsAccess { return { @@ -227,10 +232,43 @@ async function boot(): Promise<void> { } } + // Periodic memory telemetry — leak-localization edge effect (AGENTS.md: + // timers are edge effects owned by host-bin, the composition root, NOT the + // kernel). Logs process.memoryUsage() every 60s tagged with the active- + // conversation count, and every 5 min runs Bun.gc(true) + logs RSS + // before/after to distinguish live retained objects from GC fragmentation. + // The per-turn before/after sampling lives in session-orchestrator; this + // owns the PERIODIC baseline. 
All effects are injected (no ambient state); + // stop() is called on shutdown so timers never leak across a restart. + let memoryTelemetry: { stop: () => void } | undefined; + try { + const orchestrator = host.getHostAPI().getService(sessionOrchestratorHandle); + memoryTelemetry = startMemoryTelemetry({ + logger: logger.child({ extensionId: "mem-telemetry" }), + sampleMemory: (): MemorySample => { + const m = process.memoryUsage(); + return { + rss: m.rss, + heapUsed: m.heapUsed, + heapTotal: m.heapTotal, + external: m.external, + arrayBuffers: m.arrayBuffers, + }; + }, + gc: () => Bun.gc(true), + getActiveConversationCount: () => orchestrator.getActiveConversationCount(), + }); + } catch (err) { + logger.error("Memory telemetry not started (session-orchestrator unavailable)", { + err, + }); + } + let shuttingDown = false; const shutdown = async () => { if (shuttingDown) return; shuttingDown = true; + memoryTelemetry?.stop(); logger.info("Shutting down — deactivating extensions"); await host.deactivate(); logger.info("Draining collector"); diff --git a/packages/host-bin/src/mem-telemetry.test.ts b/packages/host-bin/src/mem-telemetry.test.ts new file mode 100644 index 0000000..20237ef --- /dev/null +++ b/packages/host-bin/src/mem-telemetry.test.ts @@ -0,0 +1,225 @@ +import type { Attributes, Logger } from "@dispatch/kernel"; +import type { MemorySample } from "@dispatch/session-orchestrator"; +import { describe, expect, it } from "vitest"; +import { + buildGcAttributes, + buildPeriodicAttributes, + type MemoryTelemetryDeps, + startMemoryTelemetry, +} from "./mem-telemetry.js"; + +/** Minimal capturing logger — records every debug/info/warn call (error calls are dropped). 
*/ +interface CapturedLog { + readonly level: string; + readonly msg: string; + readonly attrs?: Attributes; +} + +function capturingLogger(): { logger: Logger; logs: CapturedLog[] } { + const logs: CapturedLog[] = []; + const record = (level: string) => (msg: string, attrs?: Attributes) => { + logs.push({ level, msg, attrs }); + }; + const logger: Logger = { + debug: record("debug"), + info: record("info"), + warn: record("warn"), + error: () => {}, + child: () => logger, + span: () => ({ + id: "s", + log: logger, + setAttributes: () => {}, + addLink: () => {}, + child: () => ({}) as never, + end: () => {}, + }), + }; + return { logger, logs }; +} + +const SAMPLE_A: MemorySample = { + rss: 100 * 1024 * 1024, + heapUsed: 40 * 1024 * 1024, + heapTotal: 60 * 1024 * 1024, + external: 5 * 1024 * 1024, + arrayBuffers: 2 * 1024 * 1024, +}; +const SAMPLE_B: MemorySample = { + rss: 300 * 1024 * 1024, + heapUsed: 80 * 1024 * 1024, + heapTotal: 60 * 1024 * 1024, + external: 5 * 1024 * 1024, + arrayBuffers: 6 * 1024 * 1024, +}; + +describe("buildPeriodicAttributes", () => { + it("formats the sample as MB and tags the active-conversation count", () => { + const attrs = buildPeriodicAttributes(SAMPLE_A, 3); + expect(attrs).toEqual({ + rssMB: 100, + heapUsedMB: 40, + heapTotalMB: 60, + externalMB: 5, + arrayBuffersMB: 2, + activeConversations: 3, + }); + }); +}); + +describe("buildGcAttributes", () => { + it("carries absolute after values plus reclaimed (before-after) delta", () => { + const attrs = buildGcAttributes(SAMPLE_A, SAMPLE_B); + // Absolute "after" values (SAMPLE_B). + expect(attrs.rssMB).toBe(300); + expect(attrs.heapUsedMB).toBe(80); + // Reclaimed delta: before - after is negative here (memory GREW), so + // reclaimedRssMB = round((100-300) MB) = -200. 
+ expect(attrs.reclaimedRssMB).toBe(-200); + expect(attrs.reclaimedHeapUsedMB).toBe(-40); + expect(attrs.reclaimedHeapTotalMB).toBe(0); + expect(attrs.reclaimedArrayBuffersMB).toBe(-4); + }); + + it("shows a positive reclaimed value when GC freed memory", () => { + const attrs = buildGcAttributes(SAMPLE_B, SAMPLE_A); + expect(attrs.reclaimedRssMB).toBe(200); + }); +}); + +describe("startMemoryTelemetry", () => { + function fakeTimers(): { + setInterval: MemoryTelemetryDeps["setInterval"]; + clearInterval: MemoryTelemetryDeps["clearInterval"]; + tick: (name: "sample" | "gc") => void; + cleared: { sample: number; gc: number }; + } { + // Store the callbacks by registration order so we can drive either one. + // startMemoryTelemetry registers the sample timer first and the gc timer + // second, so the first callback seen is the sample one. + let sampleCb: (() => void) | undefined; + let gcCb: (() => void) | undefined; + let firstMs = 0; + const cleared = { sample: 0, gc: 0 }; + return { + setInterval: (fn, ms) => { + if (firstMs === 0) { + firstMs = ms; + sampleCb = fn; + return "sample" as never; + } + // The second timer registered is the gc one (larger interval). 
+ gcCb = fn; + return "gc" as never; + }, + clearInterval: (handle) => { + if (handle === "sample") cleared.sample++; + else if (handle === "gc") cleared.gc++; + }, + tick: (name) => { + if (name === "sample") sampleCb?.(); + else gcCb?.(); + }, + cleared, + }; + } + + it("logs a periodic sample tagged with the active-conversation count", () => { + const { logger, logs } = capturingLogger(); + let active = 2; + const timers = fakeTimers(); + const handle = startMemoryTelemetry({ + logger, + sampleMemory: () => SAMPLE_A, + gc: () => {}, + getActiveConversationCount: () => active, + setInterval: timers.setInterval, + clearInterval: timers.clearInterval, + }); + + timers.tick("sample"); + const periodic = logs.find((l) => l.msg === "memory:periodic"); + expect(periodic).toBeDefined(); + expect(periodic?.attrs?.rssMB).toBe(100); + expect(periodic?.attrs?.activeConversations).toBe(2); + + // The count is read live (re-evaluated each tick). + active = 5; + timers.tick("sample"); + const periodic2 = logs.filter((l) => l.msg === "memory:periodic"); + expect(periodic2).toHaveLength(2); + expect(periodic2[1]?.attrs?.activeConversations).toBe(5); + + handle.stop(); + expect(timers.cleared.sample).toBe(1); + expect(timers.cleared.gc).toBe(1); + }); + + it("runs gc and logs RSS before/after on the gc interval", () => { + const { logger, logs } = capturingLogger(); + let calls = 0; + const samples = [SAMPLE_A, SAMPLE_B]; + const timers = fakeTimers(); + const handle = startMemoryTelemetry({ + logger, + sampleMemory: () => samples[calls++] as MemorySample, + gc: () => {}, + getActiveConversationCount: () => 0, + setInterval: timers.setInterval, + clearInterval: timers.clearInterval, + }); + + timers.tick("gc"); + const gcLog = logs.find((l) => l.msg === "memory:gc"); + expect(gcLog).toBeDefined(); + // before=SAMPLE_A (call 0), after=SAMPLE_B (call 1) — rss grew 100→300. 
+ expect(gcLog?.attrs?.rssMB).toBe(300); + expect(gcLog?.attrs?.reclaimedRssMB).toBe(-200); + + handle.stop(); + }); + + it("actually calls the injected gc function", () => { + const { logger } = capturingLogger(); + let gcCalls = 0; + const timers = fakeTimers(); + const handle = startMemoryTelemetry({ + logger, + sampleMemory: () => SAMPLE_A, + gc: () => { + gcCalls++; + }, + getActiveConversationCount: () => 0, + setInterval: timers.setInterval, + clearInterval: timers.clearInterval, + }); + + timers.tick("gc"); + expect(gcCalls).toBe(1); + // Periodic ticks do NOT trigger gc. + timers.tick("sample"); + expect(gcCalls).toBe(1); + + handle.stop(); + }); + + it("stop is idempotent (clearing twice is harmless)", () => { + const { logger } = capturingLogger(); + const timers = fakeTimers(); + const handle = startMemoryTelemetry({ + logger, + sampleMemory: () => SAMPLE_A, + gc: () => {}, + getActiveConversationCount: () => 0, + setInterval: timers.setInterval, + clearInterval: timers.clearInterval, + }); + + handle.stop(); + handle.stop(); + // Both timers cleared exactly once each (second stop is a no-op on the + // same handles — clear count stays at 1 per handle). + expect(timers.cleared.sample).toBe(1); + expect(timers.cleared.gc).toBe(1); + }); +}); diff --git a/packages/host-bin/src/mem-telemetry.ts b/packages/host-bin/src/mem-telemetry.ts new file mode 100644 index 0000000..7f0bb42 --- /dev/null +++ b/packages/host-bin/src/mem-telemetry.ts @@ -0,0 +1,160 @@ +/** + * Periodic memory telemetry — leak-localization edge effect. + * + * Owns the timers (setInterval) that periodically log process.memoryUsage() + * so RSS growth can be correlated with active conversations/turns and the + * leaking subsystem pinpointed. This is the composition-root edge effect + * (AGENTS.md: timers are edge effects owned by host-bin, NOT the kernel). 
+ * + * All effects are injected (sampler, GC, clock, logger, active-conversation + * count) so the timer logic is fully testable — tests pass fakes and assert + * the emitted log calls, never waiting on a real wall clock. No ambient + * state (P3): the timers are owned explicitly and returned as a `stop()` + * handle that the composition root clears on shutdown. + * + * The per-turn before/after sampling lives in session-orchestrator (it wraps + * the stream boundary); this module owns the PERIODIC baseline + GC logging. + */ + +import type { Logger } from "@dispatch/kernel"; +import { + type MemorySample, + memoryDelta, + memorySampleAttributes, +} from "@dispatch/session-orchestrator"; + +/** Default periodic sample interval: every 60s. */ +export const DEFAULT_MEMORY_SAMPLE_INTERVAL_MS = 60_000; + +/** Default GC interval: every 5 min (longer than the sample interval). */ +export const DEFAULT_GC_INTERVAL_MS = 5 * 60_000; + +/** + * Deps injected into {@link startMemoryTelemetry}. Every effect is explicit so + * the timer logic is reproducible from its inputs (no ambient state, P3) and + * testable without a real clock or process. + */ +export interface MemoryTelemetryDeps { + /** Logger (auto-scoped to host-bin by the composition root). */ + readonly logger: Logger; + /** Edge effect: capture a {@link MemorySample} now (process.memoryUsage()). */ + readonly sampleMemory: () => MemorySample; + /** + * Edge effect: run a full GC cycle. In production this is `() => + * Bun.gc(true)`; tests pass a no-op or a counting fake. Used on the longer + * GC interval to distinguish live retained objects from GC fragmentation. + */ + readonly gc: () => void; + /** + * The number of conversations currently driving a turn (from the + * session-orchestrator's activeConversations set). Tags each periodic + * sample so growth can be attributed to the streaming/turn path vs an idle + * baseline. 
+ */ + readonly getActiveConversationCount: () => number; + /** Periodic sample interval (ms). Defaults to 60s. */ + readonly sampleIntervalMs?: number; + /** GC interval (ms). Defaults to 5 min. */ + readonly gcIntervalMs?: number; + /** + * Injected timer scheduler (defaults to global setInterval). Tests pass a + * fake to drive ticks deterministically without real wall-clock waits. The + * handle type is opaque (the same type {@link clearInterval} accepts). + */ + readonly setInterval?: (fn: () => void, ms: number) => MemoryTimerHandle; + /** Injected timer clearer (defaults to global clearInterval). */ + readonly clearInterval?: (handle: MemoryTimerHandle | undefined) => void; +} + +/** Opaque timer handle shared by {@link MemoryTelemetryDeps.setInterval} / clearInterval. */ +export type MemoryTimerHandle = ReturnType<typeof globalThis.setInterval>; + +/** Handle returned by {@link startMemoryTelemetry} to stop the timers. */ +export interface MemoryTelemetryHandle { + /** Stop both timers. Idempotent. Called by the composition root on shutdown. */ + readonly stop: () => void; +} + +/** + * Start periodic memory telemetry. Logs process.memoryUsage() every + * `sampleIntervalMs` (default 60s) tagged with the active-conversation count, + * and every `gcIntervalMs` (default 5 min) runs `gc()` and logs RSS + * before/after to distinguish live retained objects from GC fragmentation. + * + * Returns a `stop()` handle that clears both timers. The composition root + * (host-bin main) owns this handle and calls `stop()` on shutdown so the + * timers never leak across a restart. + * + * Pure decision logic: {@link buildPeriodicAttributes} / + * {@link buildGcAttributes} are exported separately for unit testing. + */ +export function startMemoryTelemetry(deps: MemoryTelemetryDeps): MemoryTelemetryHandle { + const sampleIntervalMs = deps.sampleIntervalMs ?? DEFAULT_MEMORY_SAMPLE_INTERVAL_MS; + const gcIntervalMs = deps.gcIntervalMs ?? 
DEFAULT_GC_INTERVAL_MS; + const setIntervalFn = deps.setInterval ?? globalThis.setInterval; + const clearIntervalFn = deps.clearInterval ?? globalThis.clearInterval; + + let gcHandle: MemoryTimerHandle | undefined; + + // Periodic sample: log rss/heap/external/arrayBuffers + active-conversation + // count every 60s. Correlates RSS growth with active turns so the leak can + // be attributed to the streaming path vs an idle baseline. + const sampleHandle: MemoryTimerHandle | undefined = setIntervalFn(() => { + const sample = deps.sampleMemory(); + const activeConversations = deps.getActiveConversationCount(); + deps.logger.info("memory:periodic", buildPeriodicAttributes(sample, activeConversations)); + }, sampleIntervalMs); + + // GC log: on a longer interval, force a full GC and log RSS before/after. + // A small/no drop after gc means the memory is LIVE (retained objects — the + // leak); a large drop means it was GC fragmentation (reclaimable). This + // distinguishes the two failure modes the crash investigation flagged. + gcHandle = setIntervalFn(() => { + const before = deps.sampleMemory(); + deps.gc(); + const after = deps.sampleMemory(); + deps.logger.info("memory:gc", buildGcAttributes(before, after)); + }, gcIntervalMs); + + let stopped = false; + return { + stop() { + if (stopped) return; // idempotent — safe to call on every shutdown path + stopped = true; + clearIntervalFn(sampleHandle); + clearIntervalFn(gcHandle); + }, + }; +} + +/** + * Pure: build the logger attributes for a periodic sample. Exported for unit + * testing (no I/O, no clock). The `activeConversations` count tags the sample + * so growth can be attributed to the streaming/turn path vs idle baseline. 
+ */ +export function buildPeriodicAttributes( + sample: MemorySample, + activeConversations: number, +): ReturnType<typeof memorySampleAttributes> & { activeConversations: number } { + return { ...memorySampleAttributes(sample), activeConversations }; +} + +/** + * Pure: build the logger attributes for a GC sample. Exported for unit testing. + * Carries the absolute after-sample plus the `reclaimed` delta + * (`before - after`): a POSITIVE `reclaimedRssMB` means GC freed memory + * (fragmentation, reclaimable); near-zero/negative means the memory is LIVE + * (retained objects — the leak). This distinguishes the two failure modes the + * crash investigation flagged. + */ +export function buildGcAttributes( + before: MemorySample, + after: MemorySample, +): ReturnType<typeof memorySampleAttributes> { + // reclaimed = before - after (how much GC freed). memoryDelta(a, b) = b - a, + // so pass (after, before) to get before - after. + return { + ...memorySampleAttributes(after), + ...memorySampleAttributes(memoryDelta(after, before), "reclaimed"), + }; +} diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts index 783d894..777feaa 100644 --- a/packages/session-orchestrator/src/extension.ts +++ b/packages/session-orchestrator/src/extension.ts @@ -65,6 +65,19 @@ export function activate(host: HostAPI): void { logger: host.logger, now: () => Date.now(), emit: (hook, payload) => host.emit(hook, payload), + // Injected process.memoryUsage() sampler — the production edge. Tests + // inject a fake to assert per-turn before/after telemetry. Wired in the + // shell (like `now: () => Date.now()`); pure decision logic is untouched. + sampleMemory: () => { + const m = process.memoryUsage(); + return { + rss: m.rss, + heapUsed: m.heapUsed, + heapTotal: m.heapTotal, + external: m.external, + arrayBuffers: m.arrayBuffers, + }; + }, resolveQueue: () => { // Lazily resolve the message-queue service. 
Returns undefined when the // extension isn't loaded (feature degrades off) — checked via the diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts index cc41fa8..3d3e816 100644 --- a/packages/session-orchestrator/src/index.ts +++ b/packages/session-orchestrator/src/index.ts @@ -39,6 +39,9 @@ export { defaultDispatchPolicy, delayFor, generateTurnId, + type MemorySample, + memoryDelta, + memorySampleAttributes, RETRY_BUDGET_MS, RETRY_SCHEDULE_MS, RETRY_TAIL_MS, diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts index 076ad51..400ca0b 100644 --- a/packages/session-orchestrator/src/orchestrator.test.ts +++ b/packages/session-orchestrator/src/orchestrator.test.ts @@ -4,7 +4,10 @@ import type { AgentEvent, ChatMessage, EventHookDescriptor, + LogDeps, Logger, + LogRecord, + LogSink, ProviderContract, ProviderEvent, ProviderStreamOptions, @@ -15,7 +18,7 @@ import type { ToolContract, TurnMetrics, } from "@dispatch/kernel"; -import { runTurn } from "@dispatch/kernel"; +import { createLogger, runTurn } from "@dispatch/kernel"; import type { SystemPromptService } from "@dispatch/system-prompt"; import { describe, expect, it } from "vitest"; import { @@ -27,6 +30,7 @@ import { type TurnLifecyclePayload, type WarmCompletedPayload, } from "./orchestrator.js"; +import type { MemorySample } from "./pure.js"; import type { ToolAssembly } from "./tools-filter.js"; function createInMemoryStore(): ConversationStore & { @@ -3980,3 +3984,135 @@ describe("system prompt: compaction flow", () => { expect(capturedSystemPrompt?.startsWith("RECONSTRUCTED")).toBe(false); }); }); + +describe("per-turn memory telemetry", () => { + function capturingLogger(): { logger: Logger; records: LogRecord[] } { + let id = 0; + const deps: LogDeps = { now: () => 1000 + id++, newId: () => `id-${id++}` }; + const records: LogRecord[] = []; + const sink: LogSink = { emit: (r) => 
records.push(r) }; + return { logger: createLogger({ extensionId: "session-orchestrator" }, sink, deps), records }; + } + + it("logs before/after samples around the stream, tagged with conversationId + turnId", async () => { + const store = createInMemoryStore(); + const provider: ProviderContract = { id: "p", stream: async function* () {} }; + const { captured, captureRunTurn } = createCapturingRunTurn(); + const { logger, records } = capturingLogger(); + + const samples: MemorySample[] = [ + { rss: 100, heapUsed: 10, heapTotal: 20, external: 1, arrayBuffers: 0 }, + { rss: 250, heapUsed: 30, heapTotal: 20, external: 1, arrayBuffers: 5 }, + ]; + let sampleIdx = 0; + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn: captureRunTurn, + logger, + sampleMemory: () => samples[sampleIdx++] as MemorySample, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-mem", + text: "hi", + onEvent: () => {}, + }); + + expect(captured).toHaveLength(1); + const beforeLogs = records.filter((r) => r.kind === "log" && r.msg === "memory:turn:before"); + const afterLogs = records.filter((r) => r.kind === "log" && r.msg === "memory:turn:after"); + expect(beforeLogs).toHaveLength(1); + expect(afterLogs).toHaveLength(1); + + // Both samples carry the turn's conversationId + turnId correlation. + const turnId = captured[0]?.turnId; + expect(turnId).toMatch(/^turn-/); + const before = beforeLogs[0] as Extract<LogRecord, { kind: "log" }>; + const after = afterLogs[0] as Extract<LogRecord, { kind: "log" }>; + expect(before.conversationId).toBe("conv-mem"); + expect(before.turnId).toBe(turnId); + expect(after.conversationId).toBe("conv-mem"); + expect(after.turnId).toBe(turnId); + + // Before sample carries absolute MB values. 
+ expect(before.attributes?.rssMB).toBe(0); // 100 bytes rounds to 0 MB + // After sample carries absolute + delta (delta rss = 150 bytes → 0 MB). + expect(after.attributes?.rssMB).toBe(0); + // deltaRssMB is the rounded delta (150 bytes → 0 MB). + expect(after.attributes).toHaveProperty("deltaRssMB"); + }); + + it("emits no memory logs when sampleMemory is not injected (degrades off)", async () => { + const store = createInMemoryStore(); + const provider: ProviderContract = { id: "p", stream: async function* () {} }; + const { captureRunTurn } = createCapturingRunTurn(); + const { logger, records } = capturingLogger(); + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn: captureRunTurn, + logger, + // sampleMemory intentionally omitted + }); + + await orchestrator.handleMessage({ + conversationId: "conv-no-mem", + text: "hi", + onEvent: () => {}, + }); + + const memLogs = records.filter( + (r) => r.kind === "log" && typeof r.msg === "string" && r.msg.startsWith("memory:turn"), + ); + expect(memLogs).toHaveLength(0); + }); + + it("getActiveConversationCount tracks in-flight turns", async () => { + const store = createInMemoryStore(); + const result: RunTurnResult = { + messages: [{ role: "assistant", chunks: [{ type: "text", text: "ok" }] }], + usage: { inputTokens: 1, outputTokens: 1 }, + finishReason: "stop", + }; + // A runTurn that blocks until the test releases it — keeps the turn + // active so getActiveConversationCount reflects an in-flight turn. 
+ let release: () => void = () => {}; + const blocked = new Promise<void>((resolve) => { + release = resolve; + }); + const blockingRunTurn = async (_input: RunTurnInput): Promise<RunTurnResult> => { + await blocked; + return result; + }; + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => ({ id: "p", stream: async function* () {} }), + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn: blockingRunTurn, + }); + + expect(orchestrator.getActiveConversationCount()).toBe(0); + const done = orchestrator.handleMessage({ + conversationId: "conv-active", + text: "hi", + onEvent: () => {}, + }); + // Give the detached turn a tick to register as active. + await Promise.resolve(); + await Promise.resolve(); + expect(orchestrator.getActiveConversationCount()).toBe(1); + + release(); + await done; + expect(orchestrator.getActiveConversationCount()).toBe(0); + }); +}); diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index 5c36922..8cc0a97 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -30,6 +30,9 @@ import { defaultDispatchPolicy, delayFor, generateTurnId, + type MemorySample, + memoryDelta, + memorySampleAttributes, resolveModelName, resolveReasoningEffort, } from "./pure.js"; @@ -331,6 +334,13 @@ export interface SessionOrchestrator { subscribe(conversationId: string, listener: TurnEventListener): () => void; isActive(conversationId: string): boolean; /** + * The number of conversations currently driving a turn (in the + * `activeConversations` set). Used by host-bin's periodic memory telemetry + * to tag each RSS sample with the active-conversation count, so growth can + * be attributed to the streaming/turn path vs an idle baseline. 
+ */ + getActiveConversationCount(): number; + /** * Explicitly close a conversation (the user closed its tab — distinct from a * socket disconnect, which never touches the turn): aborts any in-flight turn * (the kernel finishes with `finishReason: "aborted"`, partial messages are @@ -431,6 +441,16 @@ export interface SessionOrchestratorDeps { readonly logger?: Logger; /** Injected monotonic-ish clock (ms) forwarded to RunTurnInput for timing events. */ readonly now?: () => number; + /** + * Optional process.memoryUsage() sampler, injected for testability. When + * present, the orchestrator captures a sample immediately before and after + * each turn's stream completes (`deps.runTurn`) and logs the per-turn delta + * tagged with conversationId + turnId — correlating RSS growth with the + * streaming path (the prime leak suspect). When absent (undefined), no + * per-turn memory telemetry is emitted (feature degrades off cleanly). + * Pure decision logic stays unchanged; this is additive observability. + */ + readonly sampleMemory?: () => MemorySample; /** Emit a lifecycle event hook to subscribers. Injected from host. */ readonly emit?: <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload) => void; } @@ -885,6 +905,18 @@ export function createSessionOrchestrator( // FE to syncTail during generation (CR-6). await deps.conversationStore.append(conversationId, [userMsg]); + // Per-turn memory telemetry: capture a sample immediately BEFORE the + // stream starts. Paired with the post-stream sample below, this + // measures the streaming path's memory footprint per turn (the prime + // leak suspect — AI-SDK streaming buffers + per-turn message arrays). + // Tagged with conversationId + turnId via the turnLogger's correlation + // context. Additive observability only — does not alter the stream. 
+ const sampleMem = deps.sampleMemory; + const memBefore = sampleMem?.(); + if (memBefore !== undefined) { + turnLogger?.debug("memory:turn:before", memorySampleAttributes(memBefore)); + } + let stepsPersisted = false; const result = await deps.runTurn({ ...opts, @@ -898,6 +930,23 @@ export function createSessionOrchestrator( }, }); + // Per-turn memory telemetry: capture a sample immediately AFTER the + // stream completes and log the per-turn delta vs `memBefore`. A + // positive rss delta on a sealed turn flags memory retained by the + // streaming path (the leak we are localizing). No I/O beyond the + // injected sampler; pure delta computation via memoryDelta(). The + // delta attributes carry a `delta` prefix so the absolute "after" + // values and the per-turn delta coexist without key collision. + if (memBefore !== undefined) { + const memAfter = sampleMem?.(); + if (memAfter !== undefined) { + turnLogger?.info("memory:turn:after", { + ...memorySampleAttributes(memAfter), + ...memorySampleAttributes(memoryDelta(memBefore, memAfter), "delta"), + }); + } + } + // Fallback: if onStepComplete was never called (e.g., a fake // runTurn in tests), persist all result messages as a batch. 
if (!stepsPersisted && result.messages.length > 0) { @@ -1041,6 +1090,10 @@ export function createSessionOrchestrator( return activeTurns.has(conversationId); }, + getActiveConversationCount() { + return activeConversations.size; + }, + closeConversation(conversationId) { const turn = activeTurns.get(conversationId); const abortedTurn = turn !== undefined; diff --git a/packages/session-orchestrator/src/pure.test.ts b/packages/session-orchestrator/src/pure.test.ts index 7a574f1..d4fe84a 100644 --- a/packages/session-orchestrator/src/pure.test.ts +++ b/packages/session-orchestrator/src/pure.test.ts @@ -6,6 +6,9 @@ import { defaultDispatchPolicy, delayFor, generateTurnId, + type MemorySample, + memoryDelta, + memorySampleAttributes, RETRY_BUDGET_MS, RETRY_SCHEDULE_MS, RETRY_TAIL_MS, @@ -194,3 +197,71 @@ describe("retry backoff schedule (delayFor)", () => { expect(schedule.slice(8).every((d) => d === RETRY_TAIL_MS)).toBe(true); }); }); + +describe("memorySampleAttributes", () => { + const sample: MemorySample = { + rss: 100 * 1024 * 1024, // 100 MB + heapUsed: 40 * 1024 * 1024, + heapTotal: 60 * 1024 * 1024, + external: 5 * 1024 * 1024, + arrayBuffers: 2 * 1024 * 1024, + }; + + it("formats fields as rounded MB with no prefix", () => { + const attrs = memorySampleAttributes(sample); + expect(attrs).toEqual({ + rssMB: 100, + heapUsedMB: 40, + heapTotalMB: 60, + externalMB: 5, + arrayBuffersMB: 2, + }); + }); + + it("namespaces keys with the given prefix", () => { + const attrs = memorySampleAttributes(sample, "delta"); + expect(attrs).toEqual({ + deltaRssMB: 100, + deltaHeapUsedMB: 40, + deltaHeapTotalMB: 60, + deltaExternalMB: 5, + deltaArrayBuffersMB: 2, + }); + }); + + it("rounds fractional MB", () => { + const attrs = memorySampleAttributes({ ...sample, rss: 100.6 * 1024 * 1024 }); + expect(attrs.rssMB).toBe(101); + }); +}); + +describe("memoryDelta", () => { + const before: MemorySample = { + rss: 200 * 1024 * 1024, + heapUsed: 100 * 1024 * 1024, + heapTotal: 150 * 
1024 * 1024, + external: 10 * 1024 * 1024, + arrayBuffers: 4 * 1024 * 1024, + }; + const after: MemorySample = { + rss: 350 * 1024 * 1024, + heapUsed: 120 * 1024 * 1024, + heapTotal: 150 * 1024 * 1024, + external: 10 * 1024 * 1024, + arrayBuffers: 8 * 1024 * 1024, + }; + + it("computes signed after - before per field", () => { + const delta = memoryDelta(before, after); + expect(delta.rss).toBe(150 * 1024 * 1024); + expect(delta.heapUsed).toBe(20 * 1024 * 1024); + expect(delta.heapTotal).toBe(0); + expect(delta.external).toBe(0); + expect(delta.arrayBuffers).toBe(4 * 1024 * 1024); + }); + + it("is negative when memory dropped", () => { + const delta = memoryDelta(after, before); + expect(delta.rss).toBe(-150 * 1024 * 1024); + }); +}); diff --git a/packages/session-orchestrator/src/pure.ts b/packages/session-orchestrator/src/pure.ts index 0d2068f..489b140 100644 --- a/packages/session-orchestrator/src/pure.ts +++ b/packages/session-orchestrator/src/pure.ts @@ -1,4 +1,5 @@ import type { + Attributes, ChatMessage, Chunk, ImageInput, @@ -134,3 +135,71 @@ export function defaultDispatchPolicy(): ToolDispatchPolicy { export function generateTurnId(): string { return `turn-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; } + +// ── Memory telemetry (leak localization) ──────────────────────────────────── +// +// Pure helpers for process.memoryUsage() sampling. The orchestrator owns the +// sample SHAPE (this type) so its per-turn sampling and the host-bin periodic +// timer share one contract without a cross-package import of an +// implementation — host-bin imports this type, the orchestrator never imports +// host-bin. Pure: inputs → attributes/delta, no I/O, no clock. + +/** + * A snapshot of process.memoryUsage() at one instant. Mirrors the subset of + * Node/Bun's MemoryUsage we log for leak localization (rss, heapUsed, + * heapTotal, external, arrayBuffers). 
Owned here so the orchestrator's + * per-turn sampling and the host-bin periodic timer agree on the shape. + */ +export interface MemorySample { + readonly rss: number; + readonly heapUsed: number; + readonly heapTotal: number; + readonly external: number; + readonly arrayBuffers: number; +} + +const BYTES_PER_MB = 1024 * 1024; + +function mb(bytes: number): number { + return Math.round(bytes / BYTES_PER_MB); +} + +/** + * Pure: format a {@link MemorySample} as flat logger {@link Attributes} + * (values in MB, rounded). Flat scalars are serializable (D3) and queryable + * (D9) in the journal. No I/O. + * + * Pass a `prefix` to namespace the keys — e.g. `memorySampleAttributes(delta, + * "delta")` yields `deltaRssMB`, so an "after" log can carry both the absolute + * sample (`rssMB`) and the per-turn delta (`deltaRssMB`) without key collision. + * The first letter of each field is capitalized after the prefix for + * readability (`deltaRssMB`, not `deltarssMB`). + */ +export function memorySampleAttributes(sample: MemorySample, prefix?: string): Attributes { + const p = prefix === undefined ? "" : prefix; + const cap = (s: string): string => + s.length === 0 ? s : `${s[0]?.toUpperCase() ?? ""}${s.slice(1)}`; + const field = (name: string): string => (p.length === 0 ? name : `${p}${cap(name)}`); + return { + [field("rssMB")]: mb(sample.rss), + [field("heapUsedMB")]: mb(sample.heapUsed), + [field("heapTotalMB")]: mb(sample.heapTotal), + [field("externalMB")]: mb(sample.external), + [field("arrayBuffersMB")]: mb(sample.arrayBuffers), + }; +} + +/** + * Pure: compute the signed per-field delta `after - before`. A positive + * `rss` delta on a sealed turn flags memory retained by the streaming path + * (the prime leak suspect). No I/O. 
+ */ +export function memoryDelta(before: MemorySample, after: MemorySample): MemorySample { + return { + rss: after.rss - before.rss, + heapUsed: after.heapUsed - before.heapUsed, + heapTotal: after.heapTotal - before.heapTotal, + external: after.external - before.external, + arrayBuffers: after.arrayBuffers - before.arrayBuffers, + }; +} |
