summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-28 14:24:58 +0900
committerAdam Malczewski <[email protected]>2026-06-28 14:24:58 +0900
commit841e776635d2a93371f302f0617e729626a69fe5 (patch)
treec416925f6730a35f2560451ca7c6edaf41e8b1e4
parent71c635f7d8ee01a2b23d5ddfdfc4bff043980052 (diff)
parent414080e271ea44df0a7affc154b62e39b51a11a0 (diff)
downloaddispatch-841e776635d2a93371f302f0617e729626a69fe5.tar.gz
dispatch-841e776635d2a93371f302f0617e729626a69fe5.zip
Merge branch 'dev' into feature/workspace-star
-rwxr-xr-xbin/apply-memory-limits.sh45
-rwxr-xr-xbin/install2
-rw-r--r--crash-review-report.md86
-rw-r--r--notes/crash-investigation-findings.md256
-rw-r--r--notes/memory-leak-investigation-handoff.md331
-rw-r--r--packages/host-bin/src/main.ts81
-rw-r--r--packages/host-bin/src/mem-telemetry.test.ts225
-rw-r--r--packages/host-bin/src/mem-telemetry.ts160
-rw-r--r--packages/session-orchestrator/src/extension.ts13
-rw-r--r--packages/session-orchestrator/src/index.ts3
-rw-r--r--packages/session-orchestrator/src/orchestrator.test.ts138
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts53
-rw-r--r--packages/session-orchestrator/src/pure.test.ts71
-rw-r--r--packages/session-orchestrator/src/pure.ts69
-rw-r--r--packages/ssh/src/pool.test.ts176
-rw-r--r--packages/ssh/src/pool.ts18
-rw-r--r--packages/transport-http/src/extension.ts13
-rw-r--r--systemd/dispatch.service6
18 files changed, 1739 insertions, 7 deletions
diff --git a/bin/apply-memory-limits.sh b/bin/apply-memory-limits.sh
new file mode 100755
index 0000000..f5c9cf0
--- /dev/null
+++ b/bin/apply-memory-limits.sh
@@ -0,0 +1,45 @@
+#!/usr/bin/env bash
+# apply-memory-limits.sh — apply the MemoryMax/MemoryHigh cgroup limits to the
+# LIVE dispatch.service WITHOUT a full reinstall. Safe to run while the server
+# is up (the limits take effect on the next restart).
+#
+# What it does (all privileged lines use sudo):
+# 1. Copies the updated dispatch.service template to /etc/systemd/system/ (sudo)
+# 2. Reloads systemd so it picks up the new unit file (sudo)
+# 3. Restarts dispatch so the cgroup limits are applied (sudo)
+#
+# Why sudo: /etc/systemd/system/ is root-owned; daemon-reload and restart
+# require root. The script carries its own sudo — run it directly (no sudo prefix).
+#
+# Run: ./bin/apply-memory-limits.sh
+
+set -euo pipefail
+
+HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+ROOT="$(cd "$HERE/.." && pwd)"
+
+SERVICE_SRC="$ROOT/systemd/dispatch.service"
+SERVICE_DST="/etc/systemd/system/dispatch.service"
+
+if [ ! -f "$SERVICE_SRC" ]; then
+ echo "apply-memory-limits: template not found at $SERVICE_SRC" >&2
+ exit 1
+fi
+
+echo "[apply] copying updated dispatch.service → $SERVICE_DST"
+# Patch in the real user (same as bin/install does)
+REAL_USER="${SUDO_USER:-$USER}"
+REAL_GROUP=$(id -gn "$REAL_USER" 2>/dev/null || echo "$REAL_USER")
+sed "s/^# User\/Group are set by bin/install.*/User=$REAL_USER\nGroup=$REAL_GROUP/" "$SERVICE_SRC" | sudo tee "$SERVICE_DST" > /dev/null
+sudo chmod 644 "$SERVICE_DST"
+
+echo "[apply] daemon-reload…"
+sudo systemctl daemon-reload
+
+echo "[apply] restarting dispatch (cgroup limits take effect)…"
+sudo systemctl restart dispatch
+
+echo "[apply] done!"
+echo " Status: systemctl status dispatch"
+echo " Memory: systemctl show dispatch -p MemoryHigh -p MemoryMax"
+echo " Logs: journalctl -u dispatch -f"
diff --git a/bin/install b/bin/install
index 4899939..3e2a62f 100755
--- a/bin/install
+++ b/bin/install
@@ -104,6 +104,8 @@ EnvironmentFile=/etc/dispatch/env
WorkingDirectory=/var/lib/dispatch
Restart=on-failure
RestartSec=5
+MemoryHigh=20G
+MemoryMax=24G
StandardOutput=journal
StandardError=journal
diff --git a/crash-review-report.md b/crash-review-report.md
new file mode 100644
index 0000000..272abdb
--- /dev/null
+++ b/crash-review-report.md
@@ -0,0 +1,86 @@
+# Production Crash Investigation — Independent Review
+
+## Executive Summary
+
+The production Dispatch server is experiencing two distinct failure modes under load:
+1. **Exit-code 1 Crashes**: Driven by an unhandled `EventEmitter` `'error'` event from the `ssh2` connection pool, **not** the AI-SDK as previously suspected.
+2. **Bun Runtime Segfaults**: Triggered by massive memory pressure from unbounded conversation history serialization during long multi-step agent turns, confirming the "leak" is actually a massive live working set, not a persistent memory leak.
+
+Additionally, a suspected latent crash path in the cache-warming probe has been confirmed as an Unhandled Promise Rejection.
+
+---
+
+## 1. The Exit-1 Crash ("Timed out while waiting for handshake")
+
+### Finding: Confirmed Dispatch Bug in SSH Pool (Incorrect Preliminary Finding)
+The preliminary analysis hypothesized that the `error: Timed out while waiting for handshake` crash was caused by an unhandled `'error'` on an outbound TLS socket to the AI provider. **This is incorrect.**
+
+The crash actually originates from the `ssh2` package managing outbound remote computer connections, specifically within `packages/ssh/src/pool.ts`.
+
+### Technical Analysis
+- **The Evidence**: The exact string `'Timed out while waiting for handshake'` is hardcoded in `ssh2/lib/client.js` when the SSH handshake times out or keepalives fail during a re-keying phase.
+- **The Code Path**: In `packages/ssh/src/pool.ts`, the `doConnect` function attaches an `onError` listener to the `ssh2.Client` instance to catch connection failures:
+ ```typescript
+ client.on("error", onError);
+ ```
+ However, when the connection succeeds (`onReady` fires), the `cleanup()` function is called, which **removes the error listener**:
+ ```typescript
+ function cleanup(): void {
+ clearTimeout(timer);
+ client.removeListener("ready", onReady);
+ client.removeListener("error", onError); // <-- Listener removed here
+ }
+ ```
+- **The Crash Mechanism**: After `doConnect` succeeds, the client is placed in the pool and returned to callers. If the SSH connection drops later or a timeout occurs, the `ssh2.Client` emits an `'error'` event. Because there are no longer any listeners attached for `'error'`, Node.js's `EventEmitter` escalates it to an uncaught exception, instantly crashing the process with exit code 1.
+
+### Recommendation
+**Dispatch Code Change**: Add a persistent `.on("error", ...)` handler to the `client` in `buildConnection` (or refrain from removing it) to gracefully catch post-connection drops, tear down the connection, and transition `state.value = "error"`.
+
+---
+
+## 2. The Bun Native Segfaults (The 6.2 GB "Leak")
+
+### Finding: Massive Live Working Set, Not a Persistent Leak
+The preliminary investigation suspected a 2.5 GB/hour slow leak. The telemetry data confirms that the memory is **not permanently leaked**—when `activeConversations` drops to `0`, the RSS cleanly drops back down to the ~84 MB baseline. The crash is caused by unbounded live working set growth during concurrent agent turns, which fragments and overwhelms Bun's allocator.
+
+### Technical Analysis
+- **The Code Path**: `MAX_STEPS` in `packages/kernel/src/runtime/run-turn.ts` is set to `0` (unlimited). A single turn can run for hundreds of steps.
+- **The Mechanism**: In `executeStep`, every step appends new tool calls and results to the `messages` array. This array is then passed to `provider.stream()`.
+- Inside `packages/openai-stream/src/stream.ts`, the entire unbounded array is serialized into a single contiguous string every step:
+ ```typescript
+ const bodyString = JSON.stringify(body);
+ ```
+- **The Crash**: If 4 concurrent conversations (`activeConversations = 4`) run for hundreds of steps, the `messages` arrays grow to hundreds of megabytes each. Serializing these arrays copies them into massive contiguous strings on the V8 heap on *every step*. This causes gigabytes of memory allocation churn, memory pressure spikes (peaking at 6.2 GB), and eventually triggers a native `SIGSEGV`/`SIGILL` in Bun's allocator.
+
+### Recommendation
+**Dispatch Code Change**:
+1. Reintroduce a sane `MAX_STEPS` limit (e.g., `50` or `100`) to bound the maximum length of a single turn.
+2. Implement a sliding window or context-truncation strategy for `messages` before serializing to prevent the payload from growing infinitely.
+3. **Operational Mitigation**: Apply the `MemoryMax` cgroup circuit breaker to turn the segfault into a controlled recycle while the codebase fix is developed.
+
+---
+
+## 3. The Cache-Warming Latent Crash
+
+### Finding: Confirmed Latent Unhandled Promise Rejection
+The preliminary finding suspected a latent crash path due to a missing `try/catch` in the cache-warming probe. This is confirmed.
+
+### Technical Analysis
+- **The Code Path**: In `packages/session-orchestrator/src/orchestrator.ts`, the `createWarmService`'s `warm` function consumes the provider stream:
+ ```typescript
+ for await (const event of provider.stream(messages, assembled.tools, providerOpts)) {
+ ```
+- **The Mechanism**: If the AI provider connection fails or aborts, `provider.stream` throws. Because there is no `try/catch` around this loop, the async `warm` function rejects its returned promise.
+- In `packages/cache-warming/src/warmer.ts`, the timer fires `void fireWarm(conversationId, token);`. Since the returned promise is not `await`ed or `.catch()`'d at the top level, it results in an **Unhandled Promise Rejection**.
+
+### Recommendation
+**Dispatch Code Change**: Add a `try/catch` block around the `for await` loop in `createWarmService` (or add `.catch()` to `fireWarm`) to gracefully emit an error result instead of throwing an unhandled rejection.
+
+---
+
+## Assessment of Preliminary Findings
+
+- ❌ **"Unhandled TLS Socket to AI Provider"**: Incorrect. The exit-1 crash was a race condition in error listener attachment for outbound SSH connections (`ssh2`), not the AI SDK's TLS socket.
+- ✅ **"MAX_STEPS = 0 ... structural enabler for large working sets"**: Correct. Unbounded history serialization caused the massive gigabyte allocations that crashed Bun.
+- ✅ **"Cache-warming missing try/catch"**: Correct. It is an unhandled promise rejection waiting to happen.
+- ✅ **"Not an LSP leak"**: Confirmed. The memory growth is strictly tied to `activeConversations` and the unbounded turn array serialization.
diff --git a/notes/crash-investigation-findings.md b/notes/crash-investigation-findings.md
new file mode 100644
index 0000000..485e02d
--- /dev/null
+++ b/notes/crash-investigation-findings.md
@@ -0,0 +1,256 @@
+# Production Crash Investigation — Findings
+
+> **Definitive post-investigation record.** Supersedes `notes/memory-leak-investigation-handoff.md`
+> (which contained three material errors — see §3). Authored by opencode (umans-glm-5.2) on
+> 2026-06-28, incorporating an independent Gemini review (`crash-review-report.md`) whose pivotal
+> finding (the SSH pool) overturned an earlier wrong conclusion.
+
+**Last updated:** 2026-06-28
+**Status:** Root cause of the **live** (1.3.14) crash is confirmed and fixable in Dispatch code.
+The native segfault is **not** root-caused and may already be fixed by the 1.3.14 upgrade —
+needs observation under real load.
+
+---
+
+## 0. TL;DR
+
+- The production server was crash-looping (~20 restarts on Jun 28). **Two distinct failure modes.**
+- **Live crash (Bun 1.3.14):** `exit-1 "Timed out while waiting for handshake"`. Root cause is a
+ **Dispatch bug** in `packages/ssh/src/pool.ts`: the pooled `ssh2.Client` has **no permanent
+ `'error'` listener** (it's removed after connect by `cleanup()`, pool.ts:266), so a post-connect
+ ssh2 error emits `'error'` with no listener → uncaught → **no `process.on("uncaughtException")`
+ guard in main.ts** → process exit-1. Fixable in Dispatch code; no runtime change needed.
+- **Segfaults:** all on **Bun v1.3.13** (`Bun v1.3.13 (bf2e2cec)` printed at each). **Zero
+ segfaults observed on 1.3.14** so far. Not root-caused; may be fixed by the upgrade.
+- The prior "~2.5 GB/hour memory leak" framing is **wrong** (see §3). Idle memory is flat at 84 MB.
+- **Recovery on restart** is repair-and-seal, not resume: conversations are swept `active→idle`
+ and partial turns reconciled to a consistent state, but in-flight turns are sealed (user re-sends).
+
+---
+
+## 1. The live crash — SSH pool missing error listener (CONFIRMED, Dispatch bug)
+
+### Crash signature (the only crash on the current 1.3.14 binary)
+```
+error: Timed out while waiting for handshake
+ at emitError (node:events:51:13)
+ at <anonymous> (/$bunfs/root/dispatch-server:16:75617)
+Bun v1.3.14 (Linux x64)
+→ Main process exited, code=exited, status=1/FAILURE
+```
+- 09:22:04 JST crash; last telemetry sample (09:21:52) showed `rss=116MB, activeConversations=4`.
+- systemd reported 2.7 GB peak, but the crash was **exit-1 (self-exit), not OOM/SIGKILL** — memory
+ was a symptom, not the kill trigger.
+
+### Root cause chain
+1. The error string `"Timed out while waiting for handshake"` is **ssh2's**, not Bun's TLS. It is
+ hardcoded in `ssh2/lib/client.js:1114`:
+ ```js
+ this._readyTimeout = setTimeout(() => {
+ const err = new Error('Timed out while waiting for handshake');
+ this.emit('error', err); // ← ssh2 emits 'error' on the ssh2.Client
+ sock.destroy();
+ }, this.config.readyTimeout);
+ ```
+ (Bun's own TLS string is the different `"TLS handshake timeout"` — confirmed via `strings` on
+ the binary.)
+2. In `packages/ssh/src/pool.ts`, `doConnect` attaches `client.on("error", onError)` **only during
+ the connect promise** and removes it in `cleanup()` (pool.ts:266) on success. `buildConnection`
+ (pool.ts:101-175) returns a long-lived `SshConnection` (keepalive 30s, idle-reap 15m) and
+ **never attaches a permanent `'error'` listener** to the pooled client.
+3. So after a successful connect, the `ssh2.Client` sits in the pool with no `'error'` listener. A
+ later ssh2 error (re-handshake/re-key/keepalive timeout emitting the string above) →
+ `emit('error')` → `emitError (node:events)` with no listener → EventEmitter default throws →
+ uncaught.
+4. `packages/host-bin/src/main.ts` has **only `SIGINT`/`SIGTERM` handlers** (main.ts:280-281) —
+ **no `process.on("uncaughtException")` / `process.on("unhandledRejection")`**. So the uncaught
+ error exits the process (exit-1), killing every in-flight turn — not just the one using SSH.
+
+### Why capping concurrency is NOT the fix
+A single pooled SSH connection dropping crashes the server regardless of how many turns are
+concurrent. Capping `activeConversations` would only reduce frequency, not fix the root cause —
+and it cripples a product built around concurrent agents. (The existing provider-concurrency
+limiter, capped at 3-4, already bounds concurrent *provider streams*; SSH connections are
+tool-execution connections and bypass it.)
+
+### The fix (root cause)
+1. **`packages/ssh/src/pool.ts` — `buildConnection`:** attach a permanent `'error'` listener to
+ the `ssh2.Client` that tears down the connection, sets `state.value = "error"` /
+ `state.error = <msg>`, and **logs** (computer alias, error message, connection state) so the
+ failure is observable. Keep the existing connect-time `onError` for the connect promise.
+2. **`packages/host-bin/src/main.ts` — boot:** add `process.on("uncaughtException")` and
+ `process.on("unhandledRejection")` handlers that **log rich detail** (message, stack,
+ `activeConversations` count, `process.memoryUsage()` snapshot, timestamp) so we know *where*
+ they happen, then either survive (for non-fatal) or seal in-flight work + exit gracefully. This
+ is the defense-in-depth that ensures any *future* unhandled error degrades instead of taking
+ down the whole server and every in-flight turn.
+
+---
+
+## 2. The segfaults — NOT root-caused; 1.3.13 only (may be fixed on 1.3.14)
+
+### Crash signature
+```
+panic(main thread): Segmentation fault at address 0x0
+oh no: Bun has crashed. This indicates a bug in Bun, not your code.
+→ Main process exited, code=dumped, status=4/ILL
+```
+- **Every segfault was on Bun v1.3.13** (the log prints `Bun v1.3.13 (bf2e2cec)` at each:
+ 00:12, 00:45, 00:57, 08:50, 09:05).
+- Memory peaks ranged widely: **370 MB** (09:05, 1-min uptime) up to 6.2 GB (00:12, 2.5h uptime).
+- The 1.3.14 binary first ran at 09:06 JST. **No segfault has been observed on 1.3.14** — the only
+ 1.3.14 crash is the SSH exit-1 above. The current process has been stable idle.
+
+### What this means
+- The 370 MB segfault is **not memory exhaustion** (24 GB cgroup) — it's a native concurrency/race
+ bug in Bun's runtime. No amount of memory bounding prevents a native race.
+- The 1.3.14 upgrade **may** have fixed it; the sample so far is small and mostly idle, so this is
+ **not confirmed**.
+- **A native segfault cannot be caught by `process.on("uncaughtException")`** (that catches JS
+ exceptions, not SIGILL/SIGSEGV). So if the segfault recurs on 1.3.14, the only complete fixes
+ are: leave Bun (port to Node) or root-cause the native trigger (core dump → backtrace → repro →
+ upstream Bun report).
+
+### Plan for the segfault
+1. Enable core-dump capture (`ulimit -c unlimited` / systemd `COREdump`) so the next segfault
+ produces a backtrace rather than a bare `address 0x0`.
+2. Run 1.3.14 under the real orchestrator workload (concurrent backend+frontend+subagent turns).
+3. If no segfault recurs → likely already fixed; close the issue.
+4. If one recurs → root-cause via the backtrace; do NOT paper over it with concurrency caps.
+
+---
+
+## 3. What was WRONG in the prior handoff (corrections)
+
+`notes/memory-leak-investigation-handoff.md` was a careful document but contained three material
+errors that this investigation corrected:
+
+1. **"Telemetry is in journald — `journalctl | grep memory:periodic`."** Wrong. The logger writes
+ to a **journal sink file** (`DISPATCH_JOURNAL=/var/log/dispatch/app.ndjson`, main.ts:139),
+ not stderr/journald. The handoff's grep found nothing *by design* — the data was in the file
+ all along. (Also: the observability-collector that would drain this file into `traces.db` is
+ **disabled in compiled binaries** — main.ts:150 guards on a source path that doesn't exist in
+ prod — so the journal just accumulates and rotates; only `app.ndjson` + `.1` are retained.)
+2. **"The segfault persists on Bun 1.3.14" (handoff hypothesis #3 disproven).** Wrong. All
+ segfaults were on **1.3.13**; no segfault has been seen on 1.3.14. The 1.3.14 upgrade may have
+ fixed it — the handoff's hypothesis #3 is *not* disproven after all.
+3. **"~2.5 GB/hour slow memory leak."** Wrong framing. Telemetry shows **idle is flat at 84 MB**
+ (20+ min, zero reclaimable, zero growth) — there is no background/timer/closure leak. The
+ "leak" was the live working set of concurrent multi-step turns, which is reclaimed when turns
+ end. Moreover, raw context size is never GB-scale: 300k tokens ≈ 1-2 MB, so the gigabytes
+ under load must come from pathological tool payloads, retention, or native bugs — not
+ "unbounded context." (See §5.)
+
+---
+
+## 4. What was ruled out
+
+- **Warm-cache probe** — confirmed (Gemini + my read) as a *latent* unhandled-rejection path
+ (`createWarmService`'s `warm` has no try/catch around its `for await (provider.stream(...))` at
+ orchestrator.ts:1276; `fireWarm` drops the promise with `void` at warmer.ts:130). **But it has
+ never fired in production** — zero `cache-warming` activity in the journal, zero enabled
+ conversations in the `kv` table (53k rows, all conversation settings; warming is opt-in default
+ OFF). It is a latent risk worth fixing (the `unhandledRejection` guard from §1 catches its
+ rejection too), but it is **not** the cause of any observed crash.
+- **LSP** — exonerated (per handoff §7; caches bounded; disabled as a precaution, unrelated to
+ crashes).
+- **Idle/background leak** — refuted by telemetry (idle flat at 84 MB).
+- **`activeConversations` cap as a fix** — rejected. It's a workaround that cripples the
+ concurrent-agent product; the existing provider-concurrency limiter (3-4) already bounds
+ provider streams. The root cause is the missing error listener + missing guard.
+
+---
+
+## 5. Memory: what actually drives GB-scale (and what doesn't)
+
+The user's key insight: **300k tokens ≈ 1-2 MB** — raw context size is never GB-scale (capped at
+single-digit MB by the context window). So "unbounded context → GB crash" is numerically wrong.
+The only ways a turn reaches GB-scale:
+
+1. **A pathological tool payload** — a single tool result that's itself hundreds of MB (huge
+ file/log read). It enters `messages` unbounded and each step re-`JSON.stringify`s it (stream.ts:91)
+ **plus** the `reqSpan` captures a second copy (stream.ts:112) → ~2× transient per step.
+ **Tool-output bounding** directly prevents this (opencode caps at 50 KB / 2000 lines,
+ offloading full text to a managed file — `packages/opencode/src/tool/truncate.ts`).
+2. **O(N²) re-serialization across many steps**, only if GC can't keep up or there's retention.
+3. **A genuine retention leak** (message arrays / spans / closures held after the step or turn) —
+ accumulates per-turn over time (the 6.2 GB-over-2.5 h pattern). **Bounding context does NOT
+ fix this** — the retained reference must be found. (Not yet investigated; the span's
+ `bodyString` copy is written to the journal sink on disk, so it's likely transient, not
+ retained — but unverified.)
+
+**Dispatch vs opencode (context hygiene — NOT crash fixes):**
+| Aspect | Dispatch | opencode |
+|---|---|---|
+| Mid-turn compaction | refuses during active conv (orchestrator.ts:1325) | compacts between steps near context window (llm.ts:210) |
+| Tool result size in history | unbounded | bounded 50 KB / 2000 lines (truncate.ts) |
+| Step limit | `MAX_STEPS = 0` (unlimited) | configured per agent; `MAX_STEPS_PROMPT` disables tools at last step |
+| History sent to provider | full raw `messages` array | projected `toLLMMessages(...)` after compaction |
+
+These are general hygiene improvements, **not** crash fixes (the live crash is the SSH bug; the
+segfault is native). Tool-output bounding is the one with direct crash relevance (prevents
+pathological-payload spikes). Compaction/max-steps do not reduce RAM for context the agent
+genuinely needs.
+
+---
+
+## 6. Recovery behavior on restart (repair-and-seal, NOT resume)
+
+When the server crashes and systemd restarts it:
+- **`conversation-store/extension.ts:21-27`** — boot-sweep: lists all conversations with
+ `status: ["active"]` and sets them to `"idle"`.
+- **`store.ts:681`** — on load, `reconcileWithReport` repairs partial turns: synthesizes error
+ results for orphaned tool-calls, strips error-only trailing assistant messages, drops empty
+ messages. Emits a `reconcile.repair` span when repairs occur.
+- **`heartbeat/heartbeat.ts:299`** — sweeps stale "running" runs to "stopped".
+
+**Result:** the DB is always consistent after a restart (no broken conversations), but in-flight
+turns are **sealed, not resumed** — partial assistant output is preserved (reconciled), the
+conversation is marked idle, and the user must re-send. This is why the `uncaughtException` guard
+matters: without it, one SSH error kills the process → **every** in-flight turn (not just the one
+using SSH) is sealed and must re-send. With the guard, only the failing turn seals; the rest
+continue.
+
+---
+
+## 7. Data artifacts & quick-reference
+
+```bash
+# Live state
+systemctl status dispatch
+systemctl show dispatch -p MemoryMax -p MemoryHigh # 24G / 20G (live, applied)
+curl http://localhost:24991/health # → {"ok":true}
+
+# Telemetry lives HERE (not journald):
+/var/log/dispatch/app.ndjson # current process
+/var/log/dispatch/app.ndjson.1 # rotated (older crash windows rotated away — only .1 kept)
+# grep: rg 'memory:periodic|memory:gc' /var/log/dispatch/app.ndjson
+
+# Journal (crash stacks):
+journalctl -u dispatch --since '24 hours ago' | rg 'Main process exited|panic|Bun v'
+
+# The two investigation docs:
+notes/memory-leak-investigation-handoff.md # prior (has the 3 errors in §3)
+notes/crash-investigation-findings.md # THIS file (definitive)
+crash-review-report.md # independent Gemini review (found the SSH bug)
+
+# Suspect code:
+packages/ssh/src/pool.ts # buildConnection/doConnect — the crash
+packages/host-bin/src/main.ts # missing uncaughtException guard
+packages/kernel/src/runtime/run-turn.ts # MAX_STEPS=0, streaming retry loop
+packages/openai-stream/src/stream.ts # JSON.stringify whole body + span copy
+packages/session-orchestrator/src/orchestrator.ts # warm probe (latent), activeConversations
+```
+
+## 8. Current fix scope
+
+**In scope now (root-cause, Dispatch code):**
+1. `packages/ssh/src/pool.ts` — permanent `'error'` listener on the pooled `ssh2.Client` + logging.
+2. `packages/host-bin/src/main.ts` — `uncaughtException` + `unhandledRejection` guards with rich
+ logging (message, stack, activeConversations, memory snapshot, timestamp).
+
+**Deferred (separate decisions):**
+- Tool-output bounding, mid-turn compaction, MAX_STEPS (general hygiene; not crash fixes).
+- Warm-probe try/catch (latent; the `unhandledRejection` guard covers it for now).
+- Port to Node (only if segfault recurs on 1.3.14 under real load).
+- Core-dump capture setup (operational; do before the next load test).
diff --git a/notes/memory-leak-investigation-handoff.md b/notes/memory-leak-investigation-handoff.md
new file mode 100644
index 0000000..3dc4ad0
--- /dev/null
+++ b/notes/memory-leak-investigation-handoff.md
@@ -0,0 +1,331 @@
+# Memory-Leak Investigation — Handoff for OpenCode Agent
+
+> **Purpose:** This document gives an OpenCode harness agent — running outside
+> the Dispatch system, with no prior context — everything needed to
+> independently investigate the memory leak that is crashing the production
+> Dispatch server. Read it top to bottom before touching anything.
+
+**Last updated:** 2026-06-28
+**Author of this handoff:** umans/umans-glm-5.2 (Dispatch agent)
+**Originating investigation:** `notes/server-crash-investigation.md` (commit `b83aa8d`)
+
+---
+
+## 0. TL;DR — what you are here to do
+
+The production Dispatch server leaks memory at **~2.5 GB/hour** and eventually
+hits a **Bun runtime segfault** (native crash, not a JS exception). A prior
+investigation already ruled out LSP as the cause. Your job is to **find where
+memory is being retained** and propose a fix. There are four undeployed commits
+on the `dev` branch that add memory telemetry and a cgroup circuit breaker —
+**those need to be built, installed, and observed first** to get the data that
+will localize the leak. Do not start code-diving blindly; deploy the telemetry,
+collect a crash cycle, then read the logs.
+
+---
+
+## 1. System Overview — what Dispatch is
+
+Dispatch is an **AI agent orchestration platform**: a backend that runs one or
+more AI "agents" through turns (each turn = a step loop of model calls +
+tool dispatch), plus a separate frontend that consumes the backend's typed
+contracts over HTTP + WebSocket.
+
+**Repo layout** — all under `/home/tradam/projects/dispatch/`:
+
+| Path | What | Git remote | Branch |
+|---|---|---|---|
+| `backend/` | The server (this codebase) | `[email protected]:realtradam/dispatch.git` | `dev` |
+| `frontend/` | The web UI (separate repo, same methodology) | `[email protected]:realtradam/frontend.git` | `dev` |
+| `worktrees/` | Per-task git worktrees for isolated feature branches | (varies) | (varies) |
+| `bin/` *(inside backend)* | Operational shell scripts (build, install, up, secrets, certs) | — | — |
+
+Both repos use **`dev` as the active development branch**. The backend is a
+**Bun + TypeScript** monorepo (project references via `tsc -b`, Biome for
+lint/format, Vitest for tests, `bun:sqlite` for storage, the `ai` /
+`@ai-sdk/*` packages for model providers). Architecture rules are in
+`backend/AGENTS.md` — **read it** before editing (especially the "kernel
+touches no I/O" and "no ambient state" rules).
+
+---
+
+## 2. Where things are installed (production)
+
+| Artifact | Path | Notes |
+|---|---|---|
+| Production server binary | `/usr/bin/dispatch-server` | Bun **standalone compiled** binary. Currently the OLD one (built Jun 27 21:40). Not you to install — see §6. |
+| CLI binary | `/usr/bin/dispatch` | `dispatch send/list/read/...` |
+| Frontend static files | `/usr/share/dispatch/web/` | Built by `bin/build` (Vite, `VITE_HTTP_PORT=24991 VITE_WS_PORT=24990`) |
+| Config / env file | `/etc/dispatch/env` | `EnvironmentFile` for systemd. **Root-readable only** (`Permission denied` for non-root). Source template in repo: `systemd/dispatch.env` (installed by `bin/setup-env`). |
+| Data directory | `/var/lib/dispatch/` | WorkingDirectory of the service. SQLite DBs live here: `dispatch.db`, `dispatch.db-shm`, `dispatch.db-wal`. |
+| Logs | the journal | `journalctl -u dispatch` — there are no log files on disk; everything goes to journald. |
+| systemd unit (live) | `/etc/systemd/system/dispatch.service` | Installed from the repo template `systemd/dispatch.service` by `bin/install`. |
+| systemd unit (repo template) | `backend/systemd/dispatch.service` | **Edit this one**, not the live file — `bin/install` copies it over. |
+| Install script | `backend/bin/install` | Builds + installs the binary, unit, env, frontend. Uses `sudo` on privileged lines only (the agent has no sudo — hand scripts to the user). |
+| Build script | `backend/bin/build` | `tsc --build` then `bun build --compile` → `dist/dispatch-server`. |
+| Memory-limits script | `backend/bin/apply-memory-limits.sh` | Applies `MemoryHigh`/`MemoryMax` to the live service. **User has NOT run it yet.** |
+
+---
+
+## 3. Production ports
+
+| Service | Port | Confirmed |
+|---|---|---|
+| HTTP API | **24991** | `BACKEND_PORT=24991` in `systemd/dispatch.env` (set by `bin/setup-env`). Live: `ss -tlnp` shows `dispatch-server` listening on `*:24991`. |
+| Surface WebSocket | **24990** | `SURFACE_WS_PORT`. Live: `ss -tlnp` shows `dispatch-server` listening on `*:24990`. |
+
+The **dev** stack (`bin/up`) uses different ports: **24203** (HTTP) + **24205** (WS) + **24204** (frontend Vite dev server). Don't confuse dev ports with prod ports.
+
+**Health check:** `curl http://localhost:24991/health` → `{"ok":true}`
+
+---
+
+## 4. How to access the running server
+
+```bash
+# Is it running?
+systemctl status dispatch
+
+# Live logs (follow)
+journalctl -u dispatch -f
+
+# Recent crashes / errors (last 2h)
+journalctl -u dispatch --since '2 hours ago' -n 200
+
+# Memory telemetry — ONLY visible once the new binary is deployed (see §6).
+# The EXACT log tags to grep (NOT "[mem-telemetry]" — that is the module name):
+journalctl -u dispatch -f | rg 'memory:periodic|memory:gc'
+
+# Restart (needs root — hand to the user)
+sudo systemctl restart dispatch
+```
+
+> **The service is named `dispatch`, NOT `dispatch-server`.** The binary is
+> `dispatch-server`; the systemd unit is `dispatch.service`. This mismatch
+> tripped up the first investigation — don't repeat it.
+
+The backend checkout at `/home/tradam/projects/dispatch/backend/` is on `dev`
+and contains the source. The running binary is compiled, so **editing source
+does nothing until you rebuild + install + restart** (§6).
+
+---
+
+## 5. The memory leak — what we know
+
+### 5.1 Symptom & crash signature
+
+- The server grows **~2.5 GB/hour** under load, eventually triggering a **Bun
+ runtime segfault** (native crash).
+- **Last crash:** Jun 28 00:12 JST.
+ ```
+ panic(main thread): Segmentation fault at address 0x0
+ oh no: Bun has crashed. This indicates a bug in Bun, not your code.
+ Main process exited, code=dumped, status=4/ILL
+ ```
+ RSS was **6.2 GB** at crash (over 2h31m uptime). A prior session hit
+ **25.7 GB over 18h**.
+- The crash is a **native segfault inside Bun's runtime** (NULL deref in the
+ allocator/GC), NOT a JS exception. The crash-time memory readout was itself
+ corrupted (`RSS: 0.02ZB` — impossible), and systemd saw SIGILL while Bun
+ reported SIGSEGV — both signs of **heap corruption under memory pressure**.
+- This is the **only** segfault in 5 days of logs. Earlier crashes (Jun 27
+ ~02:44–02:52) were `exit-code` failures = application-level LSP bugs, **now
+ fixed** (see §7).
+
+### 5.2 What it is NOT
+
+- **NOT LSP.** The Language Server Protocol extension was the original prime
+ suspect; it is exonerated. Its caches are bounded (`MAX_OPEN_DOCUMENTS = 50`,
+ `MAX_PUSH_DIAGNOSTICS = 100` — see `packages/lsp/src/client.ts:153` and
+ `diagnostics.ts:47`). 50 docs + 100 diag entries cannot explain gigabytes.
+- **NOT the conversation store.** `packages/conversation-store` is
+ **SQLite-backed** (all reads/writes go through a `StorageNamespace` to
+ `bun:sqlite`) — it does not hold conversations in an in-memory map.
+- **NOT `activeConversations`.** The session-orchestrator's
+ `activeConversations` is just a `Set<string>` of conversation IDs (tiny).
+
+### 5.3 Prime suspects (not yet confirmed — that's your job)
+
+1. **The AI-SDK streaming path** — the `async` generator returned by
+ `provider.stream(...)` (`@ai-sdk/*`, including the `openai-stream` package).
+ Streaming response buffers may be retained if the generator is not fully
+ drained or if the SDK holds internal buffers per request.
+2. **Per-turn message arrays** in `session-orchestrator` — the history +
+ `userMsg` + `providerMessages` arrays assembled before each
+ `provider.stream(...)` call. For long multi-step agent turns these can be
+ large; if references outlive the turn (closures, unresolved promises, event
+ listeners), they leak.
+3. **A Bun bug fixed in 1.3.14.** The crash was on Bun **v1.3.13**. Bun 1.3.14
+ has been built locally but not installed (§6.3) — deploying it may itself
+ resolve the segfault even if a leak remains.
+
+### 5.4 Key files to examine (the leak-suspect code path)
+
+| File | What's there | Why it matters |
+|---|---|---|
+| `packages/session-orchestrator/src/orchestrator.ts` | `runTurnDetached()` at **line 541** — kicks off a turn; the `activeConversations.add/delete` around it (556, 979) brackets the turn lifecycle. | Turn lifecycle: where memory should be acquired and released. If a turn's buffers aren't released here, it leaks per-turn. |
+| `packages/session-orchestrator/src/orchestrator.ts` | `for await (const event of provider.stream(messages, assembled.tools, providerOpts))` at **line 1276** (and a second one at **1430** for compaction/summary). | **The streaming loop** — the #1 suspect. This is where the AI-SDK async generator is consumed. If the generator is abandoned mid-stream (error, `break`, abort) or its internal buffers are retained, memory grows per turn. |
+| `packages/session-orchestrator/src/pure.ts` | The pure turn/step decision logic + the `MemorySample`/`memoryDelta`/`memorySampleAttributes` helpers used by telemetry. | The per-turn before/after sampling wraps the stream boundary (referenced by `mem-telemetry.ts`). |
+| `packages/kernel/src/runtime/run-turn.ts` | The **step loop** (`runTurn`) — one agent turn = N steps of (model call → tool dispatch → feed results back). | MAX_STEPS is disabled (`0 = unlimited`, commit `e8b4bf1`), so a single turn can run many steps. Many steps ⇒ many accumulated message arrays ⇒ more retention surface. |
+| `packages/kernel/src/runtime/dispatch.ts` | **Tool dispatch** (the `maxConcurrent`/`eager` tool-execution loop). | Tool results accumulate into the turn's message history. A rogue tool returning huge payloads could balloon arrays. |
+| `packages/host-bin/src/mem-telemetry.ts` | The periodic telemetry edge effect. | Read this to understand exactly what `memory:periodic` / `memory:gc` log entries contain (rss, heapUsed, heapTotal, external, arrayBuffers, activeConversations, reclaimedRssMB). |
+
+> **Architecture note (AGENTS.md):** the kernel (`packages/kernel`) touches NO
+> I/O and names no concrete feature. Decision logic is pure; effects are
+> injected at the edges (host-bin). When investigating, keep this layering in
+> mind — a leak "in the kernel" would actually be in a closure captured by an
+> edge that feeds the kernel, not in the kernel's own (stateless) turn loop.
+
+---
+
+## 6. What's been done (committed to `dev`, NOT yet deployed)
+
+The running binary is still the **old one** (built Jun 27 21:40, pre-telemetry,
+pre-Bun-1.3.14, pre-LSP-disable). Four commits on `dev` need building +
+installing before any of this takes effect. **Verify before you trust:** the
+live systemd still reports `MemoryMax=infinity` (confirmed via
+`systemctl show dispatch -p MemoryMax`).
+
+| Commit | What | Status |
+|---|---|---|
+| `d1de9ed` | `systemd/dispatch.service` gets `MemoryHigh=20G` + `MemoryMax=24G` circuit breaker. | Committed. **Live = infinity (not applied).** |
+| `b3b6eb4` | `bin/apply-memory-limits.sh` — applies the limits to the live service. | Committed. **User has NOT run it.** |
+| `1cd66da` | Memory telemetry: periodic `process.memoryUsage()` every 15s, per-turn before/after sampling, `activeConversations` tagging, `Bun.gc(true)` every 5 min. Files: `packages/host-bin/src/mem-telemetry.ts` + `packages/session-orchestrator/src/pure.ts`. | Committed. **NOT deployed.** |
+| `73ff84c` | **LSP disabled** as a precaution (`main.ts` import commented out + removed from `CORE_EXTENSIONS`; `transport-http` tolerates its absence). Also set telemetry interval 60s→15s. | Committed. **NOT deployed.** LSP stays disabled until the leak is understood. |
+| (bun upgrade) | Bun **1.3.13 → 1.3.14** via `bun upgrade`. New binary built at `dist/dispatch-server`. | Rebuilt. **NOT installed** to `/usr/bin/dispatch-server`. |
+
+### 6.1 The telemetry data you will collect (once deployed)
+
+The exact log tags to grep in journald are **`memory:periodic`** and
+**`memory:gc`** (the module is `mem-telemetry.ts`, but the log *messages* are
+those two strings — do not grep for `[mem-telemetry]`).
+
+- `memory:periodic` — every 15s: `rss`, `heapUsed`, `heapTotal`, `external`,
+ `arrayBuffers` (bytes), plus `activeConversations` (count). Correlate RSS
+ growth with active turns: if RSS climbs while `activeConversations` is 0,
+ the leak is background/idle (timers, watchers, retained closures); if it
+ climbs only during/after turns, it's the streaming/turn path.
+- `memory:gc` — every 5 min: runs `Bun.gc(true)` and logs RSS before/after +
+ `reclaimedRssMB` (`before - after`). **Interpretation:** a large positive
+ `reclaimedRssMB` = GC freed it (fragmentation, reclaimable — not a true
+ leak). Near-zero/negative `reclaimedRssMB` = memory is **LIVE** (retained
+ objects — a real leak). This single field distinguishes the two failure
+ modes; check it first when you get data.
+
+### 6.2 Deploy sequence (hand to the user — you have no sudo)
+
+The agent cannot run `sudo` or install to `/usr/bin`. Write a script with
+`sudo` on the privileged lines and hand it to the user (per the repo's sudo
+policy). The sequence, roughly:
+
+1. From `backend/`: `bun run typecheck && bun run test && bun run check` —
+ make sure `dev` is green (it should be; these commits are committed).
+2. `bin/build` — produces `dist/dispatch-server` (Bun 1.3.14, with telemetry +
+ LSP-disabled).
+3. `bin/install` (or a targeted script) — copies `dist/dispatch-server` →
+ `/usr/bin/dispatch-server`, the unit template → `/etc/systemd/system/`,
+ `systemd/dispatch.env` → `/etc/dispatch/env`. (Or run `bin/apply-memory-limits.sh`
+ separately if you only want the cgroup limits without a full reinstall.)
+4. `sudo systemctl daemon-reload && sudo systemctl restart dispatch`.
+5. Confirm: `systemctl show dispatch -p MemoryMax` should show `24G`, and
+ `journalctl -u dispatch -f | rg 'memory:periodic'` should show telemetry
+ within 15s.
+
+> ⚠️ **Warning:** deploying restarts the production server. The live server is
+> actively serving agent conversations. Coordinate with the user (ceb2 /
+> tradam) before restarting — don't just yank it. Also note the running server
+> at time of writing is PID 28956 (started Jun 28 08:51).
+
+---
+
+## 7. Background — the original crash investigation (already done)
+
+You do **not** need to redo this; it's recorded for context.
+
+- **Original report:** `notes/server-crash-investigation.md` (commit `b83aa8d`).
+- The first investigation's task description was **stale**: it claimed an
+ "uncommitted hot-fix that disables LSP" existed. It did not — the working
+ tree was clean and LSP was *enabled*. The developer then *chose* to disable
+ LSP (commit `73ff84c`) as a precaution, but the root cause was already
+ identified as the Bun segfault + leak, not LSP.
+- The **three original LSP suspects** (JSON-parse TypeError, `fs.watch` ENOENT,
+ unbounded cache leak) were all fixed in commits `05ff256` + `f9d1ca5` and
+ are correct/tested. LSP being disabled now is a precaution, not a fix for the
+ crash. Once the leak is understood, LSP can be re-enabled safely.
+- **Do not spend time re-investigating LSP.** It is exonerated.
+
+---
+
+## 8. What needs investigation (your tasks, in order)
+
+1. **Deploy the telemetry + cgroup limits** (§6) — you cannot localize the leak
+ without the `memory:periodic` / `memory:gc` data. Get a full crash cycle's
+ worth of logs (let it run until it either hits `MemoryMax=24G` and restarts,
+ or until the growth pattern is clear).
+2. **Read the telemetry first.** The `memory:gc` `reclaimedRssMB` field tells
+ you live-retained vs fragmentation. If near-zero after GC → real retained
+ leak; chase which subsystem holds it.
+3. **Correlate growth with `activeConversations`.** Idle-growth ⇒ timers /
+ watchers / retained closures / the LSP file-watcher (if any servers still
+ spawn — but LSP is disabled). Turn-bound growth ⇒ the streaming/turn path
+ (suspects in §5.4).
+4. **Examine the streaming loop** (`orchestrator.ts:1276`): is the
+ `provider.stream(...)` async generator fully drained on every path
+ (success, error, abort, `break`)? Does the AI SDK retain response buffers?
+ Consider adding a before/after `memoryUsage()` around a single turn to
+ measure per-turn delta directly.
+5. **Examine the message arrays** assembled before `provider.stream` — are
+ they scoped to the turn and released when the turn completes, or do
+ closures/promises/event-listeners keep them alive?
+6. **Test Bun 1.3.14 in isolation.** Since the upgrade is built but not
+ installed, see if a memory-stress repro under 1.3.13 vs 1.3.14 behaves
+ differently — the segfault may be a Bun allocator bug that 1.3.14 fixes.
+7. **Propose a fix** (do not implement without coordinating — report up to the
+ orchestrator). Likely candidates: ensure the streaming generator is always
+ fully consumed / explicitly closed on error; bound per-turn message
+ history; release references at `activeConversations.delete` time; or
+ confirm it's purely a Bun bug requiring the 1.3.14 deploy.
+
+---
+
+## 9. Quick-reference commands
+
+```bash
+# --- status & logs ---
+systemctl status dispatch
+systemctl show dispatch -p MemoryMax -p MemoryHigh # expect 24G/20G after deploy
+journalctl -u dispatch -f # live logs
+journalctl -u dispatch -f | rg 'memory:periodic|memory:gc' # telemetry (after deploy)
+journalctl -u dispatch --since '2 hours ago' -n 200 # recent history / crashes
+curl http://localhost:24991/health # → {"ok":true}
+ss -tlnp | rg dispatch # ports 24991 + 24990
+
+# --- source (the leak path) ---
+cd /home/tradam/projects/dispatch/backend
+git log --oneline -n 8 # see the undeployed commits
+rg -n 'runTurnDetached|provider\.stream|for await' packages/session-orchestrator/src/orchestrator.ts
+
+# --- build & verify (NO sudo needed) ---
+bun run typecheck && bun run test && bun run check
+bin/build # → dist/dispatch-server
+
+# --- deploy (NEEDS sudo — hand a script to the user) ---
+# bin/install (or: sudo cp dist/dispatch-server /usr/bin/dispatch-server)
+# bin/apply-memory-limits.sh (applies MemoryMax to live service)
+# sudo systemctl daemon-reload && sudo systemctl restart dispatch
+```
+
+---
+
+## 10. Guardrails
+
+- **Do NOT edit implementation code without reporting to the orchestrator.**
+ This is an investigation handoff; propose fixes, don't apply them
+ unilaterally.
+- **You have no sudo.** Any step touching `/usr/bin`, `/etc/`, or
+ `systemctl restart` must be a script handed to the user with `sudo` on the
+ privileged lines.
+- **Do not re-enable LSP** until the leak is understood (it's disabled as a
+ precaution; re-enabling is a separate decision).
+- **Do not merge or push.** Work stays on `dev` locally.
+- **The service is `dispatch`, not `dispatch-server`.**
diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts
index aa114d5..70d1cb2 100644
--- a/packages/host-bin/src/main.ts
+++ b/packages/host-bin/src/main.ts
@@ -21,13 +21,20 @@ import {
type SecretsAccess,
type StorageNamespace,
} from "@dispatch/kernel";
-import { extension as lspExt } from "@dispatch/lsp";
+// LSP temporarily disabled — crashes (unhandled JSON parse, ENOENT on
+// transient .old_modules dirs) and a memory leak. Re-enable after fix.
+// import { extension as lspExt } from "@dispatch/lsp";
import { extension as mcpExt } from "@dispatch/mcp";
import { extension as messageQueueExt } from "@dispatch/message-queue";
import { extension as providerConcurrencyExt } from "@dispatch/provider-concurrency";
import { extension as providerOpenaiCompatExt } from "@dispatch/provider-openai-compat";
import { extension as providerUmansExt } from "@dispatch/provider-umans";
-import { extension as sessionOrchestratorExt } from "@dispatch/session-orchestrator";
+import {
+ type MemorySample,
+ memorySampleAttributes,
+ extension as sessionOrchestratorExt,
+ sessionOrchestratorHandle,
+} from "@dispatch/session-orchestrator";
import { extension as skillsExt } from "@dispatch/skills";
import { extension as sshExt } from "@dispatch/ssh";
import { createSqliteStorage, extension as storageSqliteExt } from "@dispatch/storage-sqlite";
@@ -49,6 +56,7 @@ import type { ChildHandle } from "./collector-supervisor.js";
import { createCollectorSupervisor } from "./collector-supervisor.js";
import { configMapToAccess, envToConfigMap } from "./config.js";
import { loadExternalExtensions } from "./load-external.js";
+import { startMemoryTelemetry } from "./mem-telemetry.js";
function createEmptySecrets(): SecretsAccess {
return {
@@ -101,7 +109,7 @@ const CORE_EXTENSIONS: readonly Extension[] = [
skillsExt,
systemPromptExt,
cacheWarmingExt,
- lspExt,
+ // lspExt, // LSP temporarily disabled — see import above
// ssh declares `dependsOn: ["exec-backend"]` and PROVIDES the remote
// exec-backend factory + the ComputerService the HTTP routes delegate to.
// Its lookups are lazy (tool-/request-time), but it is placed after
@@ -227,10 +235,45 @@ async function boot(): Promise<void> {
}
}
+ // Periodic memory telemetry — leak-localization edge effect (AGENTS.md:
+ // timers are edge effects owned by host-bin, the composition root, NOT the
+ // kernel). Logs process.memoryUsage() every 60s tagged with the active-
+ // conversation count, and every 5 min runs Bun.gc(true) + logs RSS
+ // before/after to distinguish live retained objects from GC fragmentation.
+ // The per-turn before/after sampling lives in session-orchestrator; this
+ // owns the PERIODIC baseline. All effects are injected (no ambient state);
+ // stop() is cleared on shutdown so timers never leak across a restart.
+ let memoryTelemetry: { stop: () => void } | undefined;
+ let activeConvCountFn: (() => number) | undefined;
+ try {
+ const orchestrator = host.getHostAPI().getService(sessionOrchestratorHandle);
+ activeConvCountFn = () => orchestrator.getActiveConversationCount();
+ memoryTelemetry = startMemoryTelemetry({
+ logger: logger.child({ extensionId: "mem-telemetry" }),
+ sampleMemory: (): MemorySample => {
+ const m = process.memoryUsage();
+ return {
+ rss: m.rss,
+ heapUsed: m.heapUsed,
+ heapTotal: m.heapTotal,
+ external: m.external,
+ arrayBuffers: m.arrayBuffers,
+ };
+ },
+ gc: () => Bun.gc(true),
+ getActiveConversationCount: () => orchestrator.getActiveConversationCount(),
+ });
+ } catch (err) {
+ logger.error("Memory telemetry not started (session-orchestrator unavailable)", {
+ err,
+ });
+ }
+
let shuttingDown = false;
const shutdown = async () => {
if (shuttingDown) return;
shuttingDown = true;
+ memoryTelemetry?.stop();
logger.info("Shutting down — deactivating extensions");
await host.deactivate();
logger.info("Draining collector");
@@ -240,6 +283,38 @@ async function boot(): Promise<void> {
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);
+ const memorySnapshot = () => {
+ const m = process.memoryUsage();
+ return memorySampleAttributes({
+ rss: m.rss,
+ heapUsed: m.heapUsed,
+ heapTotal: m.heapTotal,
+ external: m.external,
+ arrayBuffers: m.arrayBuffers,
+ });
+ };
+
+ process.on("unhandledRejection", (reason) => {
+ logger.error("unhandledRejection", {
+ err: reason,
+ handler: "unhandledRejection",
+ activeConversations: activeConvCountFn?.() ?? "unavailable",
+ timestamp: new Date().toISOString(),
+ ...memorySnapshot(),
+ });
+ });
+
+ process.on("uncaughtException", (err) => {
+ logger.error("uncaughtException", {
+ err,
+ handler: "uncaughtException",
+ activeConversations: activeConvCountFn?.() ?? "unavailable",
+ timestamp: new Date().toISOString(),
+ ...memorySnapshot(),
+ });
+ void shutdown();
+ });
+
logger.info("Dispatch booted");
console.info("Dispatch booted");
}
diff --git a/packages/host-bin/src/mem-telemetry.test.ts b/packages/host-bin/src/mem-telemetry.test.ts
new file mode 100644
index 0000000..20237ef
--- /dev/null
+++ b/packages/host-bin/src/mem-telemetry.test.ts
@@ -0,0 +1,225 @@
+import type { Attributes, Logger } from "@dispatch/kernel";
+import type { MemorySample } from "@dispatch/session-orchestrator";
+import { describe, expect, it } from "vitest";
+import {
+ buildGcAttributes,
+ buildPeriodicAttributes,
+ type MemoryTelemetryDeps,
+ startMemoryTelemetry,
+} from "./mem-telemetry.js";
+
+/** Minimal capturing logger — records every info/debug/warn/error call. */
+interface CapturedLog {
+ readonly level: string;
+ readonly msg: string;
+ readonly attrs?: Attributes;
+}
+
+function capturingLogger(): { logger: Logger; logs: CapturedLog[] } {
+ const logs: CapturedLog[] = [];
+ const record = (level: string) => (msg: string, attrs?: Attributes) => {
+ logs.push({ level, msg, attrs });
+ };
+ const logger: Logger = {
+ debug: record("debug"),
+ info: record("info"),
+ warn: record("warn"),
+ error: () => {},
+ child: () => logger,
+ span: () => ({
+ id: "s",
+ log: logger,
+ setAttributes: () => {},
+ addLink: () => {},
+ child: () => ({}) as never,
+ end: () => {},
+ }),
+ };
+ return { logger, logs };
+}
+
+const SAMPLE_A: MemorySample = {
+ rss: 100 * 1024 * 1024,
+ heapUsed: 40 * 1024 * 1024,
+ heapTotal: 60 * 1024 * 1024,
+ external: 5 * 1024 * 1024,
+ arrayBuffers: 2 * 1024 * 1024,
+};
+const SAMPLE_B: MemorySample = {
+ rss: 300 * 1024 * 1024,
+ heapUsed: 80 * 1024 * 1024,
+ heapTotal: 60 * 1024 * 1024,
+ external: 5 * 1024 * 1024,
+ arrayBuffers: 6 * 1024 * 1024,
+};
+
+describe("buildPeriodicAttributes", () => {
+ it("formats the sample as MB and tags the active-conversation count", () => {
+ const attrs = buildPeriodicAttributes(SAMPLE_A, 3);
+ expect(attrs).toEqual({
+ rssMB: 100,
+ heapUsedMB: 40,
+ heapTotalMB: 60,
+ externalMB: 5,
+ arrayBuffersMB: 2,
+ activeConversations: 3,
+ });
+ });
+});
+
+describe("buildGcAttributes", () => {
+ it("carries absolute after values plus reclaimed (before-after) delta", () => {
+ const attrs = buildGcAttributes(SAMPLE_A, SAMPLE_B);
+ // Absolute "after" values (SAMPLE_B).
+ expect(attrs.rssMB).toBe(300);
+ expect(attrs.heapUsedMB).toBe(80);
+ // Reclaimed delta: before - after is negative here (memory GREW), so
+ // reclaimedRssMB = round((100-300) MB) = -200.
+ expect(attrs.reclaimedRssMB).toBe(-200);
+ expect(attrs.reclaimedHeapUsedMB).toBe(-40);
+ expect(attrs.reclaimedHeapTotalMB).toBe(0);
+ expect(attrs.reclaimedArrayBuffersMB).toBe(-4);
+ });
+
+ it("shows a positive reclaimed value when GC freed memory", () => {
+ const attrs = buildGcAttributes(SAMPLE_B, SAMPLE_A);
+ expect(attrs.reclaimedRssMB).toBe(200);
+ });
+});
+
+describe("startMemoryTelemetry", () => {
+ function fakeTimers(): {
+ setInterval: MemoryTelemetryDeps["setInterval"];
+ clearInterval: MemoryTelemetryDeps["clearInterval"];
+ tick: (name: "sample" | "gc") => void;
+ cleared: { sample: number; gc: number };
+ } {
+ // Store the callbacks keyed by interval so we can drive either one. The
+ // gc interval is always larger than the sample interval, so route the
+ // larger ms to the gc callback.
+ let sampleCb: (() => void) | undefined;
+ let gcCb: (() => void) | undefined;
+ let firstMs = 0;
+ const cleared = { sample: 0, gc: 0 };
+ return {
+ setInterval: (fn, ms) => {
+ if (firstMs === 0) {
+ firstMs = ms;
+ sampleCb = fn;
+ return "sample" as never;
+ }
+ // The second timer registered is the gc one (larger interval).
+ gcCb = fn;
+ return "gc" as never;
+ },
+ clearInterval: (handle) => {
+ if (handle === "sample") cleared.sample++;
+ else if (handle === "gc") cleared.gc++;
+ },
+ tick: (name) => {
+ if (name === "sample") sampleCb?.();
+ else gcCb?.();
+ },
+ cleared,
+ };
+ }
+
+ it("logs a periodic sample tagged with the active-conversation count", () => {
+ const { logger, logs } = capturingLogger();
+ let active = 2;
+ const timers = fakeTimers();
+ const handle = startMemoryTelemetry({
+ logger,
+ sampleMemory: () => SAMPLE_A,
+ gc: () => {},
+ getActiveConversationCount: () => active,
+ setInterval: timers.setInterval,
+ clearInterval: timers.clearInterval,
+ });
+
+ timers.tick("sample");
+ const periodic = logs.find((l) => l.msg === "memory:periodic");
+ expect(periodic).toBeDefined();
+ expect(periodic?.attrs?.rssMB).toBe(100);
+ expect(periodic?.attrs?.activeConversations).toBe(2);
+
+ // The count is read live (re-evaluated each tick).
+ active = 5;
+ timers.tick("sample");
+ const periodic2 = logs.filter((l) => l.msg === "memory:periodic");
+ expect(periodic2).toHaveLength(2);
+ expect(periodic2[1]?.attrs?.activeConversations).toBe(5);
+
+ handle.stop();
+ expect(timers.cleared.sample).toBe(1);
+ expect(timers.cleared.gc).toBe(1);
+ });
+
+ it("runs gc and logs RSS before/after on the gc interval", () => {
+ const { logger, logs } = capturingLogger();
+ let calls = 0;
+ const samples = [SAMPLE_A, SAMPLE_B];
+ const timers = fakeTimers();
+ const handle = startMemoryTelemetry({
+ logger,
+ sampleMemory: () => samples[calls++] as MemorySample,
+ gc: () => {},
+ getActiveConversationCount: () => 0,
+ setInterval: timers.setInterval,
+ clearInterval: timers.clearInterval,
+ });
+
+ timers.tick("gc");
+ const gcLog = logs.find((l) => l.msg === "memory:gc");
+ expect(gcLog).toBeDefined();
+ // before=SAMPLE_A (call 0), after=SAMPLE_B (call 1) — rss grew 100→300.
+ expect(gcLog?.attrs?.rssMB).toBe(300);
+ expect(gcLog?.attrs?.reclaimedRssMB).toBe(-200);
+
+ handle.stop();
+ });
+
+ it("actually calls the injected gc function", () => {
+ const { logger } = capturingLogger();
+ let gcCalls = 0;
+ const timers = fakeTimers();
+ const handle = startMemoryTelemetry({
+ logger,
+ sampleMemory: () => SAMPLE_A,
+ gc: () => {
+ gcCalls++;
+ },
+ getActiveConversationCount: () => 0,
+ setInterval: timers.setInterval,
+ clearInterval: timers.clearInterval,
+ });
+
+ timers.tick("gc");
+ expect(gcCalls).toBe(1);
+ // Periodic ticks do NOT trigger gc.
+ timers.tick("sample");
+ expect(gcCalls).toBe(1);
+
+ handle.stop();
+ });
+
+ it("stop is idempotent (clearing twice is harmless)", () => {
+ const { logger } = capturingLogger();
+ const timers = fakeTimers();
+ const handle = startMemoryTelemetry({
+ logger,
+ sampleMemory: () => SAMPLE_A,
+ gc: () => {},
+ getActiveConversationCount: () => 0,
+ setInterval: timers.setInterval,
+ clearInterval: timers.clearInterval,
+ });
+
+ handle.stop();
+ handle.stop();
+ // Both timers cleared exactly once each (second stop is a no-op on the
+ // same handles — clear count stays at 1 per handle).
+ expect(timers.cleared.sample).toBe(1);
+ expect(timers.cleared.gc).toBe(1);
+ });
+});
diff --git a/packages/host-bin/src/mem-telemetry.ts b/packages/host-bin/src/mem-telemetry.ts
new file mode 100644
index 0000000..576347b
--- /dev/null
+++ b/packages/host-bin/src/mem-telemetry.ts
@@ -0,0 +1,160 @@
+/**
+ * Periodic memory telemetry — leak-localization edge effect.
+ *
+ * Owns the timers (setInterval) that periodically log process.memoryUsage()
+ * so RSS growth can be correlated with active conversations/turns and the
+ * leaking subsystem pinpointed. This is the composition-root edge effect
+ * (AGENTS.md: timers are edge effects owned by host-bin, NOT the kernel).
+ *
+ * All effects are injected (sampler, GC, clock, logger, active-conversation
+ * count) so the timer logic is fully testable — tests pass fakes and assert
+ * the emitted log calls, never waiting on a real wall clock. No ambient
+ * state (P3): the timers are owned explicitly and returned as a `stop()`
+ * handle that the composition root clears on shutdown.
+ *
+ * The per-turn before/after sampling lives in session-orchestrator (it wraps
+ * the stream boundary); this module owns the PERIODIC baseline + GC logging.
+ */
+
+import type { Logger } from "@dispatch/kernel";
+import {
+ type MemorySample,
+ memoryDelta,
+ memorySampleAttributes,
+} from "@dispatch/session-orchestrator";
+
+/** Default periodic sample interval: every 15s. */
+export const DEFAULT_MEMORY_SAMPLE_INTERVAL_MS = 15_000;
+
+/** Default GC interval: every 5 min (longer than the sample interval). */
+export const DEFAULT_GC_INTERVAL_MS = 5 * 60_000;
+
+/**
+ * Deps injected into {@link startMemoryTelemetry}. Every effect is explicit so
+ * the timer logic is reproducible from its inputs (no ambient state, P3) and
+ * testable without a real clock or process.
+ */
+export interface MemoryTelemetryDeps {
+ /** Logger (auto-scoped to host-bin by the composition root). */
+ readonly logger: Logger;
+ /** Edge effect: capture a {@link MemorySample} now (process.memoryUsage()). */
+ readonly sampleMemory: () => MemorySample;
+ /**
+ * Edge effect: run a full GC cycle. In production this is `() =>
+ * Bun.gc(true)`; tests pass a no-op or a counting fake. Used on the longer
+ * GC interval to distinguish live retained objects from GC fragmentation.
+ */
+ readonly gc: () => void;
+ /**
+ * The number of conversations currently driving a turn (from the
+ * session-orchestrator's activeConversations set). Tags each periodic
+ * sample so growth can be attributed to the streaming/turn path vs an idle
+ * baseline.
+ */
+ readonly getActiveConversationCount: () => number;
+ /** Periodic sample interval (ms). Defaults to 15s. */
+ readonly sampleIntervalMs?: number;
+ /** GC interval (ms). Defaults to 5 min. */
+ readonly gcIntervalMs?: number;
+ /**
+ * Injected timer scheduler (defaults to global setInterval). Tests pass a
+ * fake to drive ticks deterministically without real wall-clock waits. The
+ * handle type is opaque (the same type {@link clearInterval} accepts).
+ */
+ readonly setInterval?: (fn: () => void, ms: number) => MemoryTimerHandle;
+ /** Injected timer clearer (defaults to global clearInterval). */
+ readonly clearInterval?: (handle: MemoryTimerHandle | undefined) => void;
+}
+
+/** Opaque timer handle shared by {@link MemoryTelemetryDeps.setInterval} / clearInterval. */
+export type MemoryTimerHandle = ReturnType<typeof globalThis.setInterval>;
+
+/** Handle returned by {@link startMemoryTelemetry} to stop the timers. */
+export interface MemoryTelemetryHandle {
+ /** Stop both timers. Idempotent. Called by the composition root on shutdown. */
+ readonly stop: () => void;
+}
+
+/**
+ * Start periodic memory telemetry. Logs process.memoryUsage() every
+ * `sampleIntervalMs` (default 15s) tagged with the active-conversation count,
+ * and every `gcIntervalMs` (default 5 min) runs `gc()` and logs RSS
+ * before/after to distinguish live retained objects from GC fragmentation.
+ *
+ * Returns a `stop()` handle that clears both timers. The composition root
+ * (host-bin main) owns this handle and calls `stop()` on shutdown so the
+ * timers never leak across a restart.
+ *
+ * Pure decision logic: {@link buildPeriodicAttributes} /
+ * {@link buildGcAttributes} are exported separately for unit testing.
+ */
+export function startMemoryTelemetry(deps: MemoryTelemetryDeps): MemoryTelemetryHandle {
+ const sampleIntervalMs = deps.sampleIntervalMs ?? DEFAULT_MEMORY_SAMPLE_INTERVAL_MS;
+ const gcIntervalMs = deps.gcIntervalMs ?? DEFAULT_GC_INTERVAL_MS;
+ const setIntervalFn = deps.setInterval ?? globalThis.setInterval;
+ const clearIntervalFn = deps.clearInterval ?? globalThis.clearInterval;
+
+ let gcHandle: MemoryTimerHandle | undefined;
+
+ // Periodic sample: log rss/heap/external/arrayBuffers + active-conversation
+ // count every 15s. Correlates RSS growth with active turns so the leak can
+ // be attributed to the streaming path vs an idle baseline.
+ const sampleHandle: MemoryTimerHandle | undefined = setIntervalFn(() => {
+ const sample = deps.sampleMemory();
+ const activeConversations = deps.getActiveConversationCount();
+ deps.logger.info("memory:periodic", buildPeriodicAttributes(sample, activeConversations));
+ }, sampleIntervalMs);
+
+ // GC log: on a longer interval, force a full GC and log RSS before/after.
+ // A small/no drop after gc means the memory is LIVE (retained objects — the
+ // leak); a large drop means it was GC fragmentation (reclaimable). This
+ // distinguishes the two failure modes the crash investigation flagged.
+ gcHandle = setIntervalFn(() => {
+ const before = deps.sampleMemory();
+ deps.gc();
+ const after = deps.sampleMemory();
+ deps.logger.info("memory:gc", buildGcAttributes(before, after));
+ }, gcIntervalMs);
+
+ let stopped = false;
+ return {
+ stop() {
+ if (stopped) return; // idempotent — safe to call on every shutdown path
+ stopped = true;
+ clearIntervalFn(sampleHandle);
+ clearIntervalFn(gcHandle);
+ },
+ };
+}
+
+/**
+ * Pure: build the logger attributes for a periodic sample. Exported for unit
+ * testing (no I/O, no clock). The `activeConversations` count tags the sample
+ * so growth can be attributed to the streaming/turn path vs idle baseline.
+ */
+export function buildPeriodicAttributes(
+ sample: MemorySample,
+ activeConversations: number,
+): ReturnType<typeof memorySampleAttributes> & { activeConversations: number } {
+ return { ...memorySampleAttributes(sample), activeConversations };
+}
+
+/**
+ * Pure: build the logger attributes for a GC sample. Exported for unit testing.
+ * Carries the absolute after-sample plus the `reclaimed` delta
+ * (`before - after`): a POSITIVE `reclaimedRssMB` means GC freed memory
+ * (fragmentation, reclaimable); near-zero/negative means the memory is LIVE
+ * (retained objects — the leak). This distinguishes the two failure modes the
+ * crash investigation flagged.
+ */
+export function buildGcAttributes(
+ before: MemorySample,
+ after: MemorySample,
+): ReturnType<typeof memorySampleAttributes> {
+ // reclaimed = before - after (how much GC freed). memoryDelta(a, b) = b - a,
+ // so pass (after, before) to get before - after.
+ return {
+ ...memorySampleAttributes(after),
+ ...memorySampleAttributes(memoryDelta(after, before), "reclaimed"),
+ };
+}
diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts
index 783d894..777feaa 100644
--- a/packages/session-orchestrator/src/extension.ts
+++ b/packages/session-orchestrator/src/extension.ts
@@ -65,6 +65,19 @@ export function activate(host: HostAPI): void {
logger: host.logger,
now: () => Date.now(),
emit: (hook, payload) => host.emit(hook, payload),
+ // Injected process.memoryUsage() sampler — the production edge. Tests
+ // inject a fake to assert per-turn before/after telemetry. Wired in the
+ // shell (like `now: () => Date.now()`); pure decision logic is untouched.
+ sampleMemory: () => {
+ const m = process.memoryUsage();
+ return {
+ rss: m.rss,
+ heapUsed: m.heapUsed,
+ heapTotal: m.heapTotal,
+ external: m.external,
+ arrayBuffers: m.arrayBuffers,
+ };
+ },
resolveQueue: () => {
// Lazily resolve the message-queue service. Returns undefined when the
// extension isn't loaded (feature degrades off) — checked via the
diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts
index cc41fa8..3d3e816 100644
--- a/packages/session-orchestrator/src/index.ts
+++ b/packages/session-orchestrator/src/index.ts
@@ -39,6 +39,9 @@ export {
defaultDispatchPolicy,
delayFor,
generateTurnId,
+ type MemorySample,
+ memoryDelta,
+ memorySampleAttributes,
RETRY_BUDGET_MS,
RETRY_SCHEDULE_MS,
RETRY_TAIL_MS,
diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts
index 076ad51..400ca0b 100644
--- a/packages/session-orchestrator/src/orchestrator.test.ts
+++ b/packages/session-orchestrator/src/orchestrator.test.ts
@@ -4,7 +4,10 @@ import type {
AgentEvent,
ChatMessage,
EventHookDescriptor,
+ LogDeps,
Logger,
+ LogRecord,
+ LogSink,
ProviderContract,
ProviderEvent,
ProviderStreamOptions,
@@ -15,7 +18,7 @@ import type {
ToolContract,
TurnMetrics,
} from "@dispatch/kernel";
-import { runTurn } from "@dispatch/kernel";
+import { createLogger, runTurn } from "@dispatch/kernel";
import type { SystemPromptService } from "@dispatch/system-prompt";
import { describe, expect, it } from "vitest";
import {
@@ -27,6 +30,7 @@ import {
type TurnLifecyclePayload,
type WarmCompletedPayload,
} from "./orchestrator.js";
+import type { MemorySample } from "./pure.js";
import type { ToolAssembly } from "./tools-filter.js";
function createInMemoryStore(): ConversationStore & {
@@ -3980,3 +3984,135 @@ describe("system prompt: compaction flow", () => {
expect(capturedSystemPrompt?.startsWith("RECONSTRUCTED")).toBe(false);
});
});
+
+describe("per-turn memory telemetry", () => {
+ function capturingLogger(): { logger: Logger; records: LogRecord[] } {
+ let id = 0;
+ const deps: LogDeps = { now: () => 1000 + id++, newId: () => `id-${id++}` };
+ const records: LogRecord[] = [];
+ const sink: LogSink = { emit: (r) => records.push(r) };
+ return { logger: createLogger({ extensionId: "session-orchestrator" }, sink, deps), records };
+ }
+
+ it("logs before/after samples around the stream, tagged with conversationId + turnId", async () => {
+ const store = createInMemoryStore();
+ const provider: ProviderContract = { id: "p", stream: async function* () {} };
+ const { captured, captureRunTurn } = createCapturingRunTurn();
+ const { logger, records } = capturingLogger();
+
+ const samples: MemorySample[] = [
+ { rss: 100, heapUsed: 10, heapTotal: 20, external: 1, arrayBuffers: 0 },
+ { rss: 250, heapUsed: 30, heapTotal: 20, external: 1, arrayBuffers: 5 },
+ ];
+ let sampleIdx = 0;
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn: captureRunTurn,
+ logger,
+ sampleMemory: () => samples[sampleIdx++] as MemorySample,
+ });
+
+ await orchestrator.handleMessage({
+ conversationId: "conv-mem",
+ text: "hi",
+ onEvent: () => {},
+ });
+
+ expect(captured).toHaveLength(1);
+ const beforeLogs = records.filter((r) => r.kind === "log" && r.msg === "memory:turn:before");
+ const afterLogs = records.filter((r) => r.kind === "log" && r.msg === "memory:turn:after");
+ expect(beforeLogs).toHaveLength(1);
+ expect(afterLogs).toHaveLength(1);
+
+ // Both samples carry the turn's conversationId + turnId correlation.
+ const turnId = captured[0]?.turnId;
+ expect(turnId).toMatch(/^turn-/);
+ const before = beforeLogs[0] as Extract<LogRecord, { kind: "log" }>;
+ const after = afterLogs[0] as Extract<LogRecord, { kind: "log" }>;
+ expect(before.conversationId).toBe("conv-mem");
+ expect(before.turnId).toBe(turnId);
+ expect(after.conversationId).toBe("conv-mem");
+ expect(after.turnId).toBe(turnId);
+
+ // Before sample carries absolute MB values.
+ expect(before.attributes?.rssMB).toBe(0); // 100 bytes rounds to 0 MB
+ // After sample carries absolute + delta (delta rss = 150 bytes → 0 MB).
+ expect(after.attributes?.rssMB).toBe(0);
+ // deltaRssMB is the rounded delta (150 bytes → 0 MB).
+ expect(after.attributes).toHaveProperty("deltaRssMB");
+ });
+
+ it("emits no memory logs when sampleMemory is not injected (degrades off)", async () => {
+ const store = createInMemoryStore();
+ const provider: ProviderContract = { id: "p", stream: async function* () {} };
+ const { captureRunTurn } = createCapturingRunTurn();
+ const { logger, records } = capturingLogger();
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn: captureRunTurn,
+ logger,
+ // sampleMemory intentionally omitted
+ });
+
+ await orchestrator.handleMessage({
+ conversationId: "conv-no-mem",
+ text: "hi",
+ onEvent: () => {},
+ });
+
+ const memLogs = records.filter(
+ (r) => r.kind === "log" && typeof r.msg === "string" && r.msg.startsWith("memory:turn"),
+ );
+ expect(memLogs).toHaveLength(0);
+ });
+
+ it("getActiveConversationCount tracks in-flight turns", async () => {
+ const store = createInMemoryStore();
+ const result: RunTurnResult = {
+ messages: [{ role: "assistant", chunks: [{ type: "text", text: "ok" }] }],
+ usage: { inputTokens: 1, outputTokens: 1 },
+ finishReason: "stop",
+ };
+ // A runTurn that blocks until the test releases it — keeps the turn
+ // active so getActiveConversationCount reflects an in-flight turn.
+ let release: () => void = () => {};
+ const blocked = new Promise<void>((resolve) => {
+ release = resolve;
+ });
+ const blockingRunTurn = async (_input: RunTurnInput): Promise<RunTurnResult> => {
+ await blocked;
+ return result;
+ };
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => ({ id: "p", stream: async function* () {} }),
+ resolveTools: () => [],
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn: blockingRunTurn,
+ });
+
+ expect(orchestrator.getActiveConversationCount()).toBe(0);
+ const done = orchestrator.handleMessage({
+ conversationId: "conv-active",
+ text: "hi",
+ onEvent: () => {},
+ });
+ // Give the detached turn a tick to register as active.
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(orchestrator.getActiveConversationCount()).toBe(1);
+
+ release();
+ await done;
+ expect(orchestrator.getActiveConversationCount()).toBe(0);
+ });
+});
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index 4c6a673..aaf418a 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -30,6 +30,9 @@ import {
defaultDispatchPolicy,
delayFor,
generateTurnId,
+ type MemorySample,
+ memoryDelta,
+ memorySampleAttributes,
resolveModelName,
resolveReasoningEffort,
} from "./pure.js";
@@ -331,6 +334,13 @@ export interface SessionOrchestrator {
subscribe(conversationId: string, listener: TurnEventListener): () => void;
isActive(conversationId: string): boolean;
/**
+ * The number of conversations currently driving a turn (in the
+ * `activeConversations` set). Used by host-bin's periodic memory telemetry
+ * to tag each RSS sample with the active-conversation count, so growth can
+ * be attributed to the streaming/turn path vs an idle baseline.
+ */
+ getActiveConversationCount(): number;
+ /**
* Explicitly close a conversation (the user closed its tab — distinct from a
* socket disconnect, which never touches the turn): aborts any in-flight turn
* (the kernel finishes with `finishReason: "aborted"`, partial messages are
@@ -431,6 +441,16 @@ export interface SessionOrchestratorDeps {
readonly logger?: Logger;
/** Injected monotonic-ish clock (ms) forwarded to RunTurnInput for timing events. */
readonly now?: () => number;
+ /**
+ * Optional process.memoryUsage() sampler, injected for testability. When
+ * present, the orchestrator captures a sample immediately before and after
+ * each turn's stream completes (`deps.runTurn`) and logs the per-turn delta
+ * tagged with conversationId + turnId — correlating RSS growth with the
+ * streaming path (the prime leak suspect). When absent (undefined), no
+ * per-turn memory telemetry is emitted (feature degrades off cleanly).
+ * Pure decision logic stays unchanged; this is additive observability.
+ */
+ readonly sampleMemory?: () => MemorySample;
/** Emit a lifecycle event hook to subscribers. Injected from host. */
readonly emit?: <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload) => void;
}
@@ -886,6 +906,18 @@ export function createSessionOrchestrator(
// FE to syncTail during generation (CR-6).
await deps.conversationStore.append(conversationId, [userMsg]);
+ // Per-turn memory telemetry: capture a sample immediately BEFORE the
+ // stream starts. Paired with the post-stream sample below, this
+ // measures the streaming path's memory footprint per turn (the prime
+ // leak suspect — AI-SDK streaming buffers + per-turn message arrays).
+ // Tagged with conversationId + turnId via the turnLogger's correlation
+ // context. Additive observability only — does not alter the stream.
+ const sampleMem = deps.sampleMemory;
+ const memBefore = sampleMem?.();
+ if (memBefore !== undefined) {
+ turnLogger?.debug("memory:turn:before", memorySampleAttributes(memBefore));
+ }
+
let stepsPersisted = false;
const result = await deps.runTurn({
...opts,
@@ -899,6 +931,23 @@ export function createSessionOrchestrator(
},
});
+ // Per-turn memory telemetry: capture a sample immediately AFTER the
+ // stream completes and log the per-turn delta vs `memBefore`. A
+ // positive rss delta on a sealed turn flags memory retained by the
+ // streaming path (the leak we are localizing). No I/O beyond the
+ // injected sampler; pure delta computation via memoryDelta(). The
+ // delta attributes carry a `delta` prefix so the absolute "after"
+ // values and the per-turn delta coexist without key collision.
+ if (memBefore !== undefined) {
+ const memAfter = sampleMem?.();
+ if (memAfter !== undefined) {
+ turnLogger?.info("memory:turn:after", {
+ ...memorySampleAttributes(memAfter),
+ ...memorySampleAttributes(memoryDelta(memBefore, memAfter), "delta"),
+ });
+ }
+ }
+
// Fallback: if onStepComplete was never called (e.g., a fake
// runTurn in tests), persist all result messages as a batch.
if (!stepsPersisted && result.messages.length > 0) {
@@ -1042,6 +1091,10 @@ export function createSessionOrchestrator(
return activeTurns.has(conversationId);
},
+ getActiveConversationCount() {
+ return activeConversations.size;
+ },
+
closeConversation(conversationId) {
const turn = activeTurns.get(conversationId);
const abortedTurn = turn !== undefined;
diff --git a/packages/session-orchestrator/src/pure.test.ts b/packages/session-orchestrator/src/pure.test.ts
index 7a574f1..d4fe84a 100644
--- a/packages/session-orchestrator/src/pure.test.ts
+++ b/packages/session-orchestrator/src/pure.test.ts
@@ -6,6 +6,9 @@ import {
defaultDispatchPolicy,
delayFor,
generateTurnId,
+ type MemorySample,
+ memoryDelta,
+ memorySampleAttributes,
RETRY_BUDGET_MS,
RETRY_SCHEDULE_MS,
RETRY_TAIL_MS,
@@ -194,3 +197,71 @@ describe("retry backoff schedule (delayFor)", () => {
expect(schedule.slice(8).every((d) => d === RETRY_TAIL_MS)).toBe(true);
});
});
+
+describe("memorySampleAttributes", () => {
+ const sample: MemorySample = {
+ rss: 100 * 1024 * 1024, // 100 MB
+ heapUsed: 40 * 1024 * 1024,
+ heapTotal: 60 * 1024 * 1024,
+ external: 5 * 1024 * 1024,
+ arrayBuffers: 2 * 1024 * 1024,
+ };
+
+ it("formats fields as rounded MB with no prefix", () => {
+ const attrs = memorySampleAttributes(sample);
+ expect(attrs).toEqual({
+ rssMB: 100,
+ heapUsedMB: 40,
+ heapTotalMB: 60,
+ externalMB: 5,
+ arrayBuffersMB: 2,
+ });
+ });
+
+ it("namespaces keys with the given prefix", () => {
+ const attrs = memorySampleAttributes(sample, "delta");
+ expect(attrs).toEqual({
+ deltaRssMB: 100,
+ deltaHeapUsedMB: 40,
+ deltaHeapTotalMB: 60,
+ deltaExternalMB: 5,
+ deltaArrayBuffersMB: 2,
+ });
+ });
+
+ it("rounds fractional MB", () => {
+ const attrs = memorySampleAttributes({ ...sample, rss: 100.6 * 1024 * 1024 });
+ expect(attrs.rssMB).toBe(101);
+ });
+});
+
+describe("memoryDelta", () => {
+ const before: MemorySample = {
+ rss: 200 * 1024 * 1024,
+ heapUsed: 100 * 1024 * 1024,
+ heapTotal: 150 * 1024 * 1024,
+ external: 10 * 1024 * 1024,
+ arrayBuffers: 4 * 1024 * 1024,
+ };
+ const after: MemorySample = {
+ rss: 350 * 1024 * 1024,
+ heapUsed: 120 * 1024 * 1024,
+ heapTotal: 150 * 1024 * 1024,
+ external: 10 * 1024 * 1024,
+ arrayBuffers: 8 * 1024 * 1024,
+ };
+
+ it("computes signed after - before per field", () => {
+ const delta = memoryDelta(before, after);
+ expect(delta.rss).toBe(150 * 1024 * 1024);
+ expect(delta.heapUsed).toBe(20 * 1024 * 1024);
+ expect(delta.heapTotal).toBe(0);
+ expect(delta.external).toBe(0);
+ expect(delta.arrayBuffers).toBe(4 * 1024 * 1024);
+ });
+
+ it("is negative when memory dropped", () => {
+ const delta = memoryDelta(after, before);
+ expect(delta.rss).toBe(-150 * 1024 * 1024);
+ });
+});
diff --git a/packages/session-orchestrator/src/pure.ts b/packages/session-orchestrator/src/pure.ts
index 0d2068f..489b140 100644
--- a/packages/session-orchestrator/src/pure.ts
+++ b/packages/session-orchestrator/src/pure.ts
@@ -1,4 +1,5 @@
import type {
+ Attributes,
ChatMessage,
Chunk,
ImageInput,
@@ -134,3 +135,71 @@ export function defaultDispatchPolicy(): ToolDispatchPolicy {
export function generateTurnId(): string {
return `turn-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
}
+
+// ── Memory telemetry (leak localization) ────────────────────────────────────
+//
+// Pure helpers for process.memoryUsage() sampling. The orchestrator owns the
+// sample SHAPE (this type) so its per-turn sampling and the host-bin periodic
+// timer share one contract without a cross-package import of an
+// implementation — host-bin imports this type, the orchestrator never imports
+// host-bin. Pure: inputs → attributes/delta, no I/O, no clock.
+
+/**
+ * A snapshot of process.memoryUsage() at one instant. Mirrors the subset of
+ * Node/Bun's MemoryUsage we log for leak localization (rss, heapUsed,
+ * heapTotal, external, arrayBuffers). Owned here so the orchestrator's
+ * per-turn sampling and the host-bin periodic timer agree on the shape.
+ */
+export interface MemorySample {
+ readonly rss: number;
+ readonly heapUsed: number;
+ readonly heapTotal: number;
+ readonly external: number;
+ readonly arrayBuffers: number;
+}
+
+const BYTES_PER_MB = 1024 * 1024;
+
+function mb(bytes: number): number {
+ return Math.round(bytes / BYTES_PER_MB);
+}
+
+/**
+ * Pure: format a {@link MemorySample} as flat logger {@link Attributes}
+ * (values in MB, rounded). Flat scalars are serializable (D3) and queryable
+ * (D9) in the journal. No I/O.
+ *
+ * Pass a `prefix` to namespace the keys — e.g. `memorySampleAttributes(delta,
+ * "delta")` yields `deltaRssMB`, so an "after" log can carry both the absolute
+ * sample (`rssMB`) and the per-turn delta (`deltaRssMB`) without key collision.
+ * The first letter of each field is capitalized after the prefix for
+ * readability (`deltaRssMB`, not `deltarssMB`).
+ */
+export function memorySampleAttributes(sample: MemorySample, prefix?: string): Attributes {
+ const p = prefix === undefined ? "" : prefix;
+ const cap = (s: string): string =>
+ s.length === 0 ? s : `${s[0]?.toUpperCase() ?? ""}${s.slice(1)}`;
+ const field = (name: string): string => (p.length === 0 ? name : `${p}${cap(name)}`);
+ return {
+ [field("rssMB")]: mb(sample.rss),
+ [field("heapUsedMB")]: mb(sample.heapUsed),
+ [field("heapTotalMB")]: mb(sample.heapTotal),
+ [field("externalMB")]: mb(sample.external),
+ [field("arrayBuffersMB")]: mb(sample.arrayBuffers),
+ };
+}
+
+/**
+ * Pure: compute the signed per-field delta `after - before`. A positive
+ * `rss` delta on a sealed turn flags memory retained by the streaming path
+ * (the prime leak suspect). No I/O.
+ */
+export function memoryDelta(before: MemorySample, after: MemorySample): MemorySample {
+ return {
+ rss: after.rss - before.rss,
+ heapUsed: after.heapUsed - before.heapUsed,
+ heapTotal: after.heapTotal - before.heapTotal,
+ external: after.external - before.external,
+ arrayBuffers: after.arrayBuffers - before.arrayBuffers,
+ };
+}
diff --git a/packages/ssh/src/pool.test.ts b/packages/ssh/src/pool.test.ts
new file mode 100644
index 0000000..b5d72c8
--- /dev/null
+++ b/packages/ssh/src/pool.test.ts
@@ -0,0 +1,176 @@
+/**
+ * Unit tests for the pooled ssh2.Client error lifecycle in pool.ts.
+ *
+ * The ssh2.Client (the outermost network edge) is faked via EventEmitter —
+ * permitted by the constitution (mocking the outermost edge is fine; only
+ * `@dispatch/*` may not be mocked). These tests prove the permanent
+ * `'error'` listener survives `cleanup()` (which only removes the connect-time
+ * onReady/onError) and that a post-connect ssh2 error is captured, logged, and
+ * non-throwing, with the connection reconnecting on the next acquire.
+ */
+
+import { EventEmitter } from "node:events";
+import type { Logger } from "@dispatch/kernel";
+import type { Computer } from "@dispatch/wire";
+import type { Client } from "ssh2";
+import { afterEach, describe, expect, it } from "vitest";
+import { createSshConnectionPool } from "./pool.js";
+
+const computer: Computer = {
+ alias: "testremote",
+ hostName: "localhost",
+ port: 22,
+ user: "testuser",
+ identityFile: null,
+ knownHost: false,
+};
+
+interface CapturedError {
+ msg: string;
+ err: unknown;
+ alias?: string;
+ message?: string;
+ level?: string;
+ from?: string;
+ to?: string;
+}
+
+function capturingLogger(calls: CapturedError[]): Logger {
+ const log: Logger = {
+ debug: () => undefined,
+ info: () => undefined,
+ warn: () => undefined,
+ error: (msg, attrs) => {
+ calls.push({
+ msg,
+ err: attrs?.err,
+ alias: attrs?.alias as string | undefined,
+ message: attrs?.message as string | undefined,
+ level: attrs?.level as string | undefined,
+ from: attrs?.from as string | undefined,
+ to: attrs?.to as string | undefined,
+ });
+ },
+ child: () => log,
+ span: (name) => ({
+ id: name,
+ log,
+ setAttributes: () => undefined,
+ addLink: () => undefined,
+ child: (n) => ({ id: n, log }) as never,
+ end: () => undefined,
+ }),
+ };
+ return log;
+}
+
+/** A fake ssh2.Client: EventEmitter + connect/emd/sftp. `connect` emits 'ready'. */
+function fakeClient(): { client: Client; ee: EventEmitter } {
+ const ee = new EventEmitter();
+ const api = {
+ connect: () => {
+ process.nextTick(() => ee.emit("ready"));
+ },
+ end: () => undefined,
+ sftp: (cb: (err: Error | null, sftp: unknown) => void) => cb(null, {}),
+ };
+ return { client: Object.assign(ee, api) as unknown as Client, ee };
+}
+
+function makeDeps(client: Client, logger: Logger) {
+ return {
+ logger,
+ homeDir: "/tmp",
+ knownHostsPath: "/tmp/known_hosts",
+ readFileText: async (p: string): Promise<string> => {
+ if (p === "/tmp/known_hosts") return "";
+ return "ssh-ed25519 not-encrypted-key";
+ },
+ appendKnownHosts: async () => undefined,
+ pathExists: async () => true,
+ newClient: () => client,
+ resolveComputer: async () => computer,
+ };
+}
+
+describe("SshConnectionPool — permanent pooled-client error listener", () => {
+ let pool: ReturnType<typeof createSshConnectionPool>;
+
+ afterEach(async () => {
+ await pool?.closeAll();
+ });
+
+ it("captures a post-connect 'error' without throwing, sets state=error, and logs", async () => {
+ const errorCalls: CapturedError[] = [];
+ const { client, ee } = fakeClient();
+ pool = createSshConnectionPool(makeDeps(client, capturingLogger(errorCalls)));
+
+ const conn = await pool.acquire("testremote");
+ expect(conn.state).toBe("connected");
+
+ const sshErr = Object.assign(new Error("Timed out while waiting for handshake"), {
+ level: "socket",
+ });
+
+ expect(() => ee.emit("error", sshErr)).not.toThrow();
+
+ expect(conn.state).toBe("error");
+ expect(conn.error).toBe("Timed out while waiting for handshake");
+ expect(errorCalls).toHaveLength(1);
+ expect(errorCalls[0]?.msg).toBe("ssh: pooled client error");
+ expect(errorCalls[0]?.alias).toBe("testremote");
+ expect(errorCalls[0]?.message).toBe("Timed out while waiting for handshake");
+ expect(errorCalls[0]?.level).toBe("socket");
+ expect(errorCalls[0]?.from).toBe("connected");
+ expect(errorCalls[0]?.to).toBe("error");
+ });
+
+ it("does NOT remove the permanent listener on successful connect (survives cleanup)", async () => {
+ const errorCalls: CapturedError[] = [];
+ const { client, ee } = fakeClient();
+ pool = createSshConnectionPool(makeDeps(client, capturingLogger(errorCalls)));
+
+ const conn = await pool.acquire("testremote");
+ expect(conn.state).toBe("connected");
+
+ ee.emit("error", new Error("post-connect drop"));
+ expect(conn.state).toBe("error");
+ expect(errorCalls).toHaveLength(1);
+ });
+
+ it("reconnects on the next acquire after a post-connect error", async () => {
+ const { client, ee } = fakeClient();
+ pool = createSshConnectionPool(makeDeps(client, capturingLogger([])));
+
+ const conn = await pool.acquire("testremote");
+ expect(conn.state).toBe("connected");
+
+ ee.emit("error", new Error("post-connect drop"));
+ expect(conn.state).toBe("error");
+
+ const conn2 = await pool.acquire("testremote");
+ expect(conn2.state).toBe("connected");
+ expect(conn2.error).toBeUndefined();
+ });
+
+ it("logs the connect-time error too (permanent listener fires during handshake)", async () => {
+ const errorCalls: CapturedError[] = [];
+ const ee = new EventEmitter();
+ const api = {
+ connect: () => {
+ process.nextTick(() => ee.emit("error", new Error("connect refused")));
+ },
+ end: () => undefined,
+ sftp: (cb: (err: Error | null, sftp: unknown) => void) => cb(null, {}),
+ };
+ const client = Object.assign(ee, api) as unknown as Client;
+ pool = createSshConnectionPool(makeDeps(client, capturingLogger(errorCalls)));
+
+ const conn = await pool.acquire("testremote");
+ expect(conn.state).toBe("error");
+ expect(conn.error).toBe("connect refused");
+ expect(errorCalls).toHaveLength(1);
+ expect(errorCalls[0]?.msg).toBe("ssh: pooled client error");
+ expect(errorCalls[0]?.message).toBe("connect refused");
+ });
+});
diff --git a/packages/ssh/src/pool.ts b/packages/ssh/src/pool.ts
index 9acae0e..5d1eedd 100644
--- a/packages/ssh/src/pool.ts
+++ b/packages/ssh/src/pool.ts
@@ -112,6 +112,24 @@ export function createSshConnectionPool(deps: SshPoolDeps): SshConnectionPool {
let sftp: import("ssh2").SFTPWrapper | null = null;
let connectPromise: Promise<void> | null = null;
+ // Permanent error listener — without it, a post-connect ssh2 'error'
+ // (re-key/keepalive timeout) escapes uncaught → process crash. cleanup()
+ // in doConnect only removes the connect-time onReady/onError; this persists.
+ client.on("error", (err: unknown) => {
+ const from = state.value;
+ state.value = "error";
+ state.error = err instanceof Error ? err.message : String(err);
+ connectPromise = null;
+ deps.logger.error("ssh: pooled client error", {
+ err,
+ alias,
+ message: state.error,
+ level: (err as { level?: string } | null)?.level,
+ from,
+ to: "error",
+ });
+ });
+
const touch = (): void => {
const e = entries.get(alias);
if (e !== undefined) e.lastUsedAt = Date.now();
diff --git a/packages/transport-http/src/extension.ts b/packages/transport-http/src/extension.ts
index 5bf16b3..cfb7166 100644
--- a/packages/transport-http/src/extension.ts
+++ b/packages/transport-http/src/extension.ts
@@ -11,6 +11,7 @@ import {
credentialStoreHandle,
heartbeatServiceHandle,
lspServiceHandle,
+ type LspService,
mcpServiceHandle,
sessionOrchestratorHandle,
systemPromptHandle,
@@ -27,7 +28,6 @@ export const manifest: Manifest = {
"conversation-store",
"credential-store",
"heartbeat",
- "lsp",
"mcp",
"session-orchestrator",
"throughput-store",
@@ -96,7 +96,14 @@ export function createTransportHttpExtension(): Extension & {
const throughputStore = host.getService(throughputStoreHandle);
const warmService = host.getService(cacheWarmHandle);
const compactionService = host.getService(compactionHandle);
- const lspService = host.getService(lspServiceHandle);
+ // Optional: the `lsp` extension may be disabled (hot-fix). Wrapped because
+ // getService throws for an unregistered handle — degrades to no diagnostics.
+ let lspService: LspService | undefined;
+ try {
+ lspService = host.getService(lspServiceHandle);
+ } catch {
+ lspService = undefined;
+ }
const mcpService = host.getService(mcpServiceHandle);
const systemPromptService = host.getService(systemPromptHandle);
const heartbeatService = host.getService(heartbeatServiceHandle);
@@ -129,7 +136,7 @@ export function createTransportHttpExtension(): Extension & {
throughputStore,
warmService,
compactionService,
- lspService,
+ ...(lspService !== undefined ? { lspService } : {}),
mcpService,
systemPromptService,
heartbeatService,
diff --git a/systemd/dispatch.service b/systemd/dispatch.service
index e651fa7..42b5a29 100644
--- a/systemd/dispatch.service
+++ b/systemd/dispatch.service
@@ -10,6 +10,12 @@ EnvironmentFile=/etc/dispatch/env
WorkingDirectory=/var/lib/dispatch
Restart=on-failure
RestartSec=5
+# Memory-pressure circuit breaker: the server has a memory leak (~2.5 GB/h)
+# that eventually triggers a Bun runtime segfault. These cgroup limits turn
+# the uncontrolled crash into a controlled OOM-kill → clean restart.
+# Machine has 33.24 GB total; 24G hard cap leaves OS headroom.
+MemoryHigh=20G
+MemoryMax=24G
StandardOutput=journal
StandardError=journal