summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-28 08:43:29 +0900
committerAdam Malczewski <[email protected]>2026-06-28 08:43:29 +0900
commit1cd66da48f8c0a35b4208202d07bcd9f20fbc2c2 (patch)
treeae452a0ea54b8c8de493875aeb81d4402eab3819
parentb3b6eb41c5a436c95602aeaacb93ef63ec6c285f (diff)
downloaddispatch-1cd66da48f8c0a35b4208202d07bcd9f20fbc2c2.tar.gz
dispatch-1cd66da48f8c0a35b4208202d07bcd9f20fbc2c2.zip
feat(observability): periodic memory-usage logging to localize leak source
-rw-r--r--packages/host-bin/src/main.ts40
-rw-r--r--packages/host-bin/src/mem-telemetry.test.ts225
-rw-r--r--packages/host-bin/src/mem-telemetry.ts160
-rw-r--r--packages/session-orchestrator/src/extension.ts13
-rw-r--r--packages/session-orchestrator/src/index.ts3
-rw-r--r--packages/session-orchestrator/src/orchestrator.test.ts138
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts53
-rw-r--r--packages/session-orchestrator/src/pure.test.ts71
-rw-r--r--packages/session-orchestrator/src/pure.ts69
9 files changed, 770 insertions, 2 deletions
diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts
index aa114d5..f892fe8 100644
--- a/packages/host-bin/src/main.ts
+++ b/packages/host-bin/src/main.ts
@@ -27,7 +27,11 @@ import { extension as messageQueueExt } from "@dispatch/message-queue";
import { extension as providerConcurrencyExt } from "@dispatch/provider-concurrency";
import { extension as providerOpenaiCompatExt } from "@dispatch/provider-openai-compat";
import { extension as providerUmansExt } from "@dispatch/provider-umans";
-import { extension as sessionOrchestratorExt } from "@dispatch/session-orchestrator";
+import {
+ type MemorySample,
+ extension as sessionOrchestratorExt,
+ sessionOrchestratorHandle,
+} from "@dispatch/session-orchestrator";
import { extension as skillsExt } from "@dispatch/skills";
import { extension as sshExt } from "@dispatch/ssh";
import { createSqliteStorage, extension as storageSqliteExt } from "@dispatch/storage-sqlite";
@@ -49,6 +53,7 @@ import type { ChildHandle } from "./collector-supervisor.js";
import { createCollectorSupervisor } from "./collector-supervisor.js";
import { configMapToAccess, envToConfigMap } from "./config.js";
import { loadExternalExtensions } from "./load-external.js";
+import { startMemoryTelemetry } from "./mem-telemetry.js";
function createEmptySecrets(): SecretsAccess {
return {
@@ -227,10 +232,43 @@ async function boot(): Promise<void> {
}
}
+ // Periodic memory telemetry — leak-localization edge effect (AGENTS.md:
+ // timers are edge effects owned by host-bin, the composition root, NOT the
+ // kernel). Logs process.memoryUsage() every 60s tagged with the active-
+ // conversation count, and every 5 min runs Bun.gc(true) + logs RSS
+ // before/after to distinguish live retained objects from GC fragmentation.
+ // The per-turn before/after sampling lives in session-orchestrator; this
+ // owns the PERIODIC baseline. All effects are injected (no ambient state);
+ // stop() is cleared on shutdown so timers never leak across a restart.
+ let memoryTelemetry: { stop: () => void } | undefined;
+ try {
+ const orchestrator = host.getHostAPI().getService(sessionOrchestratorHandle);
+ memoryTelemetry = startMemoryTelemetry({
+ logger: logger.child({ extensionId: "mem-telemetry" }),
+ sampleMemory: (): MemorySample => {
+ const m = process.memoryUsage();
+ return {
+ rss: m.rss,
+ heapUsed: m.heapUsed,
+ heapTotal: m.heapTotal,
+ external: m.external,
+ arrayBuffers: m.arrayBuffers,
+ };
+ },
+ gc: () => Bun.gc(true),
+ getActiveConversationCount: () => orchestrator.getActiveConversationCount(),
+ });
+ } catch (err) {
+ logger.error("Memory telemetry not started (session-orchestrator unavailable)", {
+ err,
+ });
+ }
+
let shuttingDown = false;
const shutdown = async () => {
if (shuttingDown) return;
shuttingDown = true;
+ memoryTelemetry?.stop();
logger.info("Shutting down — deactivating extensions");
await host.deactivate();
logger.info("Draining collector");
diff --git a/packages/host-bin/src/mem-telemetry.test.ts b/packages/host-bin/src/mem-telemetry.test.ts
new file mode 100644
index 0000000..20237ef
--- /dev/null
+++ b/packages/host-bin/src/mem-telemetry.test.ts
@@ -0,0 +1,225 @@
+import type { Attributes, Logger } from "@dispatch/kernel";
+import type { MemorySample } from "@dispatch/session-orchestrator";
+import { describe, expect, it } from "vitest";
+import {
+ buildGcAttributes,
+ buildPeriodicAttributes,
+ type MemoryTelemetryDeps,
+ startMemoryTelemetry,
+} from "./mem-telemetry.js";
+
+/** Minimal capturing logger — records every info/debug/warn/error call. */
+interface CapturedLog {
+ readonly level: string;
+ readonly msg: string;
+ readonly attrs?: Attributes;
+}
+
+function capturingLogger(): { logger: Logger; logs: CapturedLog[] } {
+ const logs: CapturedLog[] = [];
+ const record = (level: string) => (msg: string, attrs?: Attributes) => {
+ logs.push({ level, msg, attrs });
+ };
+ const logger: Logger = {
+ debug: record("debug"),
+ info: record("info"),
+ warn: record("warn"),
+ error: () => {},
+ child: () => logger,
+ span: () => ({
+ id: "s",
+ log: logger,
+ setAttributes: () => {},
+ addLink: () => {},
+ child: () => ({}) as never,
+ end: () => {},
+ }),
+ };
+ return { logger, logs };
+}
+
+const SAMPLE_A: MemorySample = {
+ rss: 100 * 1024 * 1024,
+ heapUsed: 40 * 1024 * 1024,
+ heapTotal: 60 * 1024 * 1024,
+ external: 5 * 1024 * 1024,
+ arrayBuffers: 2 * 1024 * 1024,
+};
+const SAMPLE_B: MemorySample = {
+ rss: 300 * 1024 * 1024,
+ heapUsed: 80 * 1024 * 1024,
+ heapTotal: 60 * 1024 * 1024,
+ external: 5 * 1024 * 1024,
+ arrayBuffers: 6 * 1024 * 1024,
+};
+
+describe("buildPeriodicAttributes", () => {
+ it("formats the sample as MB and tags the active-conversation count", () => {
+ const attrs = buildPeriodicAttributes(SAMPLE_A, 3);
+ expect(attrs).toEqual({
+ rssMB: 100,
+ heapUsedMB: 40,
+ heapTotalMB: 60,
+ externalMB: 5,
+ arrayBuffersMB: 2,
+ activeConversations: 3,
+ });
+ });
+});
+
+describe("buildGcAttributes", () => {
+ it("carries absolute after values plus reclaimed (before-after) delta", () => {
+ const attrs = buildGcAttributes(SAMPLE_A, SAMPLE_B);
+ // Absolute "after" values (SAMPLE_B).
+ expect(attrs.rssMB).toBe(300);
+ expect(attrs.heapUsedMB).toBe(80);
+ // Reclaimed delta: before - after is negative here (memory GREW), so
+ // reclaimedRssMB = round((100-300) MB) = -200.
+ expect(attrs.reclaimedRssMB).toBe(-200);
+ expect(attrs.reclaimedHeapUsedMB).toBe(-40);
+ expect(attrs.reclaimedHeapTotalMB).toBe(0);
+ expect(attrs.reclaimedArrayBuffersMB).toBe(-4);
+ });
+
+ it("shows a positive reclaimed value when GC freed memory", () => {
+ const attrs = buildGcAttributes(SAMPLE_B, SAMPLE_A);
+ expect(attrs.reclaimedRssMB).toBe(200);
+ });
+});
+
+describe("startMemoryTelemetry", () => {
+ function fakeTimers(): {
+ setInterval: MemoryTelemetryDeps["setInterval"];
+ clearInterval: MemoryTelemetryDeps["clearInterval"];
+ tick: (name: "sample" | "gc") => void;
+ cleared: { sample: number; gc: number };
+ } {
+ // Store the callbacks keyed by interval so we can drive either one. The
+ // gc interval is always larger than the sample interval, so route the
+ // larger ms to the gc callback.
+ let sampleCb: (() => void) | undefined;
+ let gcCb: (() => void) | undefined;
+ let firstMs = 0;
+ const cleared = { sample: 0, gc: 0 };
+ return {
+ setInterval: (fn, ms) => {
+ if (firstMs === 0) {
+ firstMs = ms;
+ sampleCb = fn;
+ return "sample" as never;
+ }
+ // The second timer registered is the gc one (larger interval).
+ gcCb = fn;
+ return "gc" as never;
+ },
+ clearInterval: (handle) => {
+ if (handle === "sample") cleared.sample++;
+ else if (handle === "gc") cleared.gc++;
+ },
+ tick: (name) => {
+ if (name === "sample") sampleCb?.();
+ else gcCb?.();
+ },
+ cleared,
+ };
+ }
+
+ it("logs a periodic sample tagged with the active-conversation count", () => {
+ const { logger, logs } = capturingLogger();
+ let active = 2;
+ const timers = fakeTimers();
+ const handle = startMemoryTelemetry({
+ logger,
+ sampleMemory: () => SAMPLE_A,
+ gc: () => {},
+ getActiveConversationCount: () => active,
+ setInterval: timers.setInterval,
+ clearInterval: timers.clearInterval,
+ });
+
+ timers.tick("sample");
+ const periodic = logs.find((l) => l.msg === "memory:periodic");
+ expect(periodic).toBeDefined();
+ expect(periodic?.attrs?.rssMB).toBe(100);
+ expect(periodic?.attrs?.activeConversations).toBe(2);
+
+ // The count is read live (re-evaluated each tick).
+ active = 5;
+ timers.tick("sample");
+ const periodic2 = logs.filter((l) => l.msg === "memory:periodic");
+ expect(periodic2).toHaveLength(2);
+ expect(periodic2[1]?.attrs?.activeConversations).toBe(5);
+
+ handle.stop();
+ expect(timers.cleared.sample).toBe(1);
+ expect(timers.cleared.gc).toBe(1);
+ });
+
+ it("runs gc and logs RSS before/after on the gc interval", () => {
+ const { logger, logs } = capturingLogger();
+ let calls = 0;
+ const samples = [SAMPLE_A, SAMPLE_B];
+ const timers = fakeTimers();
+ const handle = startMemoryTelemetry({
+ logger,
+ sampleMemory: () => samples[calls++] as MemorySample,
+ gc: () => {},
+ getActiveConversationCount: () => 0,
+ setInterval: timers.setInterval,
+ clearInterval: timers.clearInterval,
+ });
+
+ timers.tick("gc");
+ const gcLog = logs.find((l) => l.msg === "memory:gc");
+ expect(gcLog).toBeDefined();
+ // before=SAMPLE_A (call 0), after=SAMPLE_B (call 1) — rss grew 100→300.
+ expect(gcLog?.attrs?.rssMB).toBe(300);
+ expect(gcLog?.attrs?.reclaimedRssMB).toBe(-200);
+
+ handle.stop();
+ });
+
+ it("actually calls the injected gc function", () => {
+ const { logger } = capturingLogger();
+ let gcCalls = 0;
+ const timers = fakeTimers();
+ const handle = startMemoryTelemetry({
+ logger,
+ sampleMemory: () => SAMPLE_A,
+ gc: () => {
+ gcCalls++;
+ },
+ getActiveConversationCount: () => 0,
+ setInterval: timers.setInterval,
+ clearInterval: timers.clearInterval,
+ });
+
+ timers.tick("gc");
+ expect(gcCalls).toBe(1);
+ // Periodic ticks do NOT trigger gc.
+ timers.tick("sample");
+ expect(gcCalls).toBe(1);
+
+ handle.stop();
+ });
+
+ it("stop is idempotent (clearing twice is harmless)", () => {
+ const { logger } = capturingLogger();
+ const timers = fakeTimers();
+ const handle = startMemoryTelemetry({
+ logger,
+ sampleMemory: () => SAMPLE_A,
+ gc: () => {},
+ getActiveConversationCount: () => 0,
+ setInterval: timers.setInterval,
+ clearInterval: timers.clearInterval,
+ });
+
+ handle.stop();
+ handle.stop();
+ // Both timers cleared exactly once each (second stop is a no-op on the
+ // same handles — clear count stays at 1 per handle).
+ expect(timers.cleared.sample).toBe(1);
+ expect(timers.cleared.gc).toBe(1);
+ });
+});
diff --git a/packages/host-bin/src/mem-telemetry.ts b/packages/host-bin/src/mem-telemetry.ts
new file mode 100644
index 0000000..7f0bb42
--- /dev/null
+++ b/packages/host-bin/src/mem-telemetry.ts
@@ -0,0 +1,160 @@
+/**
+ * Periodic memory telemetry — leak-localization edge effect.
+ *
+ * Owns the timers (setInterval) that periodically log process.memoryUsage()
+ * so RSS growth can be correlated with active conversations/turns and the
+ * leaking subsystem pinpointed. This is the composition-root edge effect
+ * (AGENTS.md: timers are edge effects owned by host-bin, NOT the kernel).
+ *
+ * All effects are injected (sampler, GC, clock, logger, active-conversation
+ * count) so the timer logic is fully testable — tests pass fakes and assert
+ * the emitted log calls, never waiting on a real wall clock. No ambient
+ * state (P3): the timers are owned explicitly and returned as a `stop()`
+ * handle that the composition root clears on shutdown.
+ *
+ * The per-turn before/after sampling lives in session-orchestrator (it wraps
+ * the stream boundary); this module owns the PERIODIC baseline + GC logging.
+ */
+
+import type { Logger } from "@dispatch/kernel";
+import {
+ type MemorySample,
+ memoryDelta,
+ memorySampleAttributes,
+} from "@dispatch/session-orchestrator";
+
+/** Default periodic sample interval: every 60s. */
+export const DEFAULT_MEMORY_SAMPLE_INTERVAL_MS = 60_000;
+
+/** Default GC interval: every 5 min (longer than the sample interval). */
+export const DEFAULT_GC_INTERVAL_MS = 5 * 60_000;
+
+/**
+ * Deps injected into {@link startMemoryTelemetry}. Every effect is explicit so
+ * the timer logic is reproducible from its inputs (no ambient state, P3) and
+ * testable without a real clock or process.
+ */
+export interface MemoryTelemetryDeps {
+ /** Logger (auto-scoped to host-bin by the composition root). */
+ readonly logger: Logger;
+ /** Edge effect: capture a {@link MemorySample} now (process.memoryUsage()). */
+ readonly sampleMemory: () => MemorySample;
+ /**
+ * Edge effect: run a full GC cycle. In production this is `() =>
+ * Bun.gc(true)`; tests pass a no-op or a counting fake. Used on the longer
+ * GC interval to distinguish live retained objects from GC fragmentation.
+ */
+ readonly gc: () => void;
+ /**
+ * The number of conversations currently driving a turn (from the
+ * session-orchestrator's activeConversations set). Tags each periodic
+ * sample so growth can be attributed to the streaming/turn path vs an idle
+ * baseline.
+ */
+ readonly getActiveConversationCount: () => number;
+ /** Periodic sample interval (ms). Defaults to 60s. */
+ readonly sampleIntervalMs?: number;
+ /** GC interval (ms). Defaults to 5 min. */
+ readonly gcIntervalMs?: number;
+ /**
+ * Injected timer scheduler (defaults to global setInterval). Tests pass a
+ * fake to drive ticks deterministically without real wall-clock waits. The
+ * handle type is opaque (the same type {@link clearInterval} accepts).
+ */
+ readonly setInterval?: (fn: () => void, ms: number) => MemoryTimerHandle;
+ /** Injected timer clearer (defaults to global clearInterval). */
+ readonly clearInterval?: (handle: MemoryTimerHandle | undefined) => void;
+}
+
+/** Opaque timer handle shared by {@link MemoryTelemetryDeps.setInterval} / clearInterval. */
+export type MemoryTimerHandle = ReturnType<typeof globalThis.setInterval>;
+
+/** Handle returned by {@link startMemoryTelemetry} to stop the timers. */
+export interface MemoryTelemetryHandle {
+ /** Stop both timers. Idempotent. Called by the composition root on shutdown. */
+ readonly stop: () => void;
+}
+
+/**
+ * Start periodic memory telemetry. Logs process.memoryUsage() every
+ * `sampleIntervalMs` (default 60s) tagged with the active-conversation count,
+ * and every `gcIntervalMs` (default 5 min) runs `gc()` and logs RSS
+ * before/after to distinguish live retained objects from GC fragmentation.
+ *
+ * Returns a `stop()` handle that clears both timers. The composition root
+ * (host-bin main) owns this handle and calls `stop()` on shutdown so the
+ * timers never leak across a restart.
+ *
+ * Pure decision logic: {@link buildPeriodicAttributes} /
+ * {@link buildGcAttributes} are exported separately for unit testing.
+ */
+export function startMemoryTelemetry(deps: MemoryTelemetryDeps): MemoryTelemetryHandle {
+ const sampleIntervalMs = deps.sampleIntervalMs ?? DEFAULT_MEMORY_SAMPLE_INTERVAL_MS;
+ const gcIntervalMs = deps.gcIntervalMs ?? DEFAULT_GC_INTERVAL_MS;
+ const setIntervalFn = deps.setInterval ?? globalThis.setInterval;
+ const clearIntervalFn = deps.clearInterval ?? globalThis.clearInterval;
+
+ let gcHandle: MemoryTimerHandle | undefined;
+
+ // Periodic sample: log rss/heap/external/arrayBuffers + active-conversation
+ // count every 60s. Correlates RSS growth with active turns so the leak can
+ // be attributed to the streaming path vs an idle baseline.
+ const sampleHandle: MemoryTimerHandle | undefined = setIntervalFn(() => {
+ const sample = deps.sampleMemory();
+ const activeConversations = deps.getActiveConversationCount();
+ deps.logger.info("memory:periodic", buildPeriodicAttributes(sample, activeConversations));
+ }, sampleIntervalMs);
+
+ // GC log: on a longer interval, force a full GC and log RSS before/after.
+ // A small/no drop after gc means the memory is LIVE (retained objects — the
+ // leak); a large drop means it was GC fragmentation (reclaimable). This
+ // distinguishes the two failure modes the crash investigation flagged.
+ gcHandle = setIntervalFn(() => {
+ const before = deps.sampleMemory();
+ deps.gc();
+ const after = deps.sampleMemory();
+ deps.logger.info("memory:gc", buildGcAttributes(before, after));
+ }, gcIntervalMs);
+
+ let stopped = false;
+ return {
+ stop() {
+ if (stopped) return; // idempotent — safe to call on every shutdown path
+ stopped = true;
+ clearIntervalFn(sampleHandle);
+ clearIntervalFn(gcHandle);
+ },
+ };
+}
+
+/**
+ * Pure: build the logger attributes for a periodic sample. Exported for unit
+ * testing (no I/O, no clock). The `activeConversations` count tags the sample
+ * so growth can be attributed to the streaming/turn path vs idle baseline.
+ */
+export function buildPeriodicAttributes(
+ sample: MemorySample,
+ activeConversations: number,
+): ReturnType<typeof memorySampleAttributes> & { activeConversations: number } {
+ return { ...memorySampleAttributes(sample), activeConversations };
+}
+
+/**
+ * Pure: build the logger attributes for a GC sample. Exported for unit testing.
+ * Carries the absolute after-sample plus the `reclaimed` delta
+ * (`before - after`): a POSITIVE `reclaimedRssMB` means GC freed memory
+ * (fragmentation, reclaimable); near-zero/negative means the memory is LIVE
+ * (retained objects — the leak). This distinguishes the two failure modes the
+ * crash investigation flagged.
+ */
+export function buildGcAttributes(
+ before: MemorySample,
+ after: MemorySample,
+): ReturnType<typeof memorySampleAttributes> {
+ // reclaimed = before - after (how much GC freed). memoryDelta(a, b) = b - a,
+ // so pass (after, before) to get before - after.
+ return {
+ ...memorySampleAttributes(after),
+ ...memorySampleAttributes(memoryDelta(after, before), "reclaimed"),
+ };
+}
diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts
index 783d894..777feaa 100644
--- a/packages/session-orchestrator/src/extension.ts
+++ b/packages/session-orchestrator/src/extension.ts
@@ -65,6 +65,19 @@ export function activate(host: HostAPI): void {
logger: host.logger,
now: () => Date.now(),
emit: (hook, payload) => host.emit(hook, payload),
+ // Injected process.memoryUsage() sampler — the production edge. Tests
+ // inject a fake to assert per-turn before/after telemetry. Wired in the
+ // shell (like `now: () => Date.now()`); pure decision logic is untouched.
+ sampleMemory: () => {
+ const m = process.memoryUsage();
+ return {
+ rss: m.rss,
+ heapUsed: m.heapUsed,
+ heapTotal: m.heapTotal,
+ external: m.external,
+ arrayBuffers: m.arrayBuffers,
+ };
+ },
resolveQueue: () => {
// Lazily resolve the message-queue service. Returns undefined when the
// extension isn't loaded (feature degrades off) — checked via the
diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts
index cc41fa8..3d3e816 100644
--- a/packages/session-orchestrator/src/index.ts
+++ b/packages/session-orchestrator/src/index.ts
@@ -39,6 +39,9 @@ export {
defaultDispatchPolicy,
delayFor,
generateTurnId,
+ type MemorySample,
+ memoryDelta,
+ memorySampleAttributes,
RETRY_BUDGET_MS,
RETRY_SCHEDULE_MS,
RETRY_TAIL_MS,
diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts
index 076ad51..400ca0b 100644
--- a/packages/session-orchestrator/src/orchestrator.test.ts
+++ b/packages/session-orchestrator/src/orchestrator.test.ts
@@ -4,7 +4,10 @@ import type {
AgentEvent,
ChatMessage,
EventHookDescriptor,
+ LogDeps,
Logger,
+ LogRecord,
+ LogSink,
ProviderContract,
ProviderEvent,
ProviderStreamOptions,
@@ -15,7 +18,7 @@ import type {
ToolContract,
TurnMetrics,
} from "@dispatch/kernel";
-import { runTurn } from "@dispatch/kernel";
+import { createLogger, runTurn } from "@dispatch/kernel";
import type { SystemPromptService } from "@dispatch/system-prompt";
import { describe, expect, it } from "vitest";
import {
@@ -27,6 +30,7 @@ import {
type TurnLifecyclePayload,
type WarmCompletedPayload,
} from "./orchestrator.js";
+import type { MemorySample } from "./pure.js";
import type { ToolAssembly } from "./tools-filter.js";
function createInMemoryStore(): ConversationStore & {
@@ -3980,3 +3984,135 @@ describe("system prompt: compaction flow", () => {
expect(capturedSystemPrompt?.startsWith("RECONSTRUCTED")).toBe(false);
});
});
+
+describe("per-turn memory telemetry", () => {
+ function capturingLogger(): { logger: Logger; records: LogRecord[] } {
+ let id = 0;
+ const deps: LogDeps = { now: () => 1000 + id++, newId: () => `id-${id++}` };
+ const records: LogRecord[] = [];
+ const sink: LogSink = { emit: (r) => records.push(r) };
+ return { logger: createLogger({ extensionId: "session-orchestrator" }, sink, deps), records };
+ }
+
+ it("logs before/after samples around the stream, tagged with conversationId + turnId", async () => {
+ const store = createInMemoryStore();
+ const provider: ProviderContract = { id: "p", stream: async function* () {} };
+ const { captured, captureRunTurn } = createCapturingRunTurn();
+ const { logger, records } = capturingLogger();
+
+ const samples: MemorySample[] = [
+ { rss: 100, heapUsed: 10, heapTotal: 20, external: 1, arrayBuffers: 0 },
+ { rss: 250, heapUsed: 30, heapTotal: 20, external: 1, arrayBuffers: 5 },
+ ];
+ let sampleIdx = 0;
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn: captureRunTurn,
+ logger,
+ sampleMemory: () => samples[sampleIdx++] as MemorySample,
+ });
+
+ await orchestrator.handleMessage({
+ conversationId: "conv-mem",
+ text: "hi",
+ onEvent: () => {},
+ });
+
+ expect(captured).toHaveLength(1);
+ const beforeLogs = records.filter((r) => r.kind === "log" && r.msg === "memory:turn:before");
+ const afterLogs = records.filter((r) => r.kind === "log" && r.msg === "memory:turn:after");
+ expect(beforeLogs).toHaveLength(1);
+ expect(afterLogs).toHaveLength(1);
+
+ // Both samples carry the turn's conversationId + turnId correlation.
+ const turnId = captured[0]?.turnId;
+ expect(turnId).toMatch(/^turn-/);
+ const before = beforeLogs[0] as Extract<LogRecord, { kind: "log" }>;
+ const after = afterLogs[0] as Extract<LogRecord, { kind: "log" }>;
+ expect(before.conversationId).toBe("conv-mem");
+ expect(before.turnId).toBe(turnId);
+ expect(after.conversationId).toBe("conv-mem");
+ expect(after.turnId).toBe(turnId);
+
+ // Before sample carries absolute MB values.
+ expect(before.attributes?.rssMB).toBe(0); // 100 bytes rounds to 0 MB
+ // After sample carries absolute + delta (delta rss = 150 bytes → 0 MB).
+ expect(after.attributes?.rssMB).toBe(0);
+ // deltaRssMB is the rounded delta (150 bytes → 0 MB).
+ expect(after.attributes).toHaveProperty("deltaRssMB");
+ });
+
+ it("emits no memory logs when sampleMemory is not injected (degrades off)", async () => {
+ const store = createInMemoryStore();
+ const provider: ProviderContract = { id: "p", stream: async function* () {} };
+ const { captureRunTurn } = createCapturingRunTurn();
+ const { logger, records } = capturingLogger();
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn: captureRunTurn,
+ logger,
+ // sampleMemory intentionally omitted
+ });
+
+ await orchestrator.handleMessage({
+ conversationId: "conv-no-mem",
+ text: "hi",
+ onEvent: () => {},
+ });
+
+ const memLogs = records.filter(
+ (r) => r.kind === "log" && typeof r.msg === "string" && r.msg.startsWith("memory:turn"),
+ );
+ expect(memLogs).toHaveLength(0);
+ });
+
+ it("getActiveConversationCount tracks in-flight turns", async () => {
+ const store = createInMemoryStore();
+ const result: RunTurnResult = {
+ messages: [{ role: "assistant", chunks: [{ type: "text", text: "ok" }] }],
+ usage: { inputTokens: 1, outputTokens: 1 },
+ finishReason: "stop",
+ };
+ // A runTurn that blocks until the test releases it — keeps the turn
+ // active so getActiveConversationCount reflects an in-flight turn.
+ let release: () => void = () => {};
+ const blocked = new Promise<void>((resolve) => {
+ release = resolve;
+ });
+ const blockingRunTurn = async (_input: RunTurnInput): Promise<RunTurnResult> => {
+ await blocked;
+ return result;
+ };
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => ({ id: "p", stream: async function* () {} }),
+ resolveTools: () => [],
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn: blockingRunTurn,
+ });
+
+ expect(orchestrator.getActiveConversationCount()).toBe(0);
+ const done = orchestrator.handleMessage({
+ conversationId: "conv-active",
+ text: "hi",
+ onEvent: () => {},
+ });
+ // Give the detached turn a tick to register as active.
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(orchestrator.getActiveConversationCount()).toBe(1);
+
+ release();
+ await done;
+ expect(orchestrator.getActiveConversationCount()).toBe(0);
+ });
+});
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index 5c36922..8cc0a97 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -30,6 +30,9 @@ import {
defaultDispatchPolicy,
delayFor,
generateTurnId,
+ type MemorySample,
+ memoryDelta,
+ memorySampleAttributes,
resolveModelName,
resolveReasoningEffort,
} from "./pure.js";
@@ -331,6 +334,13 @@ export interface SessionOrchestrator {
subscribe(conversationId: string, listener: TurnEventListener): () => void;
isActive(conversationId: string): boolean;
/**
+ * The number of conversations currently driving a turn (in the
+ * `activeConversations` set). Used by host-bin's periodic memory telemetry
+ * to tag each RSS sample with the active-conversation count, so growth can
+ * be attributed to the streaming/turn path vs an idle baseline.
+ */
+ getActiveConversationCount(): number;
+ /**
* Explicitly close a conversation (the user closed its tab — distinct from a
* socket disconnect, which never touches the turn): aborts any in-flight turn
* (the kernel finishes with `finishReason: "aborted"`, partial messages are
@@ -431,6 +441,16 @@ export interface SessionOrchestratorDeps {
readonly logger?: Logger;
/** Injected monotonic-ish clock (ms) forwarded to RunTurnInput for timing events. */
readonly now?: () => number;
+ /**
+ * Optional process.memoryUsage() sampler, injected for testability. When
+ * present, the orchestrator captures a sample immediately before and after
+ * each turn's stream completes (`deps.runTurn`) and logs the per-turn delta
+ * tagged with conversationId + turnId — correlating RSS growth with the
+ * streaming path (the prime leak suspect). When absent (undefined), no
+ * per-turn memory telemetry is emitted (feature degrades off cleanly).
+ * Pure decision logic stays unchanged; this is additive observability.
+ */
+ readonly sampleMemory?: () => MemorySample;
/** Emit a lifecycle event hook to subscribers. Injected from host. */
readonly emit?: <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload) => void;
}
@@ -885,6 +905,18 @@ export function createSessionOrchestrator(
// FE to syncTail during generation (CR-6).
await deps.conversationStore.append(conversationId, [userMsg]);
+ // Per-turn memory telemetry: capture a sample immediately BEFORE the
+ // stream starts. Paired with the post-stream sample below, this
+ // measures the streaming path's memory footprint per turn (the prime
+ // leak suspect — AI-SDK streaming buffers + per-turn message arrays).
+ // Tagged with conversationId + turnId via the turnLogger's correlation
+ // context. Additive observability only — does not alter the stream.
+ const sampleMem = deps.sampleMemory;
+ const memBefore = sampleMem?.();
+ if (memBefore !== undefined) {
+ turnLogger?.debug("memory:turn:before", memorySampleAttributes(memBefore));
+ }
+
let stepsPersisted = false;
const result = await deps.runTurn({
...opts,
@@ -898,6 +930,23 @@ export function createSessionOrchestrator(
},
});
+ // Per-turn memory telemetry: capture a sample immediately AFTER the
+ // stream completes and log the per-turn delta vs `memBefore`. A
+ // positive rss delta on a sealed turn flags memory retained by the
+ // streaming path (the leak we are localizing). No I/O beyond the
+ // injected sampler; pure delta computation via memoryDelta(). The
+ // delta attributes carry a `delta` prefix so the absolute "after"
+ // values and the per-turn delta coexist without key collision.
+ if (memBefore !== undefined) {
+ const memAfter = sampleMem?.();
+ if (memAfter !== undefined) {
+ turnLogger?.info("memory:turn:after", {
+ ...memorySampleAttributes(memAfter),
+ ...memorySampleAttributes(memoryDelta(memBefore, memAfter), "delta"),
+ });
+ }
+ }
+
// Fallback: if onStepComplete was never called (e.g., a fake
// runTurn in tests), persist all result messages as a batch.
if (!stepsPersisted && result.messages.length > 0) {
@@ -1041,6 +1090,10 @@ export function createSessionOrchestrator(
return activeTurns.has(conversationId);
},
+ getActiveConversationCount() {
+ return activeConversations.size;
+ },
+
closeConversation(conversationId) {
const turn = activeTurns.get(conversationId);
const abortedTurn = turn !== undefined;
diff --git a/packages/session-orchestrator/src/pure.test.ts b/packages/session-orchestrator/src/pure.test.ts
index 7a574f1..d4fe84a 100644
--- a/packages/session-orchestrator/src/pure.test.ts
+++ b/packages/session-orchestrator/src/pure.test.ts
@@ -6,6 +6,9 @@ import {
defaultDispatchPolicy,
delayFor,
generateTurnId,
+ type MemorySample,
+ memoryDelta,
+ memorySampleAttributes,
RETRY_BUDGET_MS,
RETRY_SCHEDULE_MS,
RETRY_TAIL_MS,
@@ -194,3 +197,71 @@ describe("retry backoff schedule (delayFor)", () => {
expect(schedule.slice(8).every((d) => d === RETRY_TAIL_MS)).toBe(true);
});
});
+
+describe("memorySampleAttributes", () => {
+ const sample: MemorySample = {
+ rss: 100 * 1024 * 1024, // 100 MB
+ heapUsed: 40 * 1024 * 1024,
+ heapTotal: 60 * 1024 * 1024,
+ external: 5 * 1024 * 1024,
+ arrayBuffers: 2 * 1024 * 1024,
+ };
+
+ it("formats fields as rounded MB with no prefix", () => {
+ const attrs = memorySampleAttributes(sample);
+ expect(attrs).toEqual({
+ rssMB: 100,
+ heapUsedMB: 40,
+ heapTotalMB: 60,
+ externalMB: 5,
+ arrayBuffersMB: 2,
+ });
+ });
+
+ it("namespaces keys with the given prefix", () => {
+ const attrs = memorySampleAttributes(sample, "delta");
+ expect(attrs).toEqual({
+ deltaRssMB: 100,
+ deltaHeapUsedMB: 40,
+ deltaHeapTotalMB: 60,
+ deltaExternalMB: 5,
+ deltaArrayBuffersMB: 2,
+ });
+ });
+
+ it("rounds fractional MB", () => {
+ const attrs = memorySampleAttributes({ ...sample, rss: 100.6 * 1024 * 1024 });
+ expect(attrs.rssMB).toBe(101);
+ });
+});
+
+describe("memoryDelta", () => {
+ const before: MemorySample = {
+ rss: 200 * 1024 * 1024,
+ heapUsed: 100 * 1024 * 1024,
+ heapTotal: 150 * 1024 * 1024,
+ external: 10 * 1024 * 1024,
+ arrayBuffers: 4 * 1024 * 1024,
+ };
+ const after: MemorySample = {
+ rss: 350 * 1024 * 1024,
+ heapUsed: 120 * 1024 * 1024,
+ heapTotal: 150 * 1024 * 1024,
+ external: 10 * 1024 * 1024,
+ arrayBuffers: 8 * 1024 * 1024,
+ };
+
+ it("computes signed after - before per field", () => {
+ const delta = memoryDelta(before, after);
+ expect(delta.rss).toBe(150 * 1024 * 1024);
+ expect(delta.heapUsed).toBe(20 * 1024 * 1024);
+ expect(delta.heapTotal).toBe(0);
+ expect(delta.external).toBe(0);
+ expect(delta.arrayBuffers).toBe(4 * 1024 * 1024);
+ });
+
+ it("is negative when memory dropped", () => {
+ const delta = memoryDelta(after, before);
+ expect(delta.rss).toBe(-150 * 1024 * 1024);
+ });
+});
diff --git a/packages/session-orchestrator/src/pure.ts b/packages/session-orchestrator/src/pure.ts
index 0d2068f..489b140 100644
--- a/packages/session-orchestrator/src/pure.ts
+++ b/packages/session-orchestrator/src/pure.ts
@@ -1,4 +1,5 @@
import type {
+ Attributes,
ChatMessage,
Chunk,
ImageInput,
@@ -134,3 +135,71 @@ export function defaultDispatchPolicy(): ToolDispatchPolicy {
export function generateTurnId(): string {
return `turn-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
}
+
+// ── Memory telemetry (leak localization) ────────────────────────────────────
+//
+// Pure helpers for process.memoryUsage() sampling. The orchestrator owns the
+// sample SHAPE (this type) so its per-turn sampling and the host-bin periodic
+// timer share one contract without a cross-package import of an
+// implementation — host-bin imports this type, the orchestrator never imports
+// host-bin. Pure: inputs → attributes/delta, no I/O, no clock.
+
+/**
+ * A snapshot of process.memoryUsage() at one instant. Mirrors the subset of
+ * Node/Bun's MemoryUsage we log for leak localization (rss, heapUsed,
+ * heapTotal, external, arrayBuffers). Owned here so the orchestrator's
+ * per-turn sampling and the host-bin periodic timer agree on the shape.
+ */
+export interface MemorySample {
+ readonly rss: number;
+ readonly heapUsed: number;
+ readonly heapTotal: number;
+ readonly external: number;
+ readonly arrayBuffers: number;
+}
+
+const BYTES_PER_MB = 1024 * 1024;
+
+function mb(bytes: number): number {
+ return Math.round(bytes / BYTES_PER_MB);
+}
+
+/**
+ * Pure: format a {@link MemorySample} as flat logger {@link Attributes}
+ * (values in MB, rounded). Flat scalars are serializable (D3) and queryable
+ * (D9) in the journal. No I/O.
+ *
+ * Pass a `prefix` to namespace the keys — e.g. `memorySampleAttributes(delta,
+ * "delta")` yields `deltaRssMB`, so an "after" log can carry both the absolute
+ * sample (`rssMB`) and the per-turn delta (`deltaRssMB`) without key collision.
+ * The first letter of each field is capitalized after the prefix for
+ * readability (`deltaRssMB`, not `deltarssMB`).
+ */
+export function memorySampleAttributes(sample: MemorySample, prefix?: string): Attributes {
+ const p = prefix === undefined ? "" : prefix;
+ const cap = (s: string): string =>
+ s.length === 0 ? s : `${s[0]?.toUpperCase() ?? ""}${s.slice(1)}`;
+ const field = (name: string): string => (p.length === 0 ? name : `${p}${cap(name)}`);
+ return {
+ [field("rssMB")]: mb(sample.rss),
+ [field("heapUsedMB")]: mb(sample.heapUsed),
+ [field("heapTotalMB")]: mb(sample.heapTotal),
+ [field("externalMB")]: mb(sample.external),
+ [field("arrayBuffersMB")]: mb(sample.arrayBuffers),
+ };
+}
+
+/**
+ * Pure: compute the signed per-field delta `after - before`. A positive
+ * `rss` delta on a sealed turn flags memory retained by the streaming path
+ * (the prime leak suspect). No I/O.
+ */
+export function memoryDelta(before: MemorySample, after: MemorySample): MemorySample {
+ return {
+ rss: after.rss - before.rss,
+ heapUsed: after.heapUsed - before.heapUsed,
+ heapTotal: after.heapTotal - before.heapTotal,
+ external: after.external - before.external,
+ arrayBuffers: after.arrayBuffers - before.arrayBuffers,
+ };
+}