diff options
| author | Adam Malczewski <[email protected]> | 2026-06-28 14:24:58 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-28 14:24:58 +0900 |
| commit | 841e776635d2a93371f302f0617e729626a69fe5 (patch) | |
| tree | c416925f6730a35f2560451ca7c6edaf41e8b1e4 | |
| parent | 71c635f7d8ee01a2b23d5ddfdfc4bff043980052 (diff) | |
| parent | 414080e271ea44df0a7affc154b62e39b51a11a0 (diff) | |
| download | dispatch-841e776635d2a93371f302f0617e729626a69fe5.tar.gz dispatch-841e776635d2a93371f302f0617e729626a69fe5.zip | |
Merge branch 'dev' into feature/workspace-star
| -rwxr-xr-x | bin/apply-memory-limits.sh | 45 | ||||
| -rwxr-xr-x | bin/install | 2 | ||||
| -rw-r--r-- | crash-review-report.md | 86 | ||||
| -rw-r--r-- | notes/crash-investigation-findings.md | 256 | ||||
| -rw-r--r-- | notes/memory-leak-investigation-handoff.md | 331 | ||||
| -rw-r--r-- | packages/host-bin/src/main.ts | 81 | ||||
| -rw-r--r-- | packages/host-bin/src/mem-telemetry.test.ts | 225 | ||||
| -rw-r--r-- | packages/host-bin/src/mem-telemetry.ts | 160 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/extension.ts | 13 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/index.ts | 3 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.test.ts | 138 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 53 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/pure.test.ts | 71 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/pure.ts | 69 | ||||
| -rw-r--r-- | packages/ssh/src/pool.test.ts | 176 | ||||
| -rw-r--r-- | packages/ssh/src/pool.ts | 18 | ||||
| -rw-r--r-- | packages/transport-http/src/extension.ts | 13 | ||||
| -rw-r--r-- | systemd/dispatch.service | 6 |
18 files changed, 1739 insertions, 7 deletions
diff --git a/bin/apply-memory-limits.sh b/bin/apply-memory-limits.sh new file mode 100755 index 0000000..f5c9cf0 --- /dev/null +++ b/bin/apply-memory-limits.sh @@ -0,0 +1,45 @@ +#!/usr/bin/env bash +# apply-memory-limits.sh — apply the MemoryMax/MemoryHigh cgroup limits to the +# LIVE dispatch.service WITHOUT a full reinstall. Safe to run while the server +# is up (the limits take effect on the next restart). +# +# What it does (all privileged lines use sudo): +# 1. Copies the updated dispatch.service template to /etc/systemd/system/ (sudo) +# 2. Reloads systemd so it picks up the new unit file (sudo) +# 3. Restarts dispatch so the cgroup limits are applied (sudo) +# +# Why sudo: /etc/systemd/system/ is root-owned; daemon-reload and restart +# require root. The script carries its own sudo — run it directly (no sudo prefix). +# +# Run: ./bin/apply-memory-limits.sh + +set -euo pipefail + +HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROOT="$(cd "$HERE/.." && pwd)" + +SERVICE_SRC="$ROOT/systemd/dispatch.service" +SERVICE_DST="/etc/systemd/system/dispatch.service" + +if [ ! -f "$SERVICE_SRC" ]; then + echo "apply-memory-limits: template not found at $SERVICE_SRC" >&2 + exit 1 +fi + +echo "[apply] copying updated dispatch.service → $SERVICE_DST" +# Patch in the real user (same as bin/install does) +REAL_USER="${SUDO_USER:-$USER}" +REAL_GROUP=$(id -gn "$REAL_USER" 2>/dev/null || echo "$REAL_USER") +sed "s/^# User\/Group are set by bin/install.*/User=$REAL_USER\nGroup=$REAL_GROUP/" "$SERVICE_SRC" | sudo tee "$SERVICE_DST" > /dev/null +sudo chmod 644 "$SERVICE_DST" + +echo "[apply] daemon-reload…" +sudo systemctl daemon-reload + +echo "[apply] restarting dispatch (cgroup limits take effect)…" +sudo systemctl restart dispatch + +echo "[apply] done!" 
+echo " Status: systemctl status dispatch" +echo " Memory: systemctl show dispatch -p MemoryHigh -p MemoryMax" +echo " Logs: journalctl -u dispatch -f" diff --git a/bin/install b/bin/install index 4899939..3e2a62f 100755 --- a/bin/install +++ b/bin/install @@ -104,6 +104,8 @@ EnvironmentFile=/etc/dispatch/env WorkingDirectory=/var/lib/dispatch Restart=on-failure RestartSec=5 +MemoryHigh=20G +MemoryMax=24G StandardOutput=journal StandardError=journal diff --git a/crash-review-report.md b/crash-review-report.md new file mode 100644 index 0000000..272abdb --- /dev/null +++ b/crash-review-report.md @@ -0,0 +1,86 @@ +# Production Crash Investigation — Independent Review + +## Executive Summary + +The production Dispatch server is experiencing two distinct failure modes under load: +1. **Exit-code 1 Crashes**: Driven by an unhandled `EventEmitter` `'error'` event from the `ssh2` connection pool, **not** the AI-SDK as previously suspected. +2. **Bun Runtime Segfaults**: Triggered by massive memory pressure from unbounded conversation history serialization during long multi-step agent turns, confirming the "leak" is actually a massive live working set, not a persistent memory leak. + +Additionally, a suspected latent crash path in the cache-warming probe has been confirmed as an Unhandled Promise Rejection. + +--- + +## 1. The Exit-1 Crash ("Timed out while waiting for handshake") + +### Finding: Confirmed Dispatch Bug in SSH Pool (Incorrect Preliminary Finding) +The preliminary analysis hypothesized that the `error: Timed out while waiting for handshake` crash was caused by an unhandled `'error'` on an outbound TLS socket to the AI provider. **This is incorrect.** + +The crash actually originates from the `ssh2` package managing outbound remote computer connections, specifically within `packages/ssh/src/pool.ts`. 
+ +### Technical Analysis +- **The Evidence**: The exact string `'Timed out while waiting for handshake'` is hardcoded in `ssh2/lib/client.js` when the SSH handshake times out or keepalives fail during a re-keying phase. +- **The Code Path**: In `packages/ssh/src/pool.ts`, the `doConnect` function attaches an `onError` listener to the `ssh2.Client` instance to catch connection failures: + ```typescript + client.on("error", onError); + ``` + However, when the connection succeeds (`onReady` fires), the `cleanup()` function is called, which **removes the error listener**: + ```typescript + function cleanup(): void { + clearTimeout(timer); + client.removeListener("ready", onReady); + client.removeListener("error", onError); // <-- Listener removed here + } + ``` +- **The Crash Mechanism**: After `doConnect` succeeds, the client is placed in the pool and returned to callers. If the SSH connection drops later or a timeout occurs, the `ssh2.Client` emits an `'error'` event. Because there are no longer any listeners attached for `'error'`, Node.js's `EventEmitter` escalates it to an uncaught exception, instantly crashing the process with exit code 1. + +### Recommendation +**Dispatch Code Change**: Add a persistent `.on("error", ...)` handler to the `client` in `buildConnection` (or refrain from removing it) to gracefully catch post-connection drops, tear down the connection, and transition `state.value = "error"`. + +--- + +## 2. The Bun Native Segfaults (The 6.2 GB "Leak") + +### Finding: Massive Live Working Set, Not a Persistent Leak +The preliminary investigation suspected a 2.5 GB/hour slow leak. The telemetry data confirms that the memory is **not permanently leaked**—when `activeConversations` drops to `0`, the RSS cleanly drops back down to the ~84 MB baseline. The crash is caused by unbounded live working set growth during concurrent agent turns, which fragments and overwhelms Bun's allocator. 
+ +### Technical Analysis +- **The Code Path**: `MAX_STEPS` in `packages/kernel/src/runtime/run-turn.ts` is set to `0` (unlimited). A single turn can run for hundreds of steps. +- **The Mechanism**: In `executeStep`, every step appends new tool calls and results to the `messages` array. This array is then passed to `provider.stream()`. +- Inside `packages/openai-stream/src/stream.ts`, the entire unbounded array is serialized into a single contiguous string every step: + ```typescript + const bodyString = JSON.stringify(body); + ``` +- **The Crash**: If 4 concurrent conversations (`activeConversations = 4`) run for hundreds of steps, the `messages` arrays grow to hundreds of megabytes each. Serializing these arrays copies them into massive contiguous strings on the JavaScriptCore heap (Bun does not use V8) on *every step*. This causes gigabytes of memory allocation churn, memory pressure spikes (peaking at 6.2 GB), and eventually triggers a native `SIGSEGV`/`SIGILL` in Bun's allocator. + +### Recommendation +**Dispatch Code Change**: +1. Reintroduce a sane `MAX_STEPS` limit (e.g., `50` or `100`) to bound the maximum length of a single turn. +2. Implement a sliding window or context-truncation strategy for `messages` before serializing to prevent the payload from growing infinitely. +3. **Operational Mitigation**: Apply the `MemoryMax` cgroup circuit breaker to turn the segfault into a controlled recycle while the codebase fix is developed. + +--- + +## 3. The Cache-Warming Latent Crash + +### Finding: Confirmed Latent Unhandled Promise Rejection +The preliminary finding suspected a latent crash path due to a missing `try/catch` in the cache-warming probe. This is confirmed.
+ +### Technical Analysis +- **The Code Path**: In `packages/session-orchestrator/src/orchestrator.ts`, the `createWarmService`'s `warm` function consumes the provider stream: + ```typescript + for await (const event of provider.stream(messages, assembled.tools, providerOpts)) { + ``` +- **The Mechanism**: If the AI provider connection fails or aborts, `provider.stream` throws. Because there is no `try/catch` around this loop, the async `warm` function rejects its returned promise. +- In `packages/cache-warming/src/warmer.ts`, the timer fires `void fireWarm(conversationId, token);`. Since the returned promise is not `await`ed or `.catch()`'d at the top level, it results in an **Unhandled Promise Rejection**. + +### Recommendation +**Dispatch Code Change**: Add a `try/catch` block around the `for await` loop in `createWarmService` (or add `.catch()` to `fireWarm`) to gracefully emit an error result instead of throwing an unhandled rejection. + +--- + +## Assessment of Preliminary Findings + +- ❌ **"Unhandled TLS Socket to AI Provider"**: Incorrect. The exit-1 crash was caused by the `'error'` listener being removed from the pooled outbound SSH client (`ssh2`) after a successful connect, leaving later errors unhandled — not by the AI SDK's TLS socket. +- ✅ **"MAX_STEPS = 0 ... structural enabler for large working sets"**: Correct. Unbounded history serialization caused the massive gigabyte allocations that crashed Bun. +- ✅ **"Cache-warming missing try/catch"**: Correct. It is an unhandled promise rejection waiting to happen. +- ✅ **"Not an LSP leak"**: Confirmed. The memory growth is strictly tied to `activeConversations` and the unbounded turn array serialization.
diff --git a/notes/crash-investigation-findings.md b/notes/crash-investigation-findings.md new file mode 100644 index 0000000..485e02d --- /dev/null +++ b/notes/crash-investigation-findings.md @@ -0,0 +1,256 @@ +# Production Crash Investigation — Findings + +> **Definitive post-investigation record.** Supersedes `notes/memory-leak-investigation-handoff.md` +> (which contained three material errors — see §3). Authored by opencode (umans-glm-5.2) on +> 2026-06-28, incorporating an independent Gemini review (`crash-review-report.md`) whose pivotal +> finding (the SSH pool) overturned an earlier wrong conclusion. + +**Last updated:** 2026-06-28 +**Status:** Root cause of the **live** (1.3.14) crash is confirmed and fixable in Dispatch code. +The native segfault is **not** root-caused and may already be fixed by the 1.3.14 upgrade — +needs observation under real load. + +--- + +## 0. TL;DR + +- The production server was crash-looping (~20 restarts on Jun 28). **Two distinct failure modes.** +- **Live crash (Bun 1.3.14):** `exit-1 "Timed out while waiting for handshake"`. Root cause is a + **Dispatch bug** in `packages/ssh/src/pool.ts`: the pooled `ssh2.Client` has **no permanent + `'error'` listener** (it's removed after connect by `cleanup()`, pool.ts:266), so a post-connect + ssh2 error emits `'error'` with no listener → uncaught → **no `process.on("uncaughtException")` + guard in main.ts** → process exit-1. Fixable in Dispatch code; no runtime change needed. +- **Segfaults:** all on **Bun v1.3.13** (`Bun v1.3.13 (bf2e2cec)` printed at each). **Zero + segfaults observed on 1.3.14** so far. Not root-caused; may be fixed by the upgrade. +- The prior "~2.5 GB/hour memory leak" framing is **wrong** (see §3). Idle memory is flat at 84 MB. +- **Recovery on restart** is repair-and-seal, not resume: conversations are swept `active→idle` + and partial turns reconciled to a consistent state, but in-flight turns are sealed (user re-sends). + +--- + +## 1. 
The live crash — SSH pool missing error listener (CONFIRMED, Dispatch bug) + +### Crash signature (the only crash on the current 1.3.14 binary) +``` +error: Timed out while waiting for handshake + at emitError (node:events:51:13) + at <anonymous> (/$bunfs/root/dispatch-server:16:75617) +Bun v1.3.14 (Linux x64) +→ Main process exited, code=exited, status=1/FAILURE +``` +- 09:22:04 JST crash; last telemetry sample (09:21:52) showed `rss=116MB, activeConversations=4`. +- systemd reported 2.7 GB peak, but the crash was **exit-1 (self-exit), not OOM/SIGKILL** — memory + was a symptom, not the kill trigger. + +### Root cause chain +1. The error string `"Timed out while waiting for handshake"` is **ssh2's**, not Bun's TLS. It is + hardcoded in `ssh2/lib/client.js:1114`: + ```js + this._readyTimeout = setTimeout(() => { + const err = new Error('Timed out while waiting for handshake'); + this.emit('error', err); // ← ssh2 emits 'error' on the ssh2.Client + sock.destroy(); + }, this.config.readyTimeout); + ``` + (Bun's own TLS string is the different `"TLS handshake timeout"` — confirmed via `strings` on + the binary.) +2. In `packages/ssh/src/pool.ts`, `doConnect` attaches `client.on("error", onError)` **only during + the connect promise** and removes it in `cleanup()` (pool.ts:266) on success. `buildConnection` + (pool.ts:101-175) returns a long-lived `SshConnection` (keepalive 30s, idle-reap 15m) and + **never attaches a permanent `'error'` listener** to the pooled client. +3. So after a successful connect, the `ssh2.Client` sits in the pool with no `'error'` listener. A + later ssh2 error (re-handshake/re-key/keepalive timeout emitting the string above) → + `emit('error')` → `emitError (node:events)` with no listener → EventEmitter default throws → + uncaught. +4. `packages/host-bin/src/main.ts` has **only `SIGINT`/`SIGTERM` handlers** (main.ts:280-281) — + **no `process.on("uncaughtException")` / `process.on("unhandledRejection")`**. 
So the uncaught + error exits the process (exit-1), killing every in-flight turn — not just the one using SSH. + +### Why capping concurrency is NOT the fix +A single pooled SSH connection dropping crashes the server regardless of how many turns are +concurrent. Capping `activeConversations` would only reduce frequency, not fix the root cause — +and it cripples a product built around concurrent agents. (The existing provider-concurrency +limiter, capped at 3-4, already bounds concurrent *provider streams*; SSH connections are +tool-execution connections and bypass it.) + +### The fix (root cause) +1. **`packages/ssh/src/pool.ts` — `buildConnection`:** attach a permanent `'error'` listener to + the `ssh2.Client` that tears down the connection, sets `state.value = "error"` / + `state.error = <msg>`, and **logs** (computer alias, error message, connection state) so the + failure is observable. Keep the existing connect-time `onError` for the connect promise. +2. **`packages/host-bin/src/main.ts` — boot:** add `process.on("uncaughtException")` and + `process.on("unhandledRejection")` handlers that **log rich detail** (message, stack, + `activeConversations` count, `process.memoryUsage()` snapshot, timestamp) so we know *where* + they happen, then either survive (for non-fatal) or seal in-flight work + exit gracefully. This + is the defense-in-depth that ensures any *future* unhandled error degrades instead of taking + down the whole server and every in-flight turn. + +--- + +## 2. The segfaults — NOT root-caused; 1.3.13 only (may be fixed on 1.3.14) + +### Crash signature +``` +panic(main thread): Segmentation fault at address 0x0 +oh no: Bun has crashed. This indicates a bug in Bun, not your code. +→ Main process exited, code=dumped, status=4/ILL +``` +- **Every segfault was on Bun v1.3.13** (the log prints `Bun v1.3.13 (bf2e2cec)` at each: + 00:12, 00:45, 00:57, 08:50, 09:05). 
+- Memory peaks ranged widely: **370 MB** (09:05, 1-min uptime) up to 6.2 GB (00:12, 2.5h uptime). +- The 1.3.14 binary first ran at 09:06 JST. **No segfault has been observed on 1.3.14** — the only + 1.3.14 crash is the SSH exit-1 above. The current process has been stable while idle. + +### What this means +- The 370 MB segfault is **not memory exhaustion** (24 GB cgroup) — it's a native concurrency/race + bug in Bun's runtime. No amount of memory bounding prevents a native race. +- The 1.3.14 upgrade **may** have fixed it; the sample so far is small and mostly idle, so this is + **not confirmed**. +- **A native segfault cannot be caught by `process.on("uncaughtException")`** (that catches JS + exceptions, not SIGILL/SIGSEGV). So if the segfault recurs on 1.3.14, the only complete fixes + are: leave Bun (port to Node) or root-cause the native trigger (core dump → backtrace → repro → + upstream Bun report). + +### Plan for the segfault +1. Enable core-dump capture (`ulimit -c unlimited` / systemd `LimitCORE=infinity`) so the next segfault + produces a backtrace rather than a bare `address 0x0`. +2. Run 1.3.14 under the real orchestrator workload (concurrent backend+frontend+subagent turns). +3. If no segfault recurs → likely already fixed; close the issue. +4. If one recurs → root-cause via the backtrace; do NOT paper over it with concurrency caps. + +--- + +## 3. What was WRONG in the prior handoff (corrections) + +`notes/memory-leak-investigation-handoff.md` was a careful document but contained three material +errors that this investigation corrected: + +1. **"Telemetry is in journald — `journalctl | grep memory:periodic`."** Wrong. The logger writes + to a **journal sink file** (`DISPATCH_JOURNAL=/var/log/dispatch/app.ndjson`, main.ts:139), + not stderr/journald. The handoff's grep found nothing *by design* — the data was in the file + all along.
(Also: the observability-collector that would drain this file into `traces.db` is + **disabled in compiled binaries** — main.ts:150 guards on a source path that doesn't exist in + prod — so the journal just accumulates and rotates; only `app.ndjson` + `.1` are retained.) +2. **"The segfault persists on Bun 1.3.14" (handoff hypothesis #3 disproven).** Wrong. All + segfaults were on **1.3.13**; no segfault has been seen on 1.3.14. The 1.3.14 upgrade may have + fixed it — the handoff's hypothesis #3 is *not* disproven after all. +3. **"~2.5 GB/hour slow memory leak."** Wrong framing. Telemetry shows **idle is flat at 84 MB** + (20+ min, zero reclaimable, zero growth) — there is no background/timer/closure leak. The + "leak" was the live working set of concurrent multi-step turns, which is reclaimed when turns + end. Moreover, raw context size is never GB-scale: 300k tokens ≈ 1-2 MB, so the gigabytes + under load must come from pathological tool payloads, retention, or native bugs — not + "unbounded context." (See §5.) + +--- + +## 4. What was ruled out + +- **Warm-cache probe** — confirmed (Gemini + my read) as a *latent* unhandled-rejection path + (`createWarmService`'s `warm` has no try/catch around its `for await (provider.stream(...))` at + orchestrator.ts:1276; `fireWarm` drops the promise with `void` at warmer.ts:130). **But it has + never fired in production** — zero `cache-warming` activity in the journal, zero enabled + conversations in the `kv` table (53k rows, all conversation settings; warming is opt-in default + OFF). It is a latent risk worth fixing (the `unhandledRejection` guard from §1 catches its + rejection too), but it is **not** the cause of any observed crash. +- **LSP** — exonerated (per handoff §7; caches bounded; disabled as a precaution, unrelated to + crashes). +- **Idle/background leak** — refuted by telemetry (idle flat at 84 MB). +- **`activeConversations` cap as a fix** — rejected. 
It's a workaround that cripples the + concurrent-agent product; the existing provider-concurrency limiter (3-4) already bounds + provider streams. The root cause is the missing error listener + missing guard. + +--- + +## 5. Memory: what actually drives GB-scale (and what doesn't) + +The user's key insight: **300k tokens ≈ 1-2 MB** — raw context size is never GB-scale (capped at +single-digit MB by the context window). So "unbounded context → GB crash" is numerically wrong. +The only ways a turn reaches GB-scale: + +1. **A pathological tool payload** — a single tool result that's itself hundreds of MB (huge + file/log read). It enters `messages` unbounded and each step re-`JSON.stringify`s it (stream.ts:91) + **plus** the `reqSpan` captures a second copy (stream.ts:112) → ~2× transient per step. + **Tool-output bounding** directly prevents this (opencode caps at 50 KB / 2000 lines, + offloading full text to a managed file — `packages/opencode/src/tool/truncate.ts`). +2. **O(N²) re-serialization across many steps**, only if GC can't keep up or there's retention. +3. **A genuine retention leak** (message arrays / spans / closures held after the step or turn) — + accumulates per-turn over time (the 6.2 GB-over-2.5 h pattern). **Bounding context does NOT + fix this** — the retained reference must be found. (Not yet investigated; the span's + `bodyString` copy is written to the journal sink on disk, so it's likely transient, not + retained — but unverified.) 
+ +**Dispatch vs opencode (context hygiene — NOT crash fixes):** +| Aspect | Dispatch | opencode | +|---|---|---| +| Mid-turn compaction | refuses during active conv (orchestrator.ts:1325) | compacts between steps near context window (llm.ts:210) | +| Tool result size in history | unbounded | bounded 50 KB / 2000 lines (truncate.ts) | +| Step limit | `MAX_STEPS = 0` (unlimited) | configured per agent; `MAX_STEPS_PROMPT` disables tools at last step | +| History sent to provider | full raw `messages` array | projected `toLLMMessages(...)` after compaction | + +These are general hygiene improvements, **not** crash fixes (the live crash is the SSH bug; the +segfault is native). Tool-output bounding is the one with direct crash relevance (prevents +pathological-payload spikes). Compaction/max-steps do not reduce RAM for context the agent +genuinely needs. + +--- + +## 6. Recovery behavior on restart (repair-and-seal, NOT resume) + +When the server crashes and systemd restarts it: +- **`conversation-store/extension.ts:21-27`** — boot-sweep: lists all conversations with + `status: ["active"]` and sets them to `"idle"`. +- **`store.ts:681`** — on load, `reconcileWithReport` repairs partial turns: synthesizes error + results for orphaned tool-calls, strips error-only trailing assistant messages, drops empty + messages. Emits a `reconcile.repair` span when repairs occur. +- **`heartbeat/heartbeat.ts:299`** — sweeps stale "running" runs to "stopped". + +**Result:** the DB is always consistent after a restart (no broken conversations), but in-flight +turns are **sealed, not resumed** — partial assistant output is preserved (reconciled), the +conversation is marked idle, and the user must re-send. This is why the `uncaughtException` guard +matters: without it, one SSH error kills the process → **every** in-flight turn (not just the one +using SSH) is sealed and must re-send. With the guard, only the failing turn seals; the rest +continue. + +--- + +## 7. 
Data artifacts & quick-reference + +```bash +# Live state +systemctl status dispatch +systemctl show dispatch -p MemoryMax -p MemoryHigh # 24G / 20G (live, applied) +curl http://localhost:24991/health # → {"ok":true} + +# Telemetry lives HERE (not journald): +/var/log/dispatch/app.ndjson # current process +/var/log/dispatch/app.ndjson.1 # rotated (older crash windows rotated away — only .1 kept) +# grep: rg 'memory:periodic|memory:gc' /var/log/dispatch/app.ndjson + +# Journal (crash stacks): +journalctl -u dispatch --since '24 hours ago' | rg 'Main process exited|panic|Bun v' + +# The two investigation docs: +notes/memory-leak-investigation-handoff.md # prior (has the 3 errors in §3) +notes/crash-investigation-findings.md # THIS file (definitive) +crash-review-report.md # independent Gemini review (found the SSH bug) + +# Suspect code: +packages/ssh/src/pool.ts # buildConnection/doConnect — the crash +packages/host-bin/src/main.ts # missing uncaughtException guard +packages/kernel/src/runtime/run-turn.ts # MAX_STEPS=0, streaming retry loop +packages/openai-stream/src/stream.ts # JSON.stringify whole body + span copy +packages/session-orchestrator/src/orchestrator.ts # warm probe (latent), activeConversations +``` + +## 8. Current fix scope + +**In scope now (root-cause, Dispatch code):** +1. `packages/ssh/src/pool.ts` — permanent `'error'` listener on the pooled `ssh2.Client` + logging. +2. `packages/host-bin/src/main.ts` — `uncaughtException` + `unhandledRejection` guards with rich + logging (message, stack, activeConversations, memory snapshot, timestamp). + +**Deferred (separate decisions):** +- Tool-output bounding, mid-turn compaction, MAX_STEPS (general hygiene; not crash fixes). +- Warm-probe try/catch (latent; the `unhandledRejection` guard covers it for now). +- Port to Node (only if segfault recurs on 1.3.14 under real load). +- Core-dump capture setup (operational; do before the next load test). 
diff --git a/notes/memory-leak-investigation-handoff.md b/notes/memory-leak-investigation-handoff.md new file mode 100644 index 0000000..3dc4ad0 --- /dev/null +++ b/notes/memory-leak-investigation-handoff.md @@ -0,0 +1,331 @@ +# Memory-Leak Investigation — Handoff for OpenCode Agent + +> **Purpose:** This document gives an OpenCode harness agent — running outside +> the Dispatch system, with no prior context — everything needed to +> independently investigate the memory leak that is crashing the production +> Dispatch server. Read it top to bottom before touching anything. + +**Last updated:** 2026-06-28 +**Author of this handoff:** umans/umans-glm-5.2 (Dispatch agent) +**Originating investigation:** `notes/server-crash-investigation.md` (commit `b83aa8d`) + +--- + +## 0. TL;DR — what you are here to do + +The production Dispatch server leaks memory at **~2.5 GB/hour** and eventually +hits a **Bun runtime segfault** (native crash, not a JS exception). A prior +investigation already ruled out LSP as the cause. Your job is to **find where +memory is being retained** and propose a fix. There are four undeployed commits +on the `dev` branch that add memory telemetry and a cgroup circuit breaker — +**those need to be built, installed, and observed first** to get the data that +will localize the leak. Do not start code-diving blindly; deploy the telemetry, +collect a crash cycle, then read the logs. + +--- + +## 1. System Overview — what Dispatch is + +Dispatch is an **AI agent orchestration platform**: a backend that runs one or +more AI "agents" through turns (each turn = a step loop of model calls + +tool dispatch), plus a separate frontend that consumes the backend's typed +contracts over HTTP + WebSocket. 
+ +**Repo layout** — all under `/home/tradam/projects/dispatch/`: + +| Path | What | Git remote | Branch | +|---|---|---|---| +| `backend/` | The server (this codebase) | `[email protected]:realtradam/dispatch.git` | `dev` | +| `frontend/` | The web UI (separate repo, same methodology) | `[email protected]:realtradam/frontend.git` | `dev` | +| `worktrees/` | Per-task git worktrees for isolated feature branches | (varies) | (varies) | +| `bin/` *(inside backend)* | Operational shell scripts (build, install, up, secrets, certs) | — | — | + +Both repos use **`dev` as the active development branch**. The backend is a +**Bun + TypeScript** monorepo (project references via `tsc -b`, Biome for +lint/format, Vitest for tests, `bun:sqlite` for storage, the `ai` / +`@ai-sdk/*` packages for model providers). Architecture rules are in +`backend/AGENTS.md` — **read it** before editing (especially the "kernel +touches no I/O" and "no ambient state" rules). + +--- + +## 2. Where things are installed (production) + +| Artifact | Path | Notes | +|---|---|---| +| Production server binary | `/usr/bin/dispatch-server` | Bun **standalone compiled** binary. Currently the OLD one (built Jun 27 21:40). Not you to install — see §6. | +| CLI binary | `/usr/bin/dispatch` | `dispatch send/list/read/...` | +| Frontend static files | `/usr/share/dispatch/web/` | Built by `bin/build` (Vite, `VITE_HTTP_PORT=24991 VITE_WS_PORT=24990`) | +| Config / env file | `/etc/dispatch/env` | `EnvironmentFile` for systemd. **Root-readable only** (`Permission denied` for non-root). Source template in repo: `systemd/dispatch.env` (installed by `bin/setup-env`). | +| Data directory | `/var/lib/dispatch/` | WorkingDirectory of the service. SQLite DBs live here: `dispatch.db`, `dispatch.db-shm`, `dispatch.db-wal`. | +| Logs | the journal | `journalctl -u dispatch` — there are no log files on disk; everything goes to journald. 
| +| systemd unit (live) | `/etc/systemd/system/dispatch.service` | Installed from the repo template `systemd/dispatch.service` by `bin/install`. | +| systemd unit (repo template) | `backend/systemd/dispatch.service` | **Edit this one**, not the live file — `bin/install` copies it over. | +| Install script | `backend/bin/install` | Builds + installs the binary, unit, env, frontend. Uses `sudo` on privileged lines only (the agent has no sudo — hand scripts to the user). | +| Build script | `backend/bin/build` | `tsc --build` then `bun build --compile` → `dist/dispatch-server`. | +| Memory-limits script | `backend/bin/apply-memory-limits.sh` | Applies `MemoryHigh`/`MemoryMax` to the live service. **User has NOT run it yet.** | + +--- + +## 3. Production ports + +| Service | Port | Confirmed | +|---|---|---| +| HTTP API | **24991** | `BACKEND_PORT=24991` in `systemd/dispatch.env` (set by `bin/setup-env`). Live: `ss -tlnp` shows `dispatch-server` listening on `*:24991`. | +| Surface WebSocket | **24990** | `SURFACE_WS_PORT`. Live: `ss -tlnp` shows `dispatch-server` listening on `*:24990`. | + +The **dev** stack (`bin/up`) uses different ports: **24203** (HTTP) + **24205** (WS) + **24204** (frontend Vite dev server). Don't confuse dev ports with prod ports. + +**Health check:** `curl http://localhost:24991/health` → `{"ok":true}` + +--- + +## 4. How to access the running server + +```bash +# Is it running? +systemctl status dispatch + +# Live logs (follow) +journalctl -u dispatch -f + +# Recent crashes / errors (last 2h) +journalctl -u dispatch --since '2 hours ago' -n 200 + +# Memory telemetry — ONLY visible once the new binary is deployed (see §6). 
+# The EXACT log tags to grep (NOT "[mem-telemetry]" — that is the module name): +journalctl -u dispatch -f | rg 'memory:periodic|memory:gc' + +# Restart (needs root — hand to the user) +sudo systemctl restart dispatch +``` + +> **The service is named `dispatch`, NOT `dispatch-server`.** The binary is +> `dispatch-server`; the systemd unit is `dispatch.service`. This mismatch +> tripped up the first investigation — don't repeat it. + +The backend checkout at `/home/tradam/projects/dispatch/backend/` is on `dev` +and contains the source. The running binary is compiled, so **editing source +does nothing until you rebuild + install + restart** (§6). + +--- + +## 5. The memory leak — what we know + +### 5.1 Symptom & crash signature + +- The server grows **~2.5 GB/hour** under load, eventually triggering a **Bun + runtime segfault** (native crash). +- **Last crash:** Jun 28 00:12 JST. + ``` + panic(main thread): Segmentation fault at address 0x0 + oh no: Bun has crashed. This indicates a bug in Bun, not your code. + Main process exited, code=dumped, status=4/ILL + ``` + RSS was **6.2 GB** at crash (over 2h31m uptime). A prior session hit + **25.7 GB over 18h**. +- The crash is a **native segfault inside Bun's runtime** (NULL deref in the + allocator/GC), NOT a JS exception. The crash-time memory readout was itself + corrupted (`RSS: 0.02ZB` — impossible), and systemd saw SIGILL while Bun + reported SIGSEGV — both signs of **heap corruption under memory pressure**. +- This is the **only** segfault in 5 days of logs. Earlier crashes (Jun 27 + ~02:44–02:52) were `exit-code` failures = application-level LSP bugs, **now + fixed** (see §7). + +### 5.2 What it is NOT + +- **NOT LSP.** The Language Server Protocol extension was the original prime + suspect; it is exonerated. Its caches are bounded (`MAX_OPEN_DOCUMENTS = 50`, + `MAX_PUSH_DIAGNOSTICS = 100` — see `packages/lsp/src/client.ts:153` and + `diagnostics.ts:47`). 50 docs + 100 diag entries cannot explain gigabytes. 
+- **NOT the conversation store.** `packages/conversation-store` is + **SQLite-backed** (all reads/writes go through a `StorageNamespace` to + `bun:sqlite`) — it does not hold conversations in an in-memory map. +- **NOT `activeConversations`.** The session-orchestrator's + `activeConversations` is just a `Set<string>` of conversation IDs (tiny). + +### 5.3 Prime suspects (not yet confirmed — that's your job) + +1. **The AI-SDK streaming path** — the `async` generator returned by + `provider.stream(...)` (`@ai-sdk/*`, including the `openai-stream` package). + Streaming response buffers may be retained if the generator is not fully + drained or if the SDK holds internal buffers per request. +2. **Per-turn message arrays** in `session-orchestrator` — the history + + `userMsg` + `providerMessages` arrays assembled before each + `provider.stream(...)` call. For long multi-step agent turns these can be + large; if references outlive the turn (closures, unresolved promises, event + listeners), they leak. +3. **A Bun bug fixed in 1.3.14.** The crash was on Bun **v1.3.13**. Bun 1.3.14 + has been built locally but not installed (§6.3) — deploying it may itself + resolve the segfault even if a leak remains. + +### 5.4 Key files to examine (the leak-suspect code path) + +| File | What's there | Why it matters | +|---|---|---| +| `packages/session-orchestrator/src/orchestrator.ts` | `runTurnDetached()` at **line 541** — kicks off a turn; the `activeConversations.add/delete` around it (556, 979) brackets the turn lifecycle. | Turn lifecycle: where memory should be acquired and released. If a turn's buffers aren't released here, it leaks per-turn. | +| `packages/session-orchestrator/src/orchestrator.ts` | `for await (const event of provider.stream(messages, assembled.tools, providerOpts))` at **line 1276** (and a second one at **1430** for compaction/summary). | **The streaming loop** — the #1 suspect. This is where the AI-SDK async generator is consumed. 
If the generator is abandoned mid-stream (error, `break`, abort) or its internal buffers are retained, memory grows per turn. | +| `packages/session-orchestrator/src/pure.ts` | The pure turn/step decision logic + the `MemorySample`/`memoryDelta`/`memorySampleAttributes` helpers used by telemetry. | The per-turn before/after sampling wraps the stream boundary (referenced by `mem-telemetry.ts`). | +| `packages/kernel/src/runtime/run-turn.ts` | The **step loop** (`runTurn`) — one agent turn = N steps of (model call → tool dispatch → feed results back). | MAX_STEPS is disabled (`0 = unlimited`, commit `e8b4bf1`), so a single turn can run many steps. Many steps ⇒ many accumulated message arrays ⇒ more retention surface. | +| `packages/kernel/src/runtime/dispatch.ts` | **Tool dispatch** (the `maxConcurrent`/`eager` tool-execution loop). | Tool results accumulate into the turn's message history. A rogue tool returning huge payloads could balloon arrays. | +| `packages/host-bin/src/mem-telemetry.ts` | The periodic telemetry edge effect. | Read this to understand exactly what `memory:periodic` / `memory:gc` log entries contain (rss, heapUsed, heapTotal, external, arrayBuffers, activeConversations, reclaimedRssMB). | + +> **Architecture note (AGENTS.md):** the kernel (`packages/kernel`) touches NO +> I/O and names no concrete feature. Decision logic is pure; effects are +> injected at the edges (host-bin). When investigating, keep this layering in +> mind — a leak "in the kernel" would actually be in a closure captured by an +> edge that feeds the kernel, not in the kernel's own (stateless) turn loop. + +--- + +## 6. What's been done (committed to `dev`, NOT yet deployed) + +The running binary is still the **old one** (built Jun 27 21:40, pre-telemetry, +pre-Bun-1.3.14, pre-LSP-disable). Four commits on `dev` need building + +installing before any of this takes effect. 
**Verify before you trust:** the +live systemd still reports `MemoryMax=infinity` (confirmed via +`systemctl show dispatch -p MemoryMax`). + +| Commit | What | Status | +|---|---|---| +| `d1de9ed` | `systemd/dispatch.service` gets `MemoryHigh=20G` + `MemoryMax=24G` circuit breaker. | Committed. **Live = infinity (not applied).** | +| `b3b6eb4` | `bin/apply-memory-limits.sh` — applies the limits to the live service. | Committed. **User has NOT run it.** | +| `1cd66da` | Memory telemetry: periodic `process.memoryUsage()` every 15s, per-turn before/after sampling, `activeConversations` tagging, `Bun.gc(true)` every 5 min. Files: `packages/host-bin/src/mem-telemetry.ts` + `packages/session-orchestrator/src/pure.ts`. | Committed. **NOT deployed.** | +| `73ff84c` | **LSP disabled** as a precaution (`main.ts` import commented out + removed from `CORE_EXTENSIONS`; `transport-http` tolerates its absence). Also set telemetry interval 60s→15s. | Committed. **NOT deployed.** LSP stays disabled until the leak is understood. | +| (bun upgrade) | Bun **1.3.13 → 1.3.14** via `bun upgrade`. New binary built at `dist/dispatch-server`. | Rebuilt. **NOT installed** to `/usr/bin/dispatch-server`. | + +### 6.1 The telemetry data you will collect (once deployed) + +The exact log tags to grep in journald are **`memory:periodic`** and +**`memory:gc`** (the module is `mem-telemetry.ts`, but the log *messages* are +those two strings — do not grep for `[mem-telemetry]`). + +- `memory:periodic` — every 15s: `rss`, `heapUsed`, `heapTotal`, `external`, + `arrayBuffers` (bytes), plus `activeConversations` (count). Correlate RSS + growth with active turns: if RSS climbs while `activeConversations` is 0, + the leak is background/idle (timers, watchers, retained closures); if it + climbs only during/after turns, it's the streaming/turn path. +- `memory:gc` — every 5 min: runs `Bun.gc(true)` and logs RSS before/after + + `reclaimedRssMB` (`before - after`). 
**Interpretation:** a large positive + `reclaimedRssMB` = GC freed it (fragmentation, reclaimable — not a true + leak). Near-zero/negative `reclaimedRssMB` = memory is **LIVE** (retained + objects — a real leak). This single field distinguishes the two failure + modes; check it first when you get data. + +### 6.2 Deploy sequence (hand to the user — you have no sudo) + +The agent cannot run `sudo` or install to `/usr/bin`. Write a script with +`sudo` on the privileged lines and hand it to the user (per the repo's sudo +policy). The sequence, roughly: + +1. From `backend/`: `bun run typecheck && bun run test && bun run check` — + make sure `dev` is green (it should be; these commits are committed). +2. `bin/build` — produces `dist/dispatch-server` (Bun 1.3.14, with telemetry + + LSP-disabled). +3. `bin/install` (or a targeted script) — copies `dist/dispatch-server` → + `/usr/bin/dispatch-server`, the unit template → `/etc/systemd/system/`, + `systemd/dispatch.env` → `/etc/dispatch/env`. (Or run `bin/apply-memory-limits.sh` + separately if you only want the cgroup limits without a full reinstall.) +4. `sudo systemctl daemon-reload && sudo systemctl restart dispatch`. +5. Confirm: `systemctl show dispatch -p MemoryMax` should show `24G`, and + `journalctl -u dispatch -f | rg 'memory:periodic'` should show telemetry + within 15s. + +> ⚠️ **Warning:** deploying restarts the production server. The live server is +> actively serving agent conversations. Coordinate with the user (ceb2 / +> tradam) before restarting — don't just yank it. Also note the running server +> at time of writing is PID 28956 (started Jun 28 08:51). + +--- + +## 7. Background — the original crash investigation (already done) + +You do **not** need to redo this; it's recorded for context. + +- **Original report:** `notes/server-crash-investigation.md` (commit `b83aa8d`). +- The first investigation's task description was **stale**: it claimed an + "uncommitted hot-fix that disables LSP" existed. 
It did not — the working + tree was clean and LSP was *enabled*. The developer then *chose* to disable + LSP (commit `73ff84c`) as a precaution, but the root cause was already + identified as the Bun segfault + leak, not LSP. +- The **three original LSP suspects** (JSON-parse TypeError, `fs.watch` ENOENT, + unbounded cache leak) were all fixed in commits `05ff256` + `f9d1ca5` and + are correct/tested. LSP being disabled now is a precaution, not a fix for the + crash. Once the leak is understood, LSP can be re-enabled safely. +- **Do not spend time re-investigating LSP.** It is exonerated. + +--- + +## 8. What needs investigation (your tasks, in order) + +1. **Deploy the telemetry + cgroup limits** (§6) — you cannot localize the leak + without the `memory:periodic` / `memory:gc` data. Get a full crash cycle's + worth of logs (let it run until it either hits `MemoryMax=24G` and restarts, + or until the growth pattern is clear). +2. **Read the telemetry first.** The `memory:gc` `reclaimedRssMB` field tells + you live-retained vs fragmentation. If near-zero after GC → real retained + leak; chase which subsystem holds it. +3. **Correlate growth with `activeConversations`.** Idle-growth ⇒ timers / + watchers / retained closures / the LSP file-watcher (if any servers still + spawn — but LSP is disabled). Turn-bound growth ⇒ the streaming/turn path + (suspects in §5.4). +4. **Examine the streaming loop** (`orchestrator.ts:1276`): is the + `provider.stream(...)` async generator fully drained on every path + (success, error, abort, `break`)? Does the AI SDK retain response buffers? + The per-turn `memory:turn:before` / `memory:turn:after` log entries + (with `deltaRssMB`; added in commit `1cd66da`) already measure the + per-turn delta directly — use them before adding new instrumentation. +5. **Examine the message arrays** assembled before `provider.stream` — are + they scoped to the turn and released when the turn completes, or do + closures/promises/event-listeners keep them alive? +6. 
**Test Bun 1.3.14 in isolation.** Since the upgrade is built but not + installed, see if a memory-stress repro under 1.3.13 vs 1.3.14 behaves + differently — the segfault may be a Bun allocator bug that 1.3.14 fixes. +7. **Propose a fix** (do not implement without coordinating — report up to the + orchestrator). Likely candidates: ensure the streaming generator is always + fully consumed / explicitly closed on error; bound per-turn message + history; release references at `activeConversations.delete` time; or + confirm it's purely a Bun bug requiring the 1.3.14 deploy. + +--- + +## 9. Quick-reference commands + +```bash +# --- status & logs --- +systemctl status dispatch +systemctl show dispatch -p MemoryMax -p MemoryHigh # expect 24G/20G after deploy +journalctl -u dispatch -f # live logs +journalctl -u dispatch -f | rg 'memory:periodic|memory:gc' # telemetry (after deploy) +journalctl -u dispatch --since '2 hours ago' -n 200 # recent history / crashes +curl http://localhost:24991/health # → {"ok":true} +ss -tlnp | rg dispatch # ports 24991 + 24990 + +# --- source (the leak path) --- +cd /home/tradam/projects/dispatch/backend +git log --oneline -n 8 # see the undeployed commits +rg -n 'runTurnDetached|provider\.stream|for await' packages/session-orchestrator/src/orchestrator.ts + +# --- build & verify (NO sudo needed) --- +bun run typecheck && bun run test && bun run check +bin/build # → dist/dispatch-server + +# --- deploy (NEEDS sudo — hand a script to the user) --- +# bin/install (or: sudo cp dist/dispatch-server /usr/bin/dispatch-server) +# bin/apply-memory-limits.sh (applies MemoryMax to live service) +# sudo systemctl daemon-reload && sudo systemctl restart dispatch +``` + +--- + +## 10. Guardrails + +- **Do NOT edit implementation code without reporting to the orchestrator.** + This is an investigation handoff; propose fixes, don't apply them + unilaterally. 
+- **You have no sudo.** Any step touching `/usr/bin`, `/etc/`, or + `systemctl restart` must be a script handed to the user with `sudo` on the + privileged lines. +- **Do not re-enable LSP** until the leak is understood (it's disabled as a + precaution; re-enabling is a separate decision). +- **Do not merge or push.** Work stays on `dev` locally. +- **The service is `dispatch`, not `dispatch-server`.** diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts index aa114d5..70d1cb2 100644 --- a/packages/host-bin/src/main.ts +++ b/packages/host-bin/src/main.ts @@ -21,13 +21,20 @@ import { type SecretsAccess, type StorageNamespace, } from "@dispatch/kernel"; -import { extension as lspExt } from "@dispatch/lsp"; +// LSP temporarily disabled — crashes (unhandled JSON parse, ENOENT on +// transient .old_modules dirs) and a memory leak. Re-enable after fix. +// import { extension as lspExt } from "@dispatch/lsp"; import { extension as mcpExt } from "@dispatch/mcp"; import { extension as messageQueueExt } from "@dispatch/message-queue"; import { extension as providerConcurrencyExt } from "@dispatch/provider-concurrency"; import { extension as providerOpenaiCompatExt } from "@dispatch/provider-openai-compat"; import { extension as providerUmansExt } from "@dispatch/provider-umans"; -import { extension as sessionOrchestratorExt } from "@dispatch/session-orchestrator"; +import { + type MemorySample, + memorySampleAttributes, + extension as sessionOrchestratorExt, + sessionOrchestratorHandle, +} from "@dispatch/session-orchestrator"; import { extension as skillsExt } from "@dispatch/skills"; import { extension as sshExt } from "@dispatch/ssh"; import { createSqliteStorage, extension as storageSqliteExt } from "@dispatch/storage-sqlite"; @@ -49,6 +56,7 @@ import type { ChildHandle } from "./collector-supervisor.js"; import { createCollectorSupervisor } from "./collector-supervisor.js"; import { configMapToAccess, envToConfigMap } from "./config.js"; import { 
loadExternalExtensions } from "./load-external.js"; +import { startMemoryTelemetry } from "./mem-telemetry.js"; function createEmptySecrets(): SecretsAccess { return { @@ -101,7 +109,7 @@ const CORE_EXTENSIONS: readonly Extension[] = [ skillsExt, systemPromptExt, cacheWarmingExt, - lspExt, + // lspExt, // LSP temporarily disabled — see import above // ssh declares `dependsOn: ["exec-backend"]` and PROVIDES the remote // exec-backend factory + the ComputerService the HTTP routes delegate to. // Its lookups are lazy (tool-/request-time), but it is placed after @@ -227,10 +235,45 @@ async function boot(): Promise<void> { } } + // Periodic memory telemetry — leak-localization edge effect (AGENTS.md: + // timers are edge effects owned by host-bin, the composition root, NOT the + // kernel). Logs process.memoryUsage() every 15s tagged with the active- + // conversation count, and every 5 min runs Bun.gc(true) + logs RSS + // before/after to distinguish live retained objects from GC fragmentation. + // The per-turn before/after sampling lives in session-orchestrator; this + // owns the PERIODIC baseline. All effects are injected (no ambient state); + // stop() is called on shutdown so timers never leak across a restart. 
+ let memoryTelemetry: { stop: () => void } | undefined; + let activeConvCountFn: (() => number) | undefined; + try { + const orchestrator = host.getHostAPI().getService(sessionOrchestratorHandle); + activeConvCountFn = () => orchestrator.getActiveConversationCount(); + memoryTelemetry = startMemoryTelemetry({ + logger: logger.child({ extensionId: "mem-telemetry" }), + sampleMemory: (): MemorySample => { + const m = process.memoryUsage(); + return { + rss: m.rss, + heapUsed: m.heapUsed, + heapTotal: m.heapTotal, + external: m.external, + arrayBuffers: m.arrayBuffers, + }; + }, + gc: () => Bun.gc(true), + getActiveConversationCount: () => orchestrator.getActiveConversationCount(), + }); + } catch (err) { + logger.error("Memory telemetry not started (session-orchestrator unavailable)", { + err, + }); + } + let shuttingDown = false; const shutdown = async () => { if (shuttingDown) return; shuttingDown = true; + memoryTelemetry?.stop(); logger.info("Shutting down — deactivating extensions"); await host.deactivate(); logger.info("Draining collector"); @@ -240,6 +283,38 @@ async function boot(): Promise<void> { process.on("SIGINT", shutdown); process.on("SIGTERM", shutdown); + const memorySnapshot = () => { + const m = process.memoryUsage(); + return memorySampleAttributes({ + rss: m.rss, + heapUsed: m.heapUsed, + heapTotal: m.heapTotal, + external: m.external, + arrayBuffers: m.arrayBuffers, + }); + }; + + process.on("unhandledRejection", (reason) => { + logger.error("unhandledRejection", { + err: reason, + handler: "unhandledRejection", + activeConversations: activeConvCountFn?.() ?? "unavailable", + timestamp: new Date().toISOString(), + ...memorySnapshot(), + }); + }); + + process.on("uncaughtException", (err) => { + logger.error("uncaughtException", { + err, + handler: "uncaughtException", + activeConversations: activeConvCountFn?.() ?? 
"unavailable", + timestamp: new Date().toISOString(), + ...memorySnapshot(), + }); + void shutdown(); + }); + logger.info("Dispatch booted"); console.info("Dispatch booted"); } diff --git a/packages/host-bin/src/mem-telemetry.test.ts b/packages/host-bin/src/mem-telemetry.test.ts new file mode 100644 index 0000000..20237ef --- /dev/null +++ b/packages/host-bin/src/mem-telemetry.test.ts @@ -0,0 +1,225 @@ +import type { Attributes, Logger } from "@dispatch/kernel"; +import type { MemorySample } from "@dispatch/session-orchestrator"; +import { describe, expect, it } from "vitest"; +import { + buildGcAttributes, + buildPeriodicAttributes, + type MemoryTelemetryDeps, + startMemoryTelemetry, +} from "./mem-telemetry.js"; + +/** One captured log call. The fake logger records debug/info/warn calls; error calls are discarded. */ +interface CapturedLog { + readonly level: string; + readonly msg: string; + readonly attrs?: Attributes; +} + +function capturingLogger(): { logger: Logger; logs: CapturedLog[] } { + const logs: CapturedLog[] = []; + const record = (level: string) => (msg: string, attrs?: Attributes) => { + logs.push({ level, msg, attrs }); + }; + const logger: Logger = { + debug: record("debug"), + info: record("info"), + warn: record("warn"), + error: () => {}, + child: () => logger, + span: () => ({ + id: "s", + log: logger, + setAttributes: () => {}, + addLink: () => {}, + child: () => ({}) as never, + end: () => {}, + }), + }; + return { logger, logs }; +} + +const SAMPLE_A: MemorySample = { + rss: 100 * 1024 * 1024, + heapUsed: 40 * 1024 * 1024, + heapTotal: 60 * 1024 * 1024, + external: 5 * 1024 * 1024, + arrayBuffers: 2 * 1024 * 1024, +}; +const SAMPLE_B: MemorySample = { + rss: 300 * 1024 * 1024, + heapUsed: 80 * 1024 * 1024, + heapTotal: 60 * 1024 * 1024, + external: 5 * 1024 * 1024, + arrayBuffers: 6 * 1024 * 1024, +}; + +describe("buildPeriodicAttributes", () => { + it("formats the sample as MB and tags the active-conversation count", () => { + const 
attrs = 
buildPeriodicAttributes(SAMPLE_A, 3); + expect(attrs).toEqual({ + rssMB: 100, + heapUsedMB: 40, + heapTotalMB: 60, + externalMB: 5, + arrayBuffersMB: 2, + activeConversations: 3, + }); + }); +}); + +describe("buildGcAttributes", () => { + it("carries absolute after values plus reclaimed (before-after) delta", () => { + const attrs = buildGcAttributes(SAMPLE_A, SAMPLE_B); + // Absolute "after" values (SAMPLE_B). + expect(attrs.rssMB).toBe(300); + expect(attrs.heapUsedMB).toBe(80); + // Reclaimed delta: before - after is negative here (memory GREW), so + // reclaimedRssMB = round((100-300) MB) = -200. + expect(attrs.reclaimedRssMB).toBe(-200); + expect(attrs.reclaimedHeapUsedMB).toBe(-40); + expect(attrs.reclaimedHeapTotalMB).toBe(0); + expect(attrs.reclaimedArrayBuffersMB).toBe(-4); + }); + + it("shows a positive reclaimed value when GC freed memory", () => { + const attrs = buildGcAttributes(SAMPLE_B, SAMPLE_A); + expect(attrs.reclaimedRssMB).toBe(200); + }); +}); + +describe("startMemoryTelemetry", () => { + function fakeTimers(): { + setInterval: MemoryTelemetryDeps["setInterval"]; + clearInterval: MemoryTelemetryDeps["clearInterval"]; + tick: (name: "sample" | "gc") => void; + cleared: { sample: number; gc: number }; + } { + // Store the two callbacks by REGISTRATION ORDER so we can drive either + // one: the first setInterval call is treated as the sample timer, the + // second as the gc timer. The ms values are not compared. + let sampleCb: (() => void) | undefined; + let gcCb: (() => void) | undefined; + let firstMs = 0; + const cleared = { sample: 0, gc: 0 }; + return { + setInterval: (fn, ms) => { + if (firstMs === 0) { + firstMs = ms; + sampleCb = fn; + return "sample" as never; + } + // The second timer registered is the gc one (larger interval). 
+ gcCb = fn; + return "gc" as never; + }, + clearInterval: (handle) => { + if (handle === "sample") cleared.sample++; + else if (handle === "gc") cleared.gc++; + }, + tick: (name) => { + if (name === "sample") sampleCb?.(); + else gcCb?.(); + }, + cleared, + }; + } + + it("logs a periodic sample tagged with the active-conversation count", () => { + const { logger, logs } = capturingLogger(); + let active = 2; + const timers = fakeTimers(); + const handle = startMemoryTelemetry({ + logger, + sampleMemory: () => SAMPLE_A, + gc: () => {}, + getActiveConversationCount: () => active, + setInterval: timers.setInterval, + clearInterval: timers.clearInterval, + }); + + timers.tick("sample"); + const periodic = logs.find((l) => l.msg === "memory:periodic"); + expect(periodic).toBeDefined(); + expect(periodic?.attrs?.rssMB).toBe(100); + expect(periodic?.attrs?.activeConversations).toBe(2); + + // The count is read live (re-evaluated each tick). + active = 5; + timers.tick("sample"); + const periodic2 = logs.filter((l) => l.msg === "memory:periodic"); + expect(periodic2).toHaveLength(2); + expect(periodic2[1]?.attrs?.activeConversations).toBe(5); + + handle.stop(); + expect(timers.cleared.sample).toBe(1); + expect(timers.cleared.gc).toBe(1); + }); + + it("runs gc and logs RSS before/after on the gc interval", () => { + const { logger, logs } = capturingLogger(); + let calls = 0; + const samples = [SAMPLE_A, SAMPLE_B]; + const timers = fakeTimers(); + const handle = startMemoryTelemetry({ + logger, + sampleMemory: () => samples[calls++] as MemorySample, + gc: () => {}, + getActiveConversationCount: () => 0, + setInterval: timers.setInterval, + clearInterval: timers.clearInterval, + }); + + timers.tick("gc"); + const gcLog = logs.find((l) => l.msg === "memory:gc"); + expect(gcLog).toBeDefined(); + // before=SAMPLE_A (call 0), after=SAMPLE_B (call 1) — rss grew 100→300. 
+ expect(gcLog?.attrs?.rssMB).toBe(300); + expect(gcLog?.attrs?.reclaimedRssMB).toBe(-200); + + handle.stop(); + }); + + it("actually calls the injected gc function", () => { + const { logger } = capturingLogger(); + let gcCalls = 0; + const timers = fakeTimers(); + const handle = startMemoryTelemetry({ + logger, + sampleMemory: () => SAMPLE_A, + gc: () => { + gcCalls++; + }, + getActiveConversationCount: () => 0, + setInterval: timers.setInterval, + clearInterval: timers.clearInterval, + }); + + timers.tick("gc"); + expect(gcCalls).toBe(1); + // Periodic ticks do NOT trigger gc. + timers.tick("sample"); + expect(gcCalls).toBe(1); + + handle.stop(); + }); + + it("stop is idempotent (clearing twice is harmless)", () => { + const { logger } = capturingLogger(); + const timers = fakeTimers(); + const handle = startMemoryTelemetry({ + logger, + sampleMemory: () => SAMPLE_A, + gc: () => {}, + getActiveConversationCount: () => 0, + setInterval: timers.setInterval, + clearInterval: timers.clearInterval, + }); + + handle.stop(); + handle.stop(); + // Both timers cleared exactly once each (second stop is a no-op on the + // same handles — clear count stays at 1 per handle). + expect(timers.cleared.sample).toBe(1); + expect(timers.cleared.gc).toBe(1); + }); +}); diff --git a/packages/host-bin/src/mem-telemetry.ts b/packages/host-bin/src/mem-telemetry.ts new file mode 100644 index 0000000..576347b --- /dev/null +++ b/packages/host-bin/src/mem-telemetry.ts @@ -0,0 +1,160 @@ +/** + * Periodic memory telemetry — leak-localization edge effect. + * + * Owns the timers (setInterval) that periodically log process.memoryUsage() + * so RSS growth can be correlated with active conversations/turns and the + * leaking subsystem pinpointed. This is the composition-root edge effect + * (AGENTS.md: timers are edge effects owned by host-bin, NOT the kernel). 
+ * + * All effects are injected (sampler, GC, clock, logger, active-conversation + * count) so the timer logic is fully testable — tests pass fakes and assert + * the emitted log calls, never waiting on a real wall clock. No ambient + * state (P3): the timers are owned explicitly and returned as a `stop()` + * handle that the composition root clears on shutdown. + * + * The per-turn before/after sampling lives in session-orchestrator (it wraps + * the stream boundary); this module owns the PERIODIC baseline + GC logging. + */ + +import type { Logger } from "@dispatch/kernel"; +import { + type MemorySample, + memoryDelta, + memorySampleAttributes, +} from "@dispatch/session-orchestrator"; + +/** Default periodic sample interval: every 15s. */ +export const DEFAULT_MEMORY_SAMPLE_INTERVAL_MS = 15_000; + +/** Default GC interval: every 5 min (longer than the sample interval). */ +export const DEFAULT_GC_INTERVAL_MS = 5 * 60_000; + +/** + * Deps injected into {@link startMemoryTelemetry}. Every effect is explicit so + * the timer logic is reproducible from its inputs (no ambient state, P3) and + * testable without a real clock or process. + */ +export interface MemoryTelemetryDeps { + /** Logger (auto-scoped to host-bin by the composition root). */ + readonly logger: Logger; + /** Edge effect: capture a {@link MemorySample} now (process.memoryUsage()). */ + readonly sampleMemory: () => MemorySample; + /** + * Edge effect: run a full GC cycle. In production this is `() => + * Bun.gc(true)`; tests pass a no-op or a counting fake. Used on the longer + * GC interval to distinguish live retained objects from GC fragmentation. + */ + readonly gc: () => void; + /** + * The number of conversations currently driving a turn (from the + * session-orchestrator's activeConversations set). Tags each periodic + * sample so growth can be attributed to the streaming/turn path vs an idle + * baseline. 
+ */ + readonly getActiveConversationCount: () => number; + /** Periodic sample interval (ms). Defaults to 15s. */ + readonly sampleIntervalMs?: number; + /** GC interval (ms). Defaults to 5 min. */ + readonly gcIntervalMs?: number; + /** + * Injected timer scheduler (defaults to global setInterval). Tests pass a + * fake to drive ticks deterministically without real wall-clock waits. The + * handle type is opaque (the same type {@link clearInterval} accepts). + */ + readonly setInterval?: (fn: () => void, ms: number) => MemoryTimerHandle; + /** Injected timer clearer (defaults to global clearInterval). */ + readonly clearInterval?: (handle: MemoryTimerHandle | undefined) => void; +} + +/** Opaque timer handle shared by {@link MemoryTelemetryDeps.setInterval} / clearInterval. */ +export type MemoryTimerHandle = ReturnType<typeof globalThis.setInterval>; + +/** Handle returned by {@link startMemoryTelemetry} to stop the timers. */ +export interface MemoryTelemetryHandle { + /** Stop both timers. Idempotent. Called by the composition root on shutdown. */ + readonly stop: () => void; +} + +/** + * Start periodic memory telemetry. Logs process.memoryUsage() every + * `sampleIntervalMs` (default 15s) tagged with the active-conversation count, + * and every `gcIntervalMs` (default 5 min) runs `gc()` and logs RSS + * before/after to distinguish live retained objects from GC fragmentation. + * + * Returns a `stop()` handle that clears both timers. The composition root + * (host-bin main) owns this handle and calls `stop()` on shutdown so the + * timers never leak across a restart. + * + * Pure decision logic: {@link buildPeriodicAttributes} / + * {@link buildGcAttributes} are exported separately for unit testing. + */ +export function startMemoryTelemetry(deps: MemoryTelemetryDeps): MemoryTelemetryHandle { + const sampleIntervalMs = deps.sampleIntervalMs ?? DEFAULT_MEMORY_SAMPLE_INTERVAL_MS; + const gcIntervalMs = deps.gcIntervalMs ?? 
DEFAULT_GC_INTERVAL_MS; + const setIntervalFn = deps.setInterval ?? globalThis.setInterval; + const clearIntervalFn = deps.clearInterval ?? globalThis.clearInterval; + + let gcHandle: MemoryTimerHandle | undefined; + + // Periodic sample: log rss/heap/external/arrayBuffers + active-conversation + // count every 15s. Correlates RSS growth with active turns so the leak can + // be attributed to the streaming path vs an idle baseline. + const sampleHandle: MemoryTimerHandle | undefined = setIntervalFn(() => { + const sample = deps.sampleMemory(); + const activeConversations = deps.getActiveConversationCount(); + deps.logger.info("memory:periodic", buildPeriodicAttributes(sample, activeConversations)); + }, sampleIntervalMs); + + // GC log: on a longer interval, force a full GC and log RSS before/after. + // A small/no drop after gc means the memory is LIVE (retained objects — the + // leak); a large drop means it was GC fragmentation (reclaimable). This + // distinguishes the two failure modes the crash investigation flagged. + gcHandle = setIntervalFn(() => { + const before = deps.sampleMemory(); + deps.gc(); + const after = deps.sampleMemory(); + deps.logger.info("memory:gc", buildGcAttributes(before, after)); + }, gcIntervalMs); + + let stopped = false; + return { + stop() { + if (stopped) return; // idempotent — safe to call on every shutdown path + stopped = true; + clearIntervalFn(sampleHandle); + clearIntervalFn(gcHandle); + }, + }; +} + +/** + * Pure: build the logger attributes for a periodic sample. Exported for unit + * testing (no I/O, no clock). The `activeConversations` count tags the sample + * so growth can be attributed to the streaming/turn path vs idle baseline. 
+ */ +export function buildPeriodicAttributes( + sample: MemorySample, + activeConversations: number, +): ReturnType<typeof memorySampleAttributes> & { activeConversations: number } { + return { ...memorySampleAttributes(sample), activeConversations }; +} + +/** + * Pure: build the logger attributes for a GC sample. Exported for unit testing. + * Carries the absolute after-sample plus the `reclaimed` delta + * (`before - after`): a POSITIVE `reclaimedRssMB` means GC freed memory + * (fragmentation, reclaimable); near-zero/negative means the memory is LIVE + * (retained objects — the leak). This distinguishes the two failure modes the + * crash investigation flagged. + */ +export function buildGcAttributes( + before: MemorySample, + after: MemorySample, +): ReturnType<typeof memorySampleAttributes> { + // reclaimed = before - after (how much GC freed). memoryDelta(a, b) = b - a, + // so pass (after, before) to get before - after. + return { + ...memorySampleAttributes(after), + ...memorySampleAttributes(memoryDelta(after, before), "reclaimed"), + }; +} diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts index 783d894..777feaa 100644 --- a/packages/session-orchestrator/src/extension.ts +++ b/packages/session-orchestrator/src/extension.ts @@ -65,6 +65,19 @@ export function activate(host: HostAPI): void { logger: host.logger, now: () => Date.now(), emit: (hook, payload) => host.emit(hook, payload), + // Injected process.memoryUsage() sampler — the production edge. Tests + // inject a fake to assert per-turn before/after telemetry. Wired in the + // shell (like `now: () => Date.now()`); pure decision logic is untouched. + sampleMemory: () => { + const m = process.memoryUsage(); + return { + rss: m.rss, + heapUsed: m.heapUsed, + heapTotal: m.heapTotal, + external: m.external, + arrayBuffers: m.arrayBuffers, + }; + }, resolveQueue: () => { // Lazily resolve the message-queue service. 
Returns undefined when the // extension isn't loaded (feature degrades off) — checked via the diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts index cc41fa8..3d3e816 100644 --- a/packages/session-orchestrator/src/index.ts +++ b/packages/session-orchestrator/src/index.ts @@ -39,6 +39,9 @@ export { defaultDispatchPolicy, delayFor, generateTurnId, + type MemorySample, + memoryDelta, + memorySampleAttributes, RETRY_BUDGET_MS, RETRY_SCHEDULE_MS, RETRY_TAIL_MS, diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts index 076ad51..400ca0b 100644 --- a/packages/session-orchestrator/src/orchestrator.test.ts +++ b/packages/session-orchestrator/src/orchestrator.test.ts @@ -4,7 +4,10 @@ import type { AgentEvent, ChatMessage, EventHookDescriptor, + LogDeps, Logger, + LogRecord, + LogSink, ProviderContract, ProviderEvent, ProviderStreamOptions, @@ -15,7 +18,7 @@ import type { ToolContract, TurnMetrics, } from "@dispatch/kernel"; -import { runTurn } from "@dispatch/kernel"; +import { createLogger, runTurn } from "@dispatch/kernel"; import type { SystemPromptService } from "@dispatch/system-prompt"; import { describe, expect, it } from "vitest"; import { @@ -27,6 +30,7 @@ import { type TurnLifecyclePayload, type WarmCompletedPayload, } from "./orchestrator.js"; +import type { MemorySample } from "./pure.js"; import type { ToolAssembly } from "./tools-filter.js"; function createInMemoryStore(): ConversationStore & { @@ -3980,3 +3984,135 @@ describe("system prompt: compaction flow", () => { expect(capturedSystemPrompt?.startsWith("RECONSTRUCTED")).toBe(false); }); }); + +describe("per-turn memory telemetry", () => { + function capturingLogger(): { logger: Logger; records: LogRecord[] } { + let id = 0; + const deps: LogDeps = { now: () => 1000 + id++, newId: () => `id-${id++}` }; + const records: LogRecord[] = []; + const sink: LogSink = { emit: (r) => 
records.push(r) }; + return { logger: createLogger({ extensionId: "session-orchestrator" }, sink, deps), records }; + } + + it("logs before/after samples around the stream, tagged with conversationId + turnId", async () => { + const store = createInMemoryStore(); + const provider: ProviderContract = { id: "p", stream: async function* () {} }; + const { captured, captureRunTurn } = createCapturingRunTurn(); + const { logger, records } = capturingLogger(); + + const samples: MemorySample[] = [ + { rss: 100, heapUsed: 10, heapTotal: 20, external: 1, arrayBuffers: 0 }, + { rss: 250, heapUsed: 30, heapTotal: 20, external: 1, arrayBuffers: 5 }, + ]; + let sampleIdx = 0; + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn: captureRunTurn, + logger, + sampleMemory: () => samples[sampleIdx++] as MemorySample, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-mem", + text: "hi", + onEvent: () => {}, + }); + + expect(captured).toHaveLength(1); + const beforeLogs = records.filter((r) => r.kind === "log" && r.msg === "memory:turn:before"); + const afterLogs = records.filter((r) => r.kind === "log" && r.msg === "memory:turn:after"); + expect(beforeLogs).toHaveLength(1); + expect(afterLogs).toHaveLength(1); + + // Both samples carry the turn's conversationId + turnId correlation. + const turnId = captured[0]?.turnId; + expect(turnId).toMatch(/^turn-/); + const before = beforeLogs[0] as Extract<LogRecord, { kind: "log" }>; + const after = afterLogs[0] as Extract<LogRecord, { kind: "log" }>; + expect(before.conversationId).toBe("conv-mem"); + expect(before.turnId).toBe(turnId); + expect(after.conversationId).toBe("conv-mem"); + expect(after.turnId).toBe(turnId); + + // Before sample carries absolute MB values. 
+ expect(before.attributes?.rssMB).toBe(0); // 100 bytes rounds to 0 MB + // After sample carries absolute + delta (delta rss = 150 bytes → 0 MB). + expect(after.attributes?.rssMB).toBe(0); + // deltaRssMB is the rounded delta (150 bytes → 0 MB). + expect(after.attributes).toHaveProperty("deltaRssMB"); + }); + + it("emits no memory logs when sampleMemory is not injected (degrades off)", async () => { + const store = createInMemoryStore(); + const provider: ProviderContract = { id: "p", stream: async function* () {} }; + const { captureRunTurn } = createCapturingRunTurn(); + const { logger, records } = capturingLogger(); + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn: captureRunTurn, + logger, + // sampleMemory intentionally omitted + }); + + await orchestrator.handleMessage({ + conversationId: "conv-no-mem", + text: "hi", + onEvent: () => {}, + }); + + const memLogs = records.filter( + (r) => r.kind === "log" && typeof r.msg === "string" && r.msg.startsWith("memory:turn"), + ); + expect(memLogs).toHaveLength(0); + }); + + it("getActiveConversationCount tracks in-flight turns", async () => { + const store = createInMemoryStore(); + const result: RunTurnResult = { + messages: [{ role: "assistant", chunks: [{ type: "text", text: "ok" }] }], + usage: { inputTokens: 1, outputTokens: 1 }, + finishReason: "stop", + }; + // A runTurn that blocks until the test releases it — keeps the turn + // active so getActiveConversationCount reflects an in-flight turn. 
+ let release: () => void = () => {}; + const blocked = new Promise<void>((resolve) => { + release = resolve; + }); + const blockingRunTurn = async (_input: RunTurnInput): Promise<RunTurnResult> => { + await blocked; + return result; + }; + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => ({ id: "p", stream: async function* () {} }), + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn: blockingRunTurn, + }); + + expect(orchestrator.getActiveConversationCount()).toBe(0); + const done = orchestrator.handleMessage({ + conversationId: "conv-active", + text: "hi", + onEvent: () => {}, + }); + // Give the detached turn a tick to register as active. + await Promise.resolve(); + await Promise.resolve(); + expect(orchestrator.getActiveConversationCount()).toBe(1); + + release(); + await done; + expect(orchestrator.getActiveConversationCount()).toBe(0); + }); +}); diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index 4c6a673..aaf418a 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -30,6 +30,9 @@ import { defaultDispatchPolicy, delayFor, generateTurnId, + type MemorySample, + memoryDelta, + memorySampleAttributes, resolveModelName, resolveReasoningEffort, } from "./pure.js"; @@ -331,6 +334,13 @@ export interface SessionOrchestrator { subscribe(conversationId: string, listener: TurnEventListener): () => void; isActive(conversationId: string): boolean; /** + * The number of conversations currently driving a turn (in the + * `activeConversations` set). Used by host-bin's periodic memory telemetry + * to tag each RSS sample with the active-conversation count, so growth can + * be attributed to the streaming/turn path vs an idle baseline. 
+ */ + getActiveConversationCount(): number; + /** * Explicitly close a conversation (the user closed its tab — distinct from a * socket disconnect, which never touches the turn): aborts any in-flight turn * (the kernel finishes with `finishReason: "aborted"`, partial messages are @@ -431,6 +441,16 @@ export interface SessionOrchestratorDeps { readonly logger?: Logger; /** Injected monotonic-ish clock (ms) forwarded to RunTurnInput for timing events. */ readonly now?: () => number; + /** + * Optional process.memoryUsage() sampler, injected for testability. When + * present, the orchestrator captures a sample immediately before and after + * each turn's stream completes (`deps.runTurn`) and logs the per-turn delta + * tagged with conversationId + turnId — correlating RSS growth with the + * streaming path (the prime leak suspect). When absent (undefined), no + * per-turn memory telemetry is emitted (feature degrades off cleanly). + * Pure decision logic stays unchanged; this is additive observability. + */ + readonly sampleMemory?: () => MemorySample; /** Emit a lifecycle event hook to subscribers. Injected from host. */ readonly emit?: <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload) => void; } @@ -886,6 +906,18 @@ export function createSessionOrchestrator( // FE to syncTail during generation (CR-6). await deps.conversationStore.append(conversationId, [userMsg]); + // Per-turn memory telemetry: capture a sample immediately BEFORE the + // stream starts. Paired with the post-stream sample below, this + // measures the streaming path's memory footprint per turn (the prime + // leak suspect — AI-SDK streaming buffers + per-turn message arrays). + // Tagged with conversationId + turnId via the turnLogger's correlation + // context. Additive observability only — does not alter the stream. 
+ const sampleMem = deps.sampleMemory; + const memBefore = sampleMem?.(); + if (memBefore !== undefined) { + turnLogger?.debug("memory:turn:before", memorySampleAttributes(memBefore)); + } + let stepsPersisted = false; const result = await deps.runTurn({ ...opts, @@ -899,6 +931,23 @@ export function createSessionOrchestrator( }, }); + // Per-turn memory telemetry: capture a sample immediately AFTER the + // stream completes and log the per-turn delta vs `memBefore`. A + // positive rss delta on a sealed turn flags memory retained by the + // streaming path (the leak we are localizing). No I/O beyond the + // injected sampler; pure delta computation via memoryDelta(). The + // delta attributes carry a `delta` prefix so the absolute "after" + // values and the per-turn delta coexist without key collision. + if (memBefore !== undefined) { + const memAfter = sampleMem?.(); + if (memAfter !== undefined) { + turnLogger?.info("memory:turn:after", { + ...memorySampleAttributes(memAfter), + ...memorySampleAttributes(memoryDelta(memBefore, memAfter), "delta"), + }); + } + } + // Fallback: if onStepComplete was never called (e.g., a fake // runTurn in tests), persist all result messages as a batch. 
if (!stepsPersisted && result.messages.length > 0) { @@ -1042,6 +1091,10 @@ export function createSessionOrchestrator( return activeTurns.has(conversationId); }, + getActiveConversationCount() { + return activeConversations.size; + }, + closeConversation(conversationId) { const turn = activeTurns.get(conversationId); const abortedTurn = turn !== undefined; diff --git a/packages/session-orchestrator/src/pure.test.ts b/packages/session-orchestrator/src/pure.test.ts index 7a574f1..d4fe84a 100644 --- a/packages/session-orchestrator/src/pure.test.ts +++ b/packages/session-orchestrator/src/pure.test.ts @@ -6,6 +6,9 @@ import { defaultDispatchPolicy, delayFor, generateTurnId, + type MemorySample, + memoryDelta, + memorySampleAttributes, RETRY_BUDGET_MS, RETRY_SCHEDULE_MS, RETRY_TAIL_MS, @@ -194,3 +197,71 @@ describe("retry backoff schedule (delayFor)", () => { expect(schedule.slice(8).every((d) => d === RETRY_TAIL_MS)).toBe(true); }); }); + +describe("memorySampleAttributes", () => { + const sample: MemorySample = { + rss: 100 * 1024 * 1024, // 100 MB + heapUsed: 40 * 1024 * 1024, + heapTotal: 60 * 1024 * 1024, + external: 5 * 1024 * 1024, + arrayBuffers: 2 * 1024 * 1024, + }; + + it("formats fields as rounded MB with no prefix", () => { + const attrs = memorySampleAttributes(sample); + expect(attrs).toEqual({ + rssMB: 100, + heapUsedMB: 40, + heapTotalMB: 60, + externalMB: 5, + arrayBuffersMB: 2, + }); + }); + + it("namespaces keys with the given prefix", () => { + const attrs = memorySampleAttributes(sample, "delta"); + expect(attrs).toEqual({ + deltaRssMB: 100, + deltaHeapUsedMB: 40, + deltaHeapTotalMB: 60, + deltaExternalMB: 5, + deltaArrayBuffersMB: 2, + }); + }); + + it("rounds fractional MB", () => { + const attrs = memorySampleAttributes({ ...sample, rss: 100.6 * 1024 * 1024 }); + expect(attrs.rssMB).toBe(101); + }); +}); + +describe("memoryDelta", () => { + const before: MemorySample = { + rss: 200 * 1024 * 1024, + heapUsed: 100 * 1024 * 1024, + heapTotal: 150 * 
1024 * 1024, + external: 10 * 1024 * 1024, + arrayBuffers: 4 * 1024 * 1024, + }; + const after: MemorySample = { + rss: 350 * 1024 * 1024, + heapUsed: 120 * 1024 * 1024, + heapTotal: 150 * 1024 * 1024, + external: 10 * 1024 * 1024, + arrayBuffers: 8 * 1024 * 1024, + }; + + it("computes signed after - before per field", () => { + const delta = memoryDelta(before, after); + expect(delta.rss).toBe(150 * 1024 * 1024); + expect(delta.heapUsed).toBe(20 * 1024 * 1024); + expect(delta.heapTotal).toBe(0); + expect(delta.external).toBe(0); + expect(delta.arrayBuffers).toBe(4 * 1024 * 1024); + }); + + it("is negative when memory dropped", () => { + const delta = memoryDelta(after, before); + expect(delta.rss).toBe(-150 * 1024 * 1024); + }); +}); diff --git a/packages/session-orchestrator/src/pure.ts b/packages/session-orchestrator/src/pure.ts index 0d2068f..489b140 100644 --- a/packages/session-orchestrator/src/pure.ts +++ b/packages/session-orchestrator/src/pure.ts @@ -1,4 +1,5 @@ import type { + Attributes, ChatMessage, Chunk, ImageInput, @@ -134,3 +135,71 @@ export function defaultDispatchPolicy(): ToolDispatchPolicy { export function generateTurnId(): string { return `turn-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; } + +// ── Memory telemetry (leak localization) ──────────────────────────────────── +// +// Pure helpers for process.memoryUsage() sampling. The orchestrator owns the +// sample SHAPE (this type) so its per-turn sampling and the host-bin periodic +// timer share one contract without a cross-package import of an +// implementation — host-bin imports this type, the orchestrator never imports +// host-bin. Pure: inputs → attributes/delta, no I/O, no clock. + +/** + * A snapshot of process.memoryUsage() at one instant. Mirrors the subset of + * Node/Bun's MemoryUsage we log for leak localization (rss, heapUsed, + * heapTotal, external, arrayBuffers). 
Owned here so the orchestrator's + * per-turn sampling and the host-bin periodic timer agree on the shape. + */ +export interface MemorySample { + readonly rss: number; + readonly heapUsed: number; + readonly heapTotal: number; + readonly external: number; + readonly arrayBuffers: number; +} + +const BYTES_PER_MB = 1024 * 1024; + +function mb(bytes: number): number { + return Math.round(bytes / BYTES_PER_MB); +} + +/** + * Pure: format a {@link MemorySample} as flat logger {@link Attributes} + * (values in MB, rounded). Flat scalars are serializable (D3) and queryable + * (D9) in the journal. No I/O. + * + * Pass a `prefix` to namespace the keys — e.g. `memorySampleAttributes(delta, + * "delta")` yields `deltaRssMB`, so an "after" log can carry both the absolute + * sample (`rssMB`) and the per-turn delta (`deltaRssMB`) without key collision. + * The first letter of each field is capitalized after the prefix for + * readability (`deltaRssMB`, not `deltarssMB`). + */ +export function memorySampleAttributes(sample: MemorySample, prefix?: string): Attributes { + const p = prefix === undefined ? "" : prefix; + const cap = (s: string): string => + s.length === 0 ? s : `${s[0]?.toUpperCase() ?? ""}${s.slice(1)}`; + const field = (name: string): string => (p.length === 0 ? name : `${p}${cap(name)}`); + return { + [field("rssMB")]: mb(sample.rss), + [field("heapUsedMB")]: mb(sample.heapUsed), + [field("heapTotalMB")]: mb(sample.heapTotal), + [field("externalMB")]: mb(sample.external), + [field("arrayBuffersMB")]: mb(sample.arrayBuffers), + }; +} + +/** + * Pure: compute the signed per-field delta `after - before`. A positive + * `rss` delta on a sealed turn flags memory retained by the streaming path + * (the prime leak suspect). No I/O. 
+ */ +export function memoryDelta(before: MemorySample, after: MemorySample): MemorySample { + return { + rss: after.rss - before.rss, + heapUsed: after.heapUsed - before.heapUsed, + heapTotal: after.heapTotal - before.heapTotal, + external: after.external - before.external, + arrayBuffers: after.arrayBuffers - before.arrayBuffers, + }; +} diff --git a/packages/ssh/src/pool.test.ts b/packages/ssh/src/pool.test.ts new file mode 100644 index 0000000..b5d72c8 --- /dev/null +++ b/packages/ssh/src/pool.test.ts @@ -0,0 +1,176 @@ +/** + * Unit tests for the pooled ssh2.Client error lifecycle in pool.ts. + * + * The ssh2.Client (the outermost network edge) is faked via EventEmitter — + * permitted by the constitution (mocking the outermost edge is fine; only + * `@dispatch/*` may not be mocked). These tests prove the permanent + * `'error'` listener survives `cleanup()` (which only removes the connect-time + * onReady/onError) and that a post-connect ssh2 error is captured, logged, and + * non-throwing, with the connection reconnecting on the next acquire. 
+ */ + +import { EventEmitter } from "node:events"; +import type { Logger } from "@dispatch/kernel"; +import type { Computer } from "@dispatch/wire"; +import type { Client } from "ssh2"; +import { afterEach, describe, expect, it } from "vitest"; +import { createSshConnectionPool } from "./pool.js"; + +const computer: Computer = { + alias: "testremote", + hostName: "localhost", + port: 22, + user: "testuser", + identityFile: null, + knownHost: false, +}; + +interface CapturedError { + msg: string; + err: unknown; + alias?: string; + message?: string; + level?: string; + from?: string; + to?: string; +} + +function capturingLogger(calls: CapturedError[]): Logger { + const log: Logger = { + debug: () => undefined, + info: () => undefined, + warn: () => undefined, + error: (msg, attrs) => { + calls.push({ + msg, + err: attrs?.err, + alias: attrs?.alias as string | undefined, + message: attrs?.message as string | undefined, + level: attrs?.level as string | undefined, + from: attrs?.from as string | undefined, + to: attrs?.to as string | undefined, + }); + }, + child: () => log, + span: (name) => ({ + id: name, + log, + setAttributes: () => undefined, + addLink: () => undefined, + child: (n) => ({ id: n, log }) as never, + end: () => undefined, + }), + }; + return log; +} + +/** A fake ssh2.Client: EventEmitter + connect/end/sftp. `connect` emits 'ready'. 
*/ +function fakeClient(): { client: Client; ee: EventEmitter } { + const ee = new EventEmitter(); + const api = { + connect: () => { + process.nextTick(() => ee.emit("ready")); + }, + end: () => undefined, + sftp: (cb: (err: Error | null, sftp: unknown) => void) => cb(null, {}), + }; + return { client: Object.assign(ee, api) as unknown as Client, ee }; +} + +function makeDeps(client: Client, logger: Logger) { + return { + logger, + homeDir: "/tmp", + knownHostsPath: "/tmp/known_hosts", + readFileText: async (p: string): Promise<string> => { + if (p === "/tmp/known_hosts") return ""; + return "ssh-ed25519 not-encrypted-key"; + }, + appendKnownHosts: async () => undefined, + pathExists: async () => true, + newClient: () => client, + resolveComputer: async () => computer, + }; +} + +describe("SshConnectionPool — permanent pooled-client error listener", () => { + let pool: ReturnType<typeof createSshConnectionPool>; + + afterEach(async () => { + await pool?.closeAll(); + }); + + it("captures a post-connect 'error' without throwing, sets state=error, and logs", async () => { + const errorCalls: CapturedError[] = []; + const { client, ee } = fakeClient(); + pool = createSshConnectionPool(makeDeps(client, capturingLogger(errorCalls))); + + const conn = await pool.acquire("testremote"); + expect(conn.state).toBe("connected"); + + const sshErr = Object.assign(new Error("Timed out while waiting for handshake"), { + level: "socket", + }); + + expect(() => ee.emit("error", sshErr)).not.toThrow(); + + expect(conn.state).toBe("error"); + expect(conn.error).toBe("Timed out while waiting for handshake"); + expect(errorCalls).toHaveLength(1); + expect(errorCalls[0]?.msg).toBe("ssh: pooled client error"); + expect(errorCalls[0]?.alias).toBe("testremote"); + expect(errorCalls[0]?.message).toBe("Timed out while waiting for handshake"); + expect(errorCalls[0]?.level).toBe("socket"); + expect(errorCalls[0]?.from).toBe("connected"); + expect(errorCalls[0]?.to).toBe("error"); + }); + + 
it("does NOT remove the permanent listener on successful connect (survives cleanup)", async () => { + const errorCalls: CapturedError[] = []; + const { client, ee } = fakeClient(); + pool = createSshConnectionPool(makeDeps(client, capturingLogger(errorCalls))); + + const conn = await pool.acquire("testremote"); + expect(conn.state).toBe("connected"); + + ee.emit("error", new Error("post-connect drop")); + expect(conn.state).toBe("error"); + expect(errorCalls).toHaveLength(1); + }); + + it("reconnects on the next acquire after a post-connect error", async () => { + const { client, ee } = fakeClient(); + pool = createSshConnectionPool(makeDeps(client, capturingLogger([]))); + + const conn = await pool.acquire("testremote"); + expect(conn.state).toBe("connected"); + + ee.emit("error", new Error("post-connect drop")); + expect(conn.state).toBe("error"); + + const conn2 = await pool.acquire("testremote"); + expect(conn2.state).toBe("connected"); + expect(conn2.error).toBeUndefined(); + }); + + it("logs the connect-time error too (permanent listener fires during handshake)", async () => { + const errorCalls: CapturedError[] = []; + const ee = new EventEmitter(); + const api = { + connect: () => { + process.nextTick(() => ee.emit("error", new Error("connect refused"))); + }, + end: () => undefined, + sftp: (cb: (err: Error | null, sftp: unknown) => void) => cb(null, {}), + }; + const client = Object.assign(ee, api) as unknown as Client; + pool = createSshConnectionPool(makeDeps(client, capturingLogger(errorCalls))); + + const conn = await pool.acquire("testremote"); + expect(conn.state).toBe("error"); + expect(conn.error).toBe("connect refused"); + expect(errorCalls).toHaveLength(1); + expect(errorCalls[0]?.msg).toBe("ssh: pooled client error"); + expect(errorCalls[0]?.message).toBe("connect refused"); + }); +}); diff --git a/packages/ssh/src/pool.ts b/packages/ssh/src/pool.ts index 9acae0e..5d1eedd 100644 --- a/packages/ssh/src/pool.ts +++ b/packages/ssh/src/pool.ts @@ 
-112,6 +112,24 @@ export function createSshConnectionPool(deps: SshPoolDeps): SshConnectionPool { let sftp: import("ssh2").SFTPWrapper | null = null; let connectPromise: Promise<void> | null = null; + // Permanent error listener — without it, a post-connect ssh2 'error' + // (re-key/keepalive timeout) escapes uncaught → process crash. cleanup() + // in doConnect only removes the connect-time onReady/onError; this persists. + client.on("error", (err: unknown) => { + const from = state.value; + state.value = "error"; + state.error = err instanceof Error ? err.message : String(err); + connectPromise = null; + deps.logger.error("ssh: pooled client error", { + err, + alias, + message: state.error, + level: (err as { level?: string } | null)?.level, + from, + to: "error", + }); + }); + const touch = (): void => { const e = entries.get(alias); if (e !== undefined) e.lastUsedAt = Date.now(); diff --git a/packages/transport-http/src/extension.ts b/packages/transport-http/src/extension.ts index 5bf16b3..cfb7166 100644 --- a/packages/transport-http/src/extension.ts +++ b/packages/transport-http/src/extension.ts @@ -11,6 +11,7 @@ import { credentialStoreHandle, heartbeatServiceHandle, lspServiceHandle, + type LspService, mcpServiceHandle, sessionOrchestratorHandle, systemPromptHandle, @@ -27,7 +28,6 @@ export const manifest: Manifest = { "conversation-store", "credential-store", "heartbeat", - "lsp", "mcp", "session-orchestrator", "throughput-store", @@ -96,7 +96,14 @@ export function createTransportHttpExtension(): Extension & { const throughputStore = host.getService(throughputStoreHandle); const warmService = host.getService(cacheWarmHandle); const compactionService = host.getService(compactionHandle); - const lspService = host.getService(lspServiceHandle); + // Optional: the `lsp` extension may be disabled (hot-fix). Wrapped because + // getService throws for an unregistered handle — degrades to no diagnostics. 
+ let lspService: LspService | undefined; + try { + lspService = host.getService(lspServiceHandle); + } catch { + lspService = undefined; + } const mcpService = host.getService(mcpServiceHandle); const systemPromptService = host.getService(systemPromptHandle); const heartbeatService = host.getService(heartbeatServiceHandle); @@ -129,7 +136,7 @@ export function createTransportHttpExtension(): Extension & { throughputStore, warmService, compactionService, - lspService, + ...(lspService !== undefined ? { lspService } : {}), mcpService, systemPromptService, heartbeatService, diff --git a/systemd/dispatch.service b/systemd/dispatch.service index e651fa7..42b5a29 100644 --- a/systemd/dispatch.service +++ b/systemd/dispatch.service @@ -10,6 +10,12 @@ EnvironmentFile=/etc/dispatch/env WorkingDirectory=/var/lib/dispatch Restart=on-failure RestartSec=5 +# Memory-pressure circuit breaker: the server has a memory leak (~2.5 GB/h) +# that eventually triggers a Bun runtime segfault. These cgroup limits turn +# the uncontrolled crash into a controlled OOM-kill → clean restart. +# Machine has 33.24 GB total; 24G hard cap leaves OS headroom. +MemoryHigh=20G +MemoryMax=24G StandardOutput=journal StandardError=journal |
