summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-25 18:36:08 +0900
committerAdam Malczewski <[email protected]>2026-06-25 18:36:08 +0900
commitde022cee7ac66c95d7ed6a35d4e00f8e2d92cbbc (patch)
tree041dcb1017e544a405526443cb578baa974bec0e
parentfc1c3a54c3075990ec0dd0f97901bd46fe142923 (diff)
parent649fc4f66f40f7743683546f81d3320e7394e597 (diff)
downloaddispatch-de022cee7ac66c95d7ed6a35d4e00f8e2d92cbbc.tar.gz
dispatch-de022cee7ac66c95d7ed6a35d4e00f8e2d92cbbc.zip
Merge branch 'dev' into feature/ssh-support
Brings dev's retry-with-backoff (the transient `provider-retry` AgentEvent the web frontend consumes) + the LSP-dead-server per-edit-hang fix into the SSH feature branch, alongside the SSH waves 0-5c. All code files auto-merged cleanly (run-turn.ts, orchestrator.ts, runtime.ts, wire/index.ts, tool-edit-file/extension.ts, run-turn.test.ts — both computerId threading and retry-with-backoff coexist). Only tasks.md conflicted (status section — orchestrator-resolved; both feature sections kept). Verified post-merge: tsc -b EXIT 0, biome clean (391 files), 1730 vitest pass +6 sshd-integration skipped (was 1690; +40 from dev's retry/LSP tests). Wire dist rebuilt so the FE can re-sync the pinned @dispatch/wire dep and pick up BOTH provider-retry AND the SSH Computer/defaultComputerId types. No merge or push (into dev or otherwise).
-rw-r--r--.skills/ORCHESTRATOR.md443
-rw-r--r--notes/conv-list-by-worktree-research.md247
-rw-r--r--notes/retry-with-backoff-plan.md138
-rw-r--r--packages/cli/src/args.test.ts68
-rw-r--r--packages/cli/src/args.ts22
-rw-r--r--packages/cli/src/http.test.ts30
-rw-r--r--packages/cli/src/http.ts2
-rw-r--r--packages/cli/src/main.ts19
-rw-r--r--packages/kernel/src/contracts/events.ts1
-rw-r--r--packages/kernel/src/contracts/index.ts2
-rw-r--r--packages/kernel/src/contracts/runtime.ts44
-rw-r--r--packages/kernel/src/runtime/events.ts14
-rw-r--r--packages/kernel/src/runtime/index.ts1
-rw-r--r--packages/kernel/src/runtime/run-turn.test.ts535
-rw-r--r--packages/kernel/src/runtime/run-turn.ts167
-rw-r--r--packages/lsp/src/aggregate.test.ts141
-rw-r--r--packages/lsp/src/aggregate.ts80
-rw-r--r--packages/lsp/src/client.test.ts146
-rw-r--r--packages/lsp/src/client.ts153
-rw-r--r--packages/lsp/src/diagnostics.ts2
-rw-r--r--packages/lsp/src/extension.ts54
-rw-r--r--packages/lsp/src/manager.test.ts88
-rw-r--r--packages/lsp/src/manager.ts13
-rw-r--r--packages/lsp/src/rpc.test.ts26
-rw-r--r--packages/lsp/src/rpc.ts29
-rw-r--r--packages/lsp/src/tool.test.ts97
-rw-r--r--packages/lsp/src/tool.ts43
-rw-r--r--packages/session-orchestrator/src/index.ts6
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts36
-rw-r--r--packages/session-orchestrator/src/pure.test.ts61
-rw-r--r--packages/session-orchestrator/src/pure.ts47
-rw-r--r--packages/tool-edit-file/src/extension.ts5
-rw-r--r--packages/wire/src/index.ts26
-rw-r--r--tasks.md40
34 files changed, 2272 insertions, 554 deletions
diff --git a/.skills/ORCHESTRATOR.md b/.skills/ORCHESTRATOR.md
deleted file mode 100644
index 4d5f213..0000000
--- a/.skills/ORCHESTRATOR.md
+++ /dev/null
@@ -1,443 +0,0 @@
-Operating manual for the dispatch orchestrator: plan topological waves of single-owner agents, summon via dispatch CLI, verify from contracts + tests (never read implementation), resolve contract gaps. Project-specific to this repo.
----
-# ORCHESTRATOR.md — how to drive this project
-
-> **You are the orchestrator.** You do NOT write feature code yourself. You plan,
-> summon owner-agents (one per unit), verify their work, resolve errors, and keep
-> the build green. This file is your complete operating manual. Read it fully
-> before acting. Also read: `AGENTS.md` (the subagent constitution — you enforce
-> it), `GLOSSARY.md`, `.dispatch/rules/`, `tasks.md` (live progress), and
-> `notes/restructure-plan.md` (the full design + rationale; §-refs below point
-> into it).
-
----
-
-## 0. Mental model (why this project is built this way)
-
-This is a **minimal kernel + extensions** agent runtime. Every feature is an
-extension. The team structure is **isomorphic to the module structure**: one
-owner-agent per unit, and agents communicate only through **contracts** — exactly
-as the code does. Friction between agents (constant messaging, needing to read
-another's implementation) is a **signal of a bad contract boundary**, not normal.
-
-This is a synthesis of "The AI Harness"
-(https://dev.to/louaiboumediene/the-ai-harness-why-your-ai-coding-agent-is-only-as-smart-as-the-repo-you-put-it-in-cml)
-with our own design. The harness layers we use:
-- **Constitution** (`AGENTS.md`) — loaded by every agent. Non-obvious, project-
- specific rules only.
-- **Safety reflexes** (`.dispatch/rules/*.md`) — tiny, crystallized scar tissue.
-- **Glossary** (`GLOSSARY.md`) — one canonical name per concept.
-- **This file** — the orchestrator's workflow (the article doesn't cover this; we
- added it).
-- **Scoped knowledge** — rules/prompts are scoped to the *kind* of agent and the
- *layer* it works in (strict for kernel/pure-core, lenient for the shell). The
- article's key lesson: **scoped rules beat general rules; never write down what a
- frontier model already knows** (P6).
-
-The 8 principles (P1–P8) live in `notes/restructure-plan.md` §1. Internalize them;
-they justify every rule below.
-
----
-
-## 1. The golden workflow (build/modify a feature)
-
-1. **Plan.** Decide the unit(s); split into dependency-topological **waves** of
- disjoint units, and WIDEN each wave where you can (§2a). One agent owns one unit;
- it may ONLY edit its assigned files.
-2. **Overlap check FIRST (anti-synonym-drift, §5.6).** Before creating anything
- new, check `GLOSSARY.md` + existing code. If the request *describes* an
- existing concept under a new name, steer to the canonical term (e.g.
- "web-notifier" → that's a `webhook`). New term? Propose the standard/training-
- baked name and **ask the user** before adding it to the glossary. Never coin a
- term silently.
-3. **Boundary decision is the USER's (§5.2).** "New extension vs. extend an
- existing one?" — surface it to the user; never decide granularity silently.
-4. **Write the prompt** to `prompts/<unit>.md` (gitignored). See §3 for the
- prompt recipe.
-5. **Summon the wave** via `opencode run` (see §2); disjoint units run in PARALLEL
- (§2a). RE-READ `.dispatch/rules/` + the §3 scoping map before each wave — assemble
- from the files, not from memory.
-6. **Verify** the reports + independently re-run checks (see §4). Trust nothing
- until you've re-run `typecheck`/`test`/`check` yourself.
-7. **Resolve** any contract gaps / errors (see §5).
-8. **Commit** the milestone with a clear message + test count. Update `tasks.md`.
-
----
-
-## 2. Summoning agents via `opencode run` (the harness)
-
-OpenCode CLI is the summon mechanism (see `notes/opencode-agents.md`).
-
-**Working dir:** always the repo root,
-`/home/tradam/projects/dispatch/dispatch-backend` (so the agents' `lsp` tool works —
-TS language server is configured globally).
-
-**Model:** use `opencode-go/mimo-v2.5-pro` for BUILDING agents (capable coder).
-`deepseek-v4-flash` is reserved as the *app's own runtime testbench*, not for
-building.
-
-**Canonical invocation** — assemble the prompt by CONCATENATING the standardized briefs + the
-scoped rules + the per-summon TASK. The invariant guardrails live ONCE in the briefs, so
-`prompts/<unit>.md` is now JUST the TASK block (§3). Do NOT use `-f` (see gotcha); ALWAYS
-redirect output to a file.
-```bash
-cd /home/tradam/projects/dispatch/dispatch-backend && \
-opencode run --dir /home/tradam/projects/dispatch/dispatch-backend \
- -m opencode-go/mimo-v2.5-pro \
- "$(cat .dispatch/package-agent.md)
-$(cat .dispatch/extension-agent.md)
-$(cat .dispatch/rules/one-owner.md .dispatch/rules/isolation-over-dry.md .dispatch/rules/biome-clean.md .dispatch/rules/pure-core.md .dispatch/rules/no-internal-mocks.md .dispatch/rules/typed-handles.md)
-
-## TASK
-$(cat prompts/<unit>.md)" \
- > reports/<unit>.run.log 2>&1
-```
-**Assembly order is fixed: package brief → extension supplement → scoped rules → TASK**
-(the supplement references "the package brief above"; the briefs reference "rules inlined into
-this prompt"). Rules:
-- **Non-extension package?** OMIT the `.dispatch/extension-agent.md` line.
-- Inline ONLY the scoped rules matching the unit's layer (the §3 map) — not every rule on every agent.
-- `AGENTS.md` is auto-loaded by opencode — never `cat` it.
-- The briefs already instruct the agent on ownership, visibility, verify, and the report; the
- TASK block must NOT repeat any of that.
-
-**MANDATORY — capture output to a file, never display it.** The agent's streamed
-output is enormous and will overwhelm and CRASH this harness if it lands in your
-terminal. ALWAYS redirect the summon's stdout+stderr to a log file (e.g.
-`> reports/<unit>.run.log 2>&1`) and do NOT echo/`cat` that log back wholesale.
-You don't need the raw stream: read the agent's `reports/<unit>.md` report (and,
-if you must, `grep`/`tail` the log for a specific error). Treat dumping a full run
-log into context as a hard failure.
-
-**Run discipline (from the tool harness):**
-- **Do NOT background it. Use a large timeout** (e.g. 1800000 ms = 30 min) — these
- are long tasks. Backgrounding loses the stream.
-- One non-backgrounded `run_shell` per summon. For PARALLEL agents on disjoint
- files, launch multiple summons (the harness allows concurrent tool calls) — but
- ONLY when their file sets do not overlap (single-writer rule). Log parallel runs
- in `tasks.md`.
-
-**GOTCHAS (learned the hard way):**
-- **Headless cross-`--dir` read = HANG.** An agent's Read of any file OUTSIDE its
- `--dir` triggers an interactive permission prompt that CANNOT be answered headlessly
- → the run wedges until aborted. This bites CROSS-REPO: a `file:` dep symlink (e.g.
- `dispatch-web/node_modules/@dispatch/ui-contract` → the sibling repo) resolves OUTSIDE
- `--dir`, so an agent reading the dep's source hangs. Fixes: (a) keep everything the
- agent must READ inside `--dir` — ship an **in-repo reference snapshot** of a cross-repo
- contract and FORBID reading `node_modules/@dispatch/*`; OR (b) set `--dir` to a parent
- containing all needed paths — but then the repo's `AGENTS.md` won't auto-load (you lose
- the constitution). The briefs now tell agents: never read outside your scope — if you
- think you need to, REPORT it and STOP, never attempt the read.
-- `-f/--file` is an ARRAY flag and greedily eats your trailing message as another
- filename → "File not found". **Inline with `"$(cat prompts/X.md)"` instead.**
-- A quick smoke test works: `opencode run -m opencode-go/mimo-v2.5-pro "Reply with
- exactly SMOKE_OK"` should print `SMOKE_OK`.
-- `opencode models` lists models; `opencode agent list` lists agent profiles;
- `opencode run --help` for flags.
-
----
-
-## 2a. Parallel execution — WAVES
-
-Throughput comes from running disjoint units at once. Organise it as waves:
-- **A wave = units that (a) touch DISJOINT files and (b) have no compile-time dependency
- on each other** (each imports only already-built packages + existing contracts). Launch a
- wave by emitting one summon per unit as CONCURRENT tool calls (§2). Later waves depend on
- earlier ones; the composition root (`packages/host-bin/`) is almost always the LAST wave.
-- **Pre-author the seam to widen the wave.** Because the orchestrator OWNS contracts (§6),
- author the shared contract / typed handle in `packages/kernel/src/contracts/*` FIRST, then
- summon the producer AND the consumer in the SAME wave against that fixed type — neither needs
- the other's implementation. Authoring the contract up front is what turns a sequential
- producer→consumer chain into one parallel wave (and `lsp references` on the new symbol gives
- the exact consumer set to summon).
-- **Also widen by removing edges:** prefer a consumer-defined handle the producer implements,
- or a generic utility over a feature-specific one, so a dependency disappears entirely.
-- **One writer per file, always** — even across waves. If two units would edit the same file,
- they are NOT separable; merge them into one unit or sequence them.
-- **After a wave:** read every report, run the §4 checks ONCE for the whole wave, commit the
- milestone (update `tasks.md`), then start the next wave. Don't open a new wave before the
- prior one is green.
-
----
-
-## 3. The per-summon `prompts/<unit>.md` is JUST the TASK block
-
-The invariant guardrails — single-writer directory ownership, visibility, coupling, the
-engineering standard, isolated verification, and the report format — live ONCE in the
-standardized briefs the summon concatenates (§2):
-- **`.dispatch/package-agent.md`** — the base for EVERY package owner.
-- **`.dispatch/extension-agent.md`** — the extension-only supplement (added for extension summons).
-
-So `prompts/<unit>.md` no longer restates any of that. It contains ONLY the **TASK**:
-1. **Your package:** `packages/<name>/` — name the WHAT, not the files (the owner owns the whole
- directory and decides which files to touch).
-2. **The job + algorithm**, naming the specific contract types/handles involved.
-3. **The specific contract file(s)** to read (e.g. `packages/kernel/src/contracts/<x>.ts`) and
- any sibling public surfaces it consumes.
-4. **The required test cases** (named).
-
-Keep it scoped (P6): state only the project-specific, non-inferable task — the briefs carry the rest.
-
-**`.dispatch/rules/` scoping map** — include ONLY the rows matching the unit (per §0
-"scoped rules beat general rules"); do NOT dump every rule on every agent:
-- **Every agent:** `one-owner.md`, `isolation-over-dry.md`, `biome-clean.md`.
-- **Kernel unit:** `kernel-purity.md` + `pure-core.md` + `no-internal-mocks.md`.
-- **Pure-core unit:** `pure-core.md` + `no-internal-mocks.md`.
-- **Any extension coupling via hooks/services:** `typed-handles.md`.
-- **Every extension (≈ all of them — they all log):** `extension-logging.md`. Use the
- injected `host.logger`/`ctx.log`; keystone: each extension self-redacts its OWN secrets
- in its OWN code — NO shared redaction helper (design rationale:
- `notes/observability-design.md` §9). Include this on EVERY extension summon (an
- extension that never logs is a coverage gap, not an exemption).
-- **Frontend units** are summoned from the SEPARATE `../dispatch-web` repo using ITS
- OWN harness (`package-agent.md` + `frontend-*.md` rules) + ITS OWN scoping map — NOT
- these backend rules. See that repo's `ORCHESTRATOR.md`.
-
-**Tell each agent it has company (parallel waves).** Add to each wave TASK: sibling units are
-being built in OTHER packages right now; `tsc -b`/vitest/biome are whole-PROJECT, so if a check
-reports errors OUTSIDE your package, that's concurrent WIP — ignore it and ensure YOUR files are
-clean. The orchestrator's post-wave run (§4) is the source of truth.
-
-**Make agents IMPLEMENT, not deliberate.** A summoned owner must edit files + run its checks +
-write its report in the one run. If a summon returns only a plan, re-summon (§5a).
-
----
-
-## 4. Verification (the orchestrator's trust protocol)
-
-**Plan principle (§3.6 / §5 last row):** the orchestrator confirms work from
-**contracts + test results + build/diagnostics output** — that is the *designed*
-trust mechanism, and it works precisely because the boundaries are testable. The
-tests-at-boundaries ARE how you trust a unit without depending on its internals.
-
-**Stay out of implementation files (§6 Visibility).** Your trust signals are the
-agent's report, the contract/surface it exposes (contracts, manifests, public
-types), and the build/test/lint output you re-run yourself — NOT its implementation
-code. Do NOT open an extension's implementation files — not even to "skim",
-double-check, or diagnose a bug. **There is NO "conflict exception."** When X and Y
-don't work together, or a unit is broken, you diagnose from the `typecheck`/`test`
-output + `lsp references` on the contract + the agent's report, then **summon the
-owning agent** (or a temporary multi-knowledge agent, §5) to read its own code and
-fix it. You diagnose from symptoms; the agent reads the code.
-
-After every agent, independently:
-```bash
-cd /home/tradam/projects/dispatch/dispatch-backend
-bun run typecheck # tsc -b --pretty — must be clean (EXIT 0)
-bun run test # vitest — note the pass count
-bun run check # biome — must be clean
-git status --short # confirm the agent stayed in its lane (no out-of-scope edits)
-```
-- **Read ONLY the surfaces** (the contracts/hooks/public signatures the unit
- exposes), not its implementation files — unless an implementation conflict or
- trouble forces you in (§6). The surface plus green checks is enough to trust a
- unit; subtle contract mistakes show up at the boundary, which is what the
- contract + boundary tests are for.
-- Confirm the agent touched ONLY its assigned files (one-owner rule).
-- For pure units, confirm tests use NO internal `vi.mock("@dispatch/*")`.
-
-**Concurrency caveat (parallel waves):** `tsc -b`/vitest/biome are whole-project, so an agent's
-OWN mid-wave check can transiently see a sibling's half-written file. Don't act on a report's
-out-of-package errors; YOUR post-wave run is authoritative. Re-run a suite that depends on shared
-external state before trusting it — and ALWAYS sweep leaked server/collector processes between
-live runs (§8 bracket trick), since a leak silently poisons the next run's counts.
-
----
-
-## 5. Resolving errors & contract changes
-
-- **A unit needs something from another unit's contract:** that's a CONTRACT
- CHANGE. The owner of the contract makes it. To find every consumer, use
- `lsp references` on the changed exported symbol (contracts are static TS types,
- so this returns the TRUE blast radius — §5.3). Then summon the affected owners
- to update. The orchestrator dispatches this fan-out; agents don't reach across.
-- **Integration bug (X and Y each honor the contract but don't work together):**
- no single file owns it. Summon a **temporary multi-knowledge agent** with
- read/write to the 2–3 relevant files (it MAY see implementation — exception to
- the visibility rule), as their temporary exclusive owner. Dispatch proactively,
- or when a file-owner requests it (§5.5).
-- **CR (change-request) in a report:** if it's **build/config** (root
- `tsconfig.json` ref, a `package.json` dep, `.gitignore`, `bun.lock`) the
- orchestrator edits it directly, then re-verifies. If it's **implementation** (a
- barrel `index.ts`, a sibling conforming to a contract change), the orchestrator
- **summons the owning agent** — it does NOT edit implementation itself.
-- **Live API errors:** an HTTP 429 `GoUsageLimitError` is an UPSTREAM rate limit,
- not a bug. The `opencode-2` key has a monthly cap; `opencode-1` is the backup.
- Swap `DISPATCH_API_KEY` in `.env` (both keys are there).
-
----
-
-## 5a. Agent-failure recovery patterns
-
-- **Plan-only / "shall I proceed?" agent.** A summon sometimes returns a PLAN and STOPS without
- editing (no diff, no `reports/<unit>.md`). Detect via `git status` + the missing report.
- Re-summon the SAME TASK prefixed: "IMPLEMENT THIS NOW — make all edits, run the checks, write
- the report; do not stop to plan or ask." Don't hand-fix its work.
-- **A behaviour change reds a SIBLING's tests (test fan-out).** When a unit's new behaviour
- invalidates another unit's test ASSERTIONS, those tests belong to that OTHER owner — summon it
- with a focused "fix these N failing tests to match the new behaviour" TASK (state the
- behaviour). The orchestrator never edits feature tests itself. (Distinct from an INTEGRATION
- bug where neither side is wrong — that's the temporary multi-knowledge agent in §5.)
-- **Agent strayed out of its lane.** `git status --short` after every wave; if an agent touched a
- file outside its package, keep it ONLY if it's legitimately the orchestrator's lane
- (contracts / build / config / harness, §6) and note it — otherwise revert + re-summon with a
- tighter scope.
-- **Flaky green.** A wave that passes once but leaked a server/collector or relies on shared
- external state can pass for the wrong reason; sweep (§8) and re-run before committing.
-
----
-
-## 6. Restrictions & invariants (NEVER violate)
-
-- **Single-writer:** never let two agents edit the same file concurrently.
-- **Kernel purity:** no I/O / no concrete feature names in `packages/kernel`
- (`.dispatch/rules/kernel-purity.md`).
-- **Visibility rule (§5.1):** agents see only other units' CONTRACTS, never their
- implementation. A contract documents **behavior & guarantees a consumer can
- rely on, not just types** (P6 applied to contracts). An agent *needing* to read
- another unit's code is a signal that contract is underspecified — fix the
- contract, don't grant code access. (Exception: the temporary multi-knowledge
- integration agent, §5 / ORCHESTRATOR §5, which MAY read implementation.)
-- **The orchestrator NEVER reads or edits implementation.** You read ONLY contracts
- (`packages/kernel/src/contracts/*`) + surfaces (manifests, public signatures) +
- diagnostics (`typecheck`/`test` output, `lsp references` on contract symbols) +
- agent reports. Do NOT open implementation `.ts` files (feature logic, tests,
- composition roots) — not even during a bug. Clean context = level-headed
- decisions; the subagents do the implementation.
-- **What the orchestrator MAY edit directly:** (a) **contracts**
- (`packages/kernel/src/contracts/*`); (b) **build wiring + config** (root/package
- `tsconfig.json`, `package.json` deps, project refs, `.gitignore`, `bun.lock`);
- (c) **harness/docs** (`ORCHESTRATOR.md`, `AGENTS.md`, `GLOSSARY.md`,
- `.dispatch/rules/`, `notes/`, `tasks.md`, `prompts/`, `reports/`). Everything else
- — all executable implementation `.ts`, including tests and composition roots like
- `host-bin/src/main.ts` — changes ONLY by summoning the owning agent.
-- **Roadblock → surface to the user.** If a needed change doesn't fit the above
- (ambiguous ownership, a design question, a stuck agent), stop and ask rather than
- reaching into implementation.
-- **Subagents inherit this restriction.** Every prompt you write must instruct the
- agent to read ONLY the surfaces (contracts/hooks) of OTHER units, with the sole
- exception that it MAY read the implementation files of the task/extension it is
- assigned to. It must not go spelunking through sibling units' implementations.
-- **`onAny` is the ONLY allowed dynamic hook subscription** (observability/logging
- firehose). All other cross-extension coupling is typed-symbol anchored (§5.4).
-- **Contracts are static TYPES; loading is dynamic** (manifests via host). This
- split is load-bearing — it's what makes `lsp references` fan-out work.
-- **Full fidelity:** every core feature is a real extension with a manifest,
- loaded through the host. Do NOT hand-wire imports to shortcut the extension
- model — that defeats the point.
-- **Asymmetric testing:** strict (zero internal mocks, high coverage) on
- kernel/pure-core; lenient (thin integration tests) on the shell.
-- **Destructive git ops:** be extremely careful. Back up irreplaceable files
- (e.g. `notes/restructure-plan.md`) to `/tmp` before any `git reset --hard` /
- `git clean`. `.env` is gitignored — preserve it.
-- **Write things up before pivoting topics.** Keep `tasks.md` current in real
- time. Don't leave decisions only in chat context (it can be lost).
-
----
-
-## 7. Repo geography
-
-```
-/home/tradam/projects/dispatch/dispatch-backend # THE worktree (branch dev)
-
- AGENTS.md the subagent constitution (auto-loaded by opencode; you enforce it)
- ORCHESTRATOR.md the orchestrator's operating manual (this file)
- GLOSSARY.md canonical vocabulary + aliases-to-avoid (human-gated)
- tasks.md live progress checklist / milestone log
- README.md deployment, CLI usage, extension/package tables
-
- .dispatch/
- package-agent.md base owner-agent brief (every summon)
- extension-agent.md extension-only supplement (appended for extension summons)
- rules/ safety reflexes — tiny crystallized scar tissue
- journal/ runtime observability journal (gitignored)
- plans/ agent scratchpads (gitignored)
-
- notes/
- restructure-plan.md the full architecture design + rationale (P1–P8; §-refs)
- observability-design.md logging/spans/collector/trace-store design (Phase A–B)
- cli-design.md CLI design decisions + unit plan (built; §3 = settled decisions)
- frontend-design.md future web frontend design (IDEATION; separate repo)
- opencode-agents.md notes on summoning agents via the opencode CLI
-
- prompts/ (gitignored — orchestrator→agent TASK blocks)
- reports/ (gitignored — agent→orchestrator reports)
- .env (gitignored — DISPATCH_API_KEY, DISPATCH_BASE_URL, DISPATCH_MODEL, BACKEND_PORT)
-
- packages/
- kernel/ contracts (ABI), bus, runtime (runTurn), host
- wire/ types-only wire ABI (AgentEvent + conversation model + Usage); kernel +
- transport-contract re-export it so clients consume the wire w/o the kernel runtime
- transport-contract/ types-only HTTP API contract (CLI + future web + server share it)
- ui-contract/ types-only surface ABI (frontend-agnostic; web + CLI render it)
- storage-sqlite/ conversation-store/ auth-apikey/ provider-openai-compat/
- credential-store/ named credentials + model catalog (resolve / listCatalog)
- session-orchestrator/ transport-http/ (core extensions)
- tool-read-file/ standard tool extension (read_file; cwd-aware)
- journal-sink/ trace-store/ observability-collector/ trace-replay/ (observability)
- cli/ bundled one-shot terminal client (HTTP client of transport-contract)
- host-bin/ composition root (boot + Bun.serve + collector supervisor)
-```
-
-The genesis commit deleted all prior source; we rebuilt from scratch. The OLD
-project lives at `/home/tradam/projects/dispatch/dispatch-source` (reference only
-— do not edit).
-
-The **web frontend is a SEPARATE repo** at `/home/tradam/projects/dispatch/dispatch-web`
-(own git, own harness — its own `AGENTS.md`/`ORCHESTRATOR.md`/`GLOSSARY.md`/`.dispatch/`).
-It consumes `packages/ui-contract` + the wire types as a pinned `file:` dependency.
-`lsp references` does NOT span the two repos, so cross-repo contract changes are
-**couriered via the user** (see the FE `ORCHESTRATOR.md` §5). Design + plan:
-`notes/frontend-design.md`. Do NOT edit the FE repo from here.
-
----
-
-## 8. Current status & how to run
-
-See `tasks.md` for the live checklist. As of MVP completion:
-- Kernel + 6 core extensions + host-bin DONE. 178 tests pass; typecheck + biome
- clean.
-- **MVP verified live:** multi-turn curl against OpenCode Go flash works
- (`conversationId` threads history).
-
-**Boot + smoke test:**
-```bash
-cd /home/tradam/projects/dispatch/dispatch-backend
-KEY1=$(grep DISPATCH_API_KEY_OPENCODE1 .env | cut -d= -f2)
-PORT=4567 DISPATCH_API_KEY="$KEY1" bun packages/host-bin/src/main.ts # boots server
-# in another shell:
-curl -s -X POST localhost:4567/chat -H 'content-type: application/json' \
- -d '{"conversationId":"c1","message":"Say hello in 3 words."}'
-```
-Note the chat field is **`conversationId`** (threads multi-turn), not `tabId`.
-
-**Live validation & process cleanup — the `[x]` bracket trick (scar tissue).** When
-you live-validate you background the app (`bun packages/host-bin/src/main.ts &`), and it
-now spawns a child **observability collector** process. To list or kill those, ALWAYS
-use the bracket trick in `ps`/`pgrep`/`pkill` patterns:
-```bash
-ps -eo pid,args | grep '[o]bservability-collector/src/main' # list (won't self-match)
-pkill -9 -f '[h]ost-bin/src/main.ts' # kill the app
-pkill -9 -f '[o]bservability-collector/src/main' # kill the collector
-```
-**Why it matters:** a plain `pkill -f 'host-bin/src/main.ts'` matches its OWN command
-line and kills the parent shell → the tool call prints NOTHING and times out, looking
-exactly like a wedged session. `[h]ost-bin` matches the target "host-bin" while the
-literal pattern `[h]ost-bin` does not match itself. ALWAYS clean up the backgrounded app
-+ its spawned collector after each live run — leaked processes pollute the next run's
-counts (this is precisely what made a correct supervisor look like it spawned 3
-collectors and left 2 behind).
-
-**Live boot-probe in ONE command WILL hit the tool timeout — that is NOT failure (scar tissue).**
-A single bash command that boots the app (even detached via `setsid … & disown`), sleeps, runs a
-probe, then kills it will still run to the tool's timeout: the tool waits on the spawned
-server/collector session. The probe already ran — **read the probe's printed `RESULT: OK/FAIL`
-line as the signal**, ignore the timeout, then run a SEPARATE `pkill` (bracket-trick) + `ps`
-cleanup command (it returns immediately and confirms no leaks). Don't try to make the boot+probe
-command "return cleanly" — it won't. (For a frontend-agnostic surface, the probe is a tiny
-`bun` WebSocket client that asserts `catalog → subscribe → surface`.)
-
-**Next suggested work** (post-MVP, see `tasks.md` "Open items"): wire
-auth→provider properly (auth-apikey is currently vestigial), then add the first
-TOOL extension to exercise the dispatch loop (turns currently run with `tools:
-[]`).
diff --git a/notes/conv-list-by-worktree-research.md b/notes/conv-list-by-worktree-research.md
new file mode 100644
index 0000000..636f06e
--- /dev/null
+++ b/notes/conv-list-by-worktree-research.md
@@ -0,0 +1,247 @@
+# Research — List conversations filtered by worktree / workspace
+
+> **Branch:** `feature/conv-list-by-worktree` (research notes only — no code changes).
+> **Date:** 2026-06-25. **Repo:** `dispatch-backend` worktree
+> `worktrees/conv-list-by-worktree/backend`.
+>
+> **Research question:** Does the Dispatch backend currently support getting a
+> list of *open* conversations filtered by a specific worktree (or workspace)?
+
+## TL;DR
+
+**Yes — fully supported, via the `workspace` concept.** `GET /conversations`
+accepts a composable `?workspaceId=` query filter, and every conversation
+already carries a `workspaceId` (default `"default"`). Combined with the
+`?status=` filter, "open conversations in workspace X" is:
+
+```
+GET /conversations?workspaceId=X&status=active,idle
+```
+
+("open" = not `closed`; the lifecycle statuses are `active | idle | closed`,
+where `closed` is the archived state.)
+
+**Important terminology caveat:** **"worktree" is NOT a Dispatch domain term** —
+it appears nowhere in `packages/` (zero matches; it only occurs in `ORCHESTRATOR.md`
+and `notes/restructure-plan.md` as part of file *paths*). The two closest canonical
+concepts are:
+
+| Canonical term (GLOSSARY) | Meaning | "Worktree" as directory? |
+|---|---|---|
+| **workspace** | A named, URL-driven *logical grouping* of conversations that owns a default cwd. Every conversation belongs to exactly one. (`@dispatch/wire` `Workspace`) | No — it's a slug, not a path |
+| **working directory (cwd)** | The per-conversation *filesystem directory* tools/language-servers operate within. | **Yes** — a git worktree *is* a directory |
+
+The existing `GET /conversations` filter supports `?workspaceId=` but **not**
+`?cwd=`. So if the intent is "conversations whose working directory is a *specific
+git worktree path*", that is NOT supported today (see §3b). If the intent is the
+logical grouping, it's fully supported (see §1–§2).
+
+---
+
+## 1. Is there an existing endpoint? — YES
+
+`GET /conversations` lists all known conversations and supports three
+**composable** query filters:
+
+| Filter | Values | Effect |
+|---|---|---|
+| `?workspaceId=<slug>` | workspace slug | Restrict to conversations in that workspace |
+| `?status=<csv>` | `active`,`idle`,`closed` | Restrict to those lifecycle statuses |
+| `?q=<prefix>` | id prefix | Short-id resolution; id-prefix match (applied in-memory) |
+
+**Open conversations in a workspace** = `?status=active,idle` (excludes `closed`)
+combined with `?workspaceId=`.
+
+There is also a `GET /workspaces` endpoint that lists workspaces (each with a
+`conversationCount`), and per-workspace CRUD — see §4.
+
+---
+
+## 2. How it works (endpoint, parameters, response shape)
+
+### 2a. The route — `packages/transport-http/src/app.ts:775-814`
+
+```ts
+app.get("/conversations", async (c) => {
+ // ?status= comma-separated (e.g. "active,idle"). Default: all. Invalid dropped.
+ const statusFilter = parseStatusFilter(c.req.query("status"));
+ // ?workspaceId= . Missing/empty/whitespace → ignored (all workspaces).
+ const workspaceId = rawWorkspaceId !== undefined && rawWorkspaceId.trim().length > 0
+ ? rawWorkspaceId.trim() : undefined;
+ const filter = (statusFilter !== undefined || workspaceId !== undefined)
+ ? { ...(statusFilter ? { status: statusFilter } : {}),
+ ...(workspaceId ? { workspaceId } : {}) }
+ : undefined;
+ const all = await opts.conversationStore.listConversations(filter);
+ // ?q= prefix filter applied in-memory on the result.
+ const conversations = q.length > 0 ? all.filter((m) => m.id.startsWith(q)) : all;
+ return c.json({ conversations }, 200); // 500 on store error
+});
+```
+
+- `?status=` parsing: `parseStatusFilter` (`packages/transport-http/src/logic.ts:24-38`).
+ Valid values are `"active" | "idle" | "closed"` (`VALID_STATUSES`, logic.ts:16).
+ Invalid values are silently dropped; if *all* values are invalid → no filter
+ (returns all).
+- `?workspaceId=` is whitespace-trimmed; empty/whitespace-only is ignored.
+- The `filter` object passed to the store is `{ status?, workspaceId? }` — either
+ or both optional.
+
+### 2b. Response shape
+
+`200` → `ConversationListResponse` (`packages/transport-contract/src/index.ts:710-713`,
+re-exported from `@dispatch/wire`):
+
+```ts
+interface ConversationListResponse {
+ readonly conversations: readonly ConversationMeta[];
+}
+
+interface ConversationMeta { // @dispatch/wire, wire/src/index.ts:518-536
+ readonly id: string;
+ readonly createdAt: number; // epoch-ms, set on first write
+ readonly lastActivityAt: number; // epoch-ms, updated on every append
+ readonly title: string; // first user message (truncated 80) or PUT /title
+ readonly status: ConversationStatus; // "active" | "idle" | "closed"
+ readonly workspaceId: string; // always present; "default" fallback
+ readonly compactedFrom?: string; // present iff post-compaction
+}
+```
+
+- Sorted by `lastActivityAt` **descending** (most recent first); stable sort keeps
+ first-seen (index) order for ties.
+- `workspaceId` is always present on the response — conversations never assigned
+ read as `"default"` (see `toMeta`, `store.ts:329-337`).
+- Errors: store failure → `500 { error: "Failed to list conversations" }`.
+
+### 2c. The store filter — `packages/conversation-store/src/store.ts`
+
+`ConversationStore.listConversations` filter type (`store.ts:91-94`):
+
+```ts
+readonly listConversations: (filter?: {
+ readonly status?: readonly ConversationStatus[];
+ readonly workspaceId?: string;
+}) => Promise<readonly ConversationMeta[]>;
+```
+
+Implementation (`store.ts:695-733`): reads the conversation index, dedups
+(first-seen order), reads each conversation's meta row, applies both filters, sorts
+desc by `lastActivityAt`. Specifically:
+
+- `statusFilter` → membership test against `row.status`.
+- `workspaceFilter` → equality against `row.workspaceId ?? DEFAULT_WORKSPACE_ID`
+ (so legacy rows with no stored workspaceId match `"default"`).
+
+`DEFAULT_WORKSPACE_ID` is `"default"`.
+
+---
+
+## 3. "What would it take to add it?"
+
+### 3a. If "worktree" means the logical **workspace** → already done. Nothing to add.
+
+The capability, data model, contract types, and tests all exist. Pin
+`@dispatch/transport-contract` (currently `0.22.0`) and `@dispatch/wire`
+(`0.12.0`) and call `GET /conversations?workspaceId=<slug>&status=active,idle`.
+
+### 3b. If "worktree" literally means a **filesystem directory / git worktree path** →
+NOT supported today; small, well-contained change.
+
+The directory concept maps to **working directory (cwd)**, which is per-conversation
+(`conversation-store` `getCwd`/`setCwd`, keyed per conversation; `GET/PUT
+/conversations/:id/cwd`). The list endpoint does NOT support a `?cwd=` filter, and
+`ConversationMeta` does NOT carry a `cwd` field (confirmed: `wire/src/index.ts:518-536`
+has no `cwd`; filter type `store.ts:91-94` has no `cwd`).
+
+To add a `?cwd=` filter (filter conversations by their working directory), the change
+touches three layers, all additive:
+
+1. **Contract (`@dispatch/wire`)** — (optional) add `readonly cwd?: string | null` to
+ `ConversationMeta` so the caller can see each conversation's directory. Additive
+ type bump (e.g. `0.12.0 → 0.13.0`). Not strictly required if filtering is
+ server-side-only, but useful for the FE to render.
+2. **Store (`conversation-store/src/store.ts`)**:
+ - Widen the `listConversations` filter to `{ status?, workspaceId?, cwd? }`.
+ - In the scan loop (`store.ts:718-728`), for each candidate call `getCwd(id)`
+ (or the effective cwd via `getEffectiveCwd`) and compare. **Cost note:** this is
+ an extra storage read per conversation (cwd is stored in its own key, not in the
+ meta row) — fine for typical counts; if scale matters, add a `cwd` field to
+ `ConversationMetaRow` (populated on `setCwd`) so the filter is a row comparison
+ with no extra reads. The latter is the better design and mirrors how
+ `workspaceId` was added to the meta row.
+ - Decide exact-match (path equality) vs. prefix/normalized match (e.g. a worktree
+ and its subdirs). Equality is simplest and probably sufficient.
+3. **Transport (`transport-http/src/app.ts` + `logic.ts`)** — parse `?cwd=` (trim;
+ empty → ignore, mirroring `?workspaceId=`) and pass `cwd` into the store filter.
+ Update `ConversationListResponse` doc.
+
+The worktree-as-directory case is the only one that requires new code; the
+workspace case requires none.
+
+---
+
+## 4. Related workspace capabilities (for context)
+
+The workspace model is fully built out (courier doc `frontend-workspaces-handoff.md`
+→ implemented in `@dispatch/[email protected]` / `@dispatch/transport-contract`):
+
+- `GET /workspaces` → `WorkspaceListResponse` (workspaces sorted by `lastActivityAt`
+ desc, each with `conversationCount`). The `"default"` workspace is always
+ synthesized/present.
+- `PUT /workspaces/:id` (create-on-miss, idempotent), `GET /workspaces/:id` (pure
+ read, 404 if missing), `PUT /workspaces/:id/title`, `PUT /workspaces/:id/default-cwd`,
+ `DELETE /workspaces/:id` (closes all its conversations → `status="closed"`,
+ reassigns them to `"default"`, returns `closedCount`; 409 for `"default"`).
+- Workspace slug regex: `^[a-z0-9](?:[a-z0-9-]{0,38}[a-z0-9])?$` (1–40, lowercase,
+ digits, internal hyphens). Validated by `isValidWorkspaceSlug`.
+- `workspaceId` is auto-created on turn start if missing (`title = id`,
+ `defaultCwd = null`). Auto-assigned on `/chat` and `POST /conversations/:id/queue`.
+- `DELETE /conversations/:id/cwd` clears the explicit per-conversation cwd
+ (falls back to the workspace `defaultCwd`, then the server default).
+- cwd resolution: explicit per-conversation cwd → workspace `defaultCwd` → server
+ default (`process.cwd()`). `GET /conversations/:id/cwd` returns the explicit cwd
+ only; `GET /conversations/:id/lsp` roots at the *effective* cwd.
+
+`Workspace` type (`wire/src/index.ts:566-577`): `{ id, title, defaultCwd: string|null,
+createdAt, lastActivityAt }`. `WorkspaceEntry extends Workspace { conversationCount }`.
+
+---
+
+## 5. Test coverage (verified green in-repo)
+
+- `conversation-store/src/store-workspace.test.ts:369` — "listConversations filtered
+ by workspaceId" (asserts work-a returns `[a1,a2]`, work-b returns `[b1]`).
+- `conversation-store/src/store.test.ts:1463` — "listConversations filters by status",
+ incl. `{ status: ["active","idle"] }` (= "open") returns `[conv1, conv3]`
+ (excludes the `closed` conv2). Also `store.test.ts:1485` — status persists across a
+ fresh store instance.
+- `transport-http/src/app.test.ts:3696` — "GET /conversations?workspaceId= filters"
+ (asserts the store receives `{ workspaceId: "proj" }` and responds `200`).
+- `transport-http/src/app.test.ts` (q-filter block ~3133-3156) — `?q=` prefix +
+ empty/whitespace handling; 500 on store throw.
+
+The two filters are independently testable and composable at the store level
+(same `filter` object carries both); HTTP-level composition of `?status=…&workspaceId=…`
+is not asserted by an explicit combined test, but the handler builds one merged
+`filter` object so composition is structural.
+
+---
+
+## 6. Verdict & recommendation
+
+- **For the workspace interpretation (the canonical reading of "grouping of
+ conversations"): the feature already exists.** Use
+ `GET /conversations?workspaceId=<slug>&status=active,idle`. No code changes, no
+ data-model changes, no contract bumps needed.
+- **For the literal "git worktree as a directory" interpretation: not supported** as a
+ list filter; the closest concept is per-conversation **cwd**. Adding a `?cwd=` filter
+ is a small additive change across `@dispatch/wire` (optional `cwd` on
+ `ConversationMeta`), `conversation-store` (`listConversations` filter + ideally a
+ `cwd` column on the meta row for cheap filtering), and `transport-http` (parse
+ `?cwd=`). See §3b.
+- **Terminology:** recommend the user confirm which concept they mean. If they mean
+ the logical grouping, "workspace" is already the canonical GLOSSARY term — no new
+ vocabulary. If they truly need directory-based grouping, that is a distinct feature
+ from workspaces and should be scoped as such (it overlaps with, but is not, a
+ workspace; a single directory could be shared by multiple workspaces or none).
diff --git a/notes/retry-with-backoff-plan.md b/notes/retry-with-backoff-plan.md
new file mode 100644
index 0000000..99f6f5d
--- /dev/null
+++ b/notes/retry-with-backoff-plan.md
@@ -0,0 +1,138 @@
+# Plan — Retry-with-backoff on retryable provider errors (FINALIZED)
+
+**Goal:** When the upstream LLM API returns a retryable error (e.g. "server
+overloaded"), retry the request with a stepped backoff, visibly, until the
+budget is exhausted.
+
+## The error (from the prod DB) — detection is already done
+
+- **HTTP 429** (46×) and **HTTP 502** (1×), **no 503s**.
+- Body: `{"error":{"type":"overloaded_error","message":"The service is temporarily overloaded. Please retry."}}`
+- `packages/openai-stream/src/stream.ts:201` **already sets**
+ `retryable: response.status >= 500 || response.status === 429` on the error
+ event, and `ProviderErrorEvent` (`kernel/contracts/provider.ts:72`) **already
+ declares `retryable?: boolean`**. The kernel's `processEvent` just ignores it.
+- The error is **emitted (not thrown) and before any content** → retrying
+ `provider.stream()` is safe (no partial chunks to roll back).
+
+## Decision 1 — the backoff schedule
+
+`5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m`, then **repeat 30m** until **8h of
+cumulative retry-wait** is reached, then give up (emit the final error + seal).
+
+Pure function of the attempt index (0 = first retry):
+```ts
+const SCHEDULE_MS = [5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000];
+const TAIL_MS = 1_800_000; // 30m
+const BUDGET_MS = 8 * 60 * 60 * 1000; // 8h
+
+// pure, deterministic, no I/O
+function delayFor(attempt: number): number | undefined {
+ const delay = attempt < SCHEDULE_MS.length ? SCHEDULE_MS[attempt] : TAIL_MS;
+ if (cumulativeSleepMs(attempt) > BUDGET_MS) return undefined; // over budget → stop
+ return delay;
+}
+```
+- `cumulativeSleepMs(attempt)` = sum of delay[0..attempt]; head (8 steps) sums to
+ 3,705s, then +1,800s per extra step. 8h (28,800s) is reached at attempt ~21
+ → ~21 retries, ~7h32m of sleeping, then give up.
+- Budget = cumulative *scheduled sleep* (pure/testable). If you prefer wall-clock
+ since first error, it switches to using the injected `now` — easy change.
+
+## Decision 2 — visible (yellow system-message warning) + 5d3f handoff
+
+Add a new **transient** `AgentEvent` variant (emitted to the frontend, NOT
+persisted into the model's message history — so it never pollutes the prompt):
+
+```ts
+// @dispatch/wire (AgentEvent union gains this member)
+export interface TurnProviderRetryEvent {
+ readonly type: "provider-retry";
+ readonly conversationId: string;
+ readonly turnId: string;
+ /** 0-based: this is the Nth retry about to happen. */
+ readonly attempt: number;
+ /** ms the client should expect to wait before the retry fires. */
+ readonly delayMs: number;
+ /** The endpoint's error verbatim (e.g. "HTTP 429: {…overloaded_error…}"). */
+ readonly message: string;
+ /** The HTTP code when known (e.g. "429"). */
+ readonly code?: string;
+}
+```
+- Emitted once per scheduled retry, BEFORE the sleep, so the UI shows
+ "⚠ Server overloaded — retrying in 5s…" immediately.
+- When retries are exhausted (8h), the existing `error` event is emitted (as
+ today) and the turn seals — so the final failure is still a persisted error.
+
+**Frontend handoff to 5d3f:** render `provider-retry` as a yellow warning
+system-message bubble showing `message` (+ `code`), with the countdown. (I do
+the backend; 5d3f does the renderer — handoff via dispatch CLI.)
+
+## Decision 3 — retry ANY retryable error
+
+Retry trigger (both paths), **only when no content has been emitted yet**
+(the safety invariant — never duplicate partial output):
+
+- **Emitted** `error` ProviderEvent with `retryable === true` → retry. (429/502/5xx + network fetch errors — all pre-content.)
+- **Thrown** error (mid-stream, caught in `executeStep`'s `catch`) → treated as **retryable-by-default when pre-content** (most mid-stream throws are transient network/SSE issues). A thrown error after content is emitted is NOT retried (can't safely).
+
+So "if it's retryable, retry it" = the `retryable` flag drives emitted errors;
+thrown errors default to retryable when nothing was streamed yet. Non-retryable
+emitted errors (`retryable: false`/absent) end the step as today.
+
+## Architecture — kernel provides the HOOK, shell provides POLICY + I/O
+
+(Constitution: kernel touches no I/O; effects injected; decision pure.)
+
+### Kernel contract (`kernel/src/contracts/runtime.ts`) — add to `RunTurnInput`:
+```ts
+export interface RetryStrategy {
+ /** Pure: attempt → delay ms, or undefined to stop (budget exhausted). */
+ readonly delayFor: (attempt: number) => number | undefined;
+ /** Injected effect: actually sleep. Kernel imports no timer. Abortable. */
+ readonly sleep: (ms: number, signal: AbortSignal) => Promise<void>;
+}
+export interface RunTurnInput {
+ // …existing…
+ /** Optional injected retry. Omit = no retry (backward-compatible). */
+ readonly retry?: RetryStrategy;
+}
+```
+
+### Kernel loop (`kernel/src/runtime/run-turn.ts`, `executeStep`):
+Wrap stream consumption in a retry loop:
+- track `hadContent` (any text/reasoning/tool-call/usage seen);
+- on a retryable error (emitted `retryable:true` OR thrown) with `!hadContent`:
+ - `delay = retry.delayFor(attempt)`; if `undefined` → give up (emit the
+ suppressed error, end step);
+ - else emit `providerRetryEvent(attempt, delay, message, code)`, `await
+ retry.sleep(delay, signal)`, `attempt++`, re-call `provider.stream()`;
+- on abort during sleep → reject, seal turn `aborted` (existing flow).
+
+### Shell wiring (`session-orchestrator/src/orchestrator.ts`):
+- Provide the concrete `RetryStrategy`: `delayFor` = the schedule + 8h budget
+ above; `sleep` = abortable `setTimeout`-based promise.
+- Pass `retry` into the `RunTurnInput` it builds (line 589).
+
+## Build breakdown by unit (execution)
+
+| Unit (owner) | Change |
+|---|---|
+| `@dispatch/wire` | add `TurnProviderRetryEvent` to `AgentEvent` union |
+| `kernel` contracts | add `RetryStrategy` + `retry?` on `RunTurnInput` |
+| `kernel` events.ts | `providerRetryEvent(...)` constructor |
+| `kernel` run-turn.ts | retry loop in `executeStep` (the core logic) |
+| `kernel` run-turn.test.ts | pure tests: fake `sleep` + pure `delayFor`; assert schedule, no-after-content retry, give-up emits error, abort-during-sleep |
+| `session-orchestrator` | wire concrete schedule + real `setTimeout` sleep |
+| `transport-ws` | if it has an exhaustive `switch(event.type)`, add the `provider-retry` case |
+| `transport-http` (mine) | **no change** — `serializeEventLine` is generic `JSON.stringify` |
+| frontend (5d3f) | render `provider-retry` as a yellow warning system message |
+
+## Open items
+- **8h budget = cumulative scheduled sleep** (pure). Confirm OK vs wall-clock.
+- **Thrown errors default retryable-when-pre-content.** Confirm (vs only the
+ flagged emitted path).
+- **Execution mode:** this spans kernel + wire + orchestrator (outside my
+ transport-http unit). Build it directly across units, or dispatch each slice
+ to its unit owner via the dispatch CLI?
diff --git a/packages/cli/src/args.test.ts b/packages/cli/src/args.test.ts
index 3d07c96..e613f31 100644
--- a/packages/cli/src/args.test.ts
+++ b/packages/cli/src/args.test.ts
@@ -254,6 +254,41 @@ describe("parseArgs", () => {
});
});
+ it("parses 'list' with --workspace", () => {
+ expect(parseArgs(["list", "--workspace", "proj"], { defaultServer })).toEqual({
+ kind: "list",
+ server: "http://localhost:24203",
+ workspaceId: "proj",
+ all: false,
+ });
+ });
+
+ it("parses 'list' with -w shorthand", () => {
+ const result = parseArgs(["list", "-w", "ws"], { defaultServer });
+ expect(result.kind).toBe("list");
+ if (result.kind === "list") expect(result.workspaceId).toBe("ws");
+ });
+
+ it("parses 'list' with --workspace, --status, and a prefix together", () => {
+ const result = parseArgs(["list", "abc", "--status", "active", "--workspace", "proj"], {
+ defaultServer,
+ });
+ expect(result).toEqual({
+ kind: "list",
+ server: "http://localhost:24203",
+ query: "abc",
+ status: "active",
+ workspaceId: "proj",
+ all: false,
+ });
+ });
+
+ it("errors when --workspace has no value (list)", () => {
+ const result = parseArgs(["list", "--workspace"], { defaultServer });
+ expect(result.kind).toBe("error");
+ if (result.kind === "error") expect(result.message).toContain("--workspace requires a value");
+ });
+
it("parses 'list' with --all", () => {
expect(parseArgs(["list", "--all"], { defaultServer })).toEqual({
kind: "list",
@@ -320,11 +355,31 @@ describe("parseArgs", () => {
server: "http://localhost:24203",
conversationId: "deadbeef",
text: "hi",
+ file: undefined,
+ queue: false,
+ open: false,
+ });
+ });
+
+ it("parses 'send' with --file", () => {
+ expect(parseArgs(["send", "deadbeef", "--file", "foo.txt"], { defaultServer })).toEqual({
+ kind: "send",
+ server: "http://localhost:24203",
+ conversationId: "deadbeef",
+ text: undefined,
+ file: "foo.txt",
queue: false,
open: false,
});
});
+ it("parses 'send' with both --text and --file", () => {
+ const result = parseArgs(["send", "deadbeef", "--text", "hi", "--file", "f.txt"], {
+ defaultServer,
+ });
+ expect(result).toMatchObject({ kind: "send", text: "hi", file: "f.txt" });
+ });
+
it("parses 'send' with --queue", () => {
const result = parseArgs(["send", "deadbeef", "--text", "hi", "--queue"], {
defaultServer,
@@ -334,6 +389,7 @@ describe("parseArgs", () => {
server: "http://localhost:24203",
conversationId: "deadbeef",
text: "hi",
+ file: undefined,
queue: true,
open: false,
});
@@ -348,6 +404,7 @@ describe("parseArgs", () => {
server: "http://localhost:24203",
conversationId: "deadbeef",
text: "hi",
+ file: undefined,
queue: false,
open: true,
});
@@ -363,6 +420,7 @@ describe("parseArgs", () => {
server: "http://localhost:24203",
conversationId: "deadbeef",
text: "hi",
+ file: undefined,
queue: false,
open: false,
cwd: "/tmp",
@@ -370,10 +428,10 @@ describe("parseArgs", () => {
});
});
- it("requires --text", () => {
+ it("errors when --text and --file are both missing", () => {
const result = parseArgs(["send", "deadbeef"], { defaultServer });
expect(result.kind).toBe("error");
- if (result.kind === "error") expect(result.message).toContain("--text");
+ if (result.kind === "error") expect(result.message).toContain("--text or --file");
});
it("requires a conversation id", () => {
@@ -386,6 +444,12 @@ describe("parseArgs", () => {
const result = parseArgs(["send", "deadbeef", "--text"], { defaultServer });
expect(result.kind).toBe("error");
});
+
+ it("errors when --file has no value", () => {
+ const result = parseArgs(["send", "deadbeef", "--file"], { defaultServer });
+ expect(result.kind).toBe("error");
+ if (result.kind === "error") expect(result.message).toContain("--file requires a value");
+ });
});
describe("open", () => {
diff --git a/packages/cli/src/args.ts b/packages/cli/src/args.ts
index 8a63777..74cc56a 100644
--- a/packages/cli/src/args.ts
+++ b/packages/cli/src/args.ts
@@ -33,6 +33,7 @@ export type ParsedCommand =
readonly server: string;
readonly query?: string;
readonly status?: string;
+ readonly workspaceId?: string;
readonly all: boolean;
}
| { readonly kind: "compact"; readonly server: string; readonly conversationId: string }
@@ -42,7 +43,8 @@ export type ParsedCommand =
readonly kind: "send";
readonly server: string;
readonly conversationId: string;
- readonly text: string;
+ readonly text?: string | undefined;
+ readonly file?: string | undefined;
readonly queue: boolean;
readonly open: boolean;
readonly cwd?: string;
@@ -84,6 +86,7 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma
let server = opts.defaultServer;
let query: string | undefined;
let status: string | undefined;
+ let workspaceId: string | undefined;
let all = false;
for (let i = 1; i < argv.length; i++) {
const arg = argv[i] as string;
@@ -93,6 +96,9 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma
} else if (arg === "--status") {
if (i + 1 >= argv.length) return { kind: "error", message: "--status requires a value" };
status = argv[++i];
+ } else if (arg === "--workspace" || arg === "-w") {
+ if (i + 1 >= argv.length) return { kind: "error", message: "--workspace requires a value" };
+ workspaceId = argv[++i];
} else if (arg === "--all") {
all = true;
} else if (arg.startsWith("--")) {
@@ -108,6 +114,7 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma
server,
...(query !== undefined && { query }),
...(status !== undefined && { status }),
+ ...(workspaceId !== undefined && { workspaceId }),
all,
};
}
@@ -204,6 +211,7 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma
let server = opts.defaultServer;
let conversationId: string | undefined;
let text: string | undefined;
+ let file: string | undefined;
let queue = false;
let open = false;
let cwd: string | undefined;
@@ -221,6 +229,10 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma
if (i + 1 >= argv.length) return { kind: "error", message: "--text requires a value" };
text = argv[++i];
break;
+ case "--file":
+ if (i + 1 >= argv.length) return { kind: "error", message: "--file requires a value" };
+ file = argv[++i];
+ break;
case "--queue":
queue = true;
break;
@@ -263,8 +275,11 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma
if (conversationId === undefined) {
return { kind: "error", message: "'send' requires a conversation id" };
}
- if (text === undefined) {
- return { kind: "error", message: "'send' requires --text" };
+ if (!text && !file) {
+ return {
+ kind: "error",
+ message: "At least one of --text or --file is required for 'send'",
+ };
}
return {
@@ -272,6 +287,7 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma
server,
conversationId,
text,
+ file,
queue,
open,
...(cwd !== undefined && { cwd }),
diff --git a/packages/cli/src/http.test.ts b/packages/cli/src/http.test.ts
index 2aa61e9..ab39813 100644
--- a/packages/cli/src/http.test.ts
+++ b/packages/cli/src/http.test.ts
@@ -289,6 +289,36 @@ describe("fetchConversations", () => {
expect(calledUrl).toBe("http://localhost:24203/conversations?q=abc+def");
});
+ it("appends ?workspaceId=<value> when a workspaceId is given", async () => {
+ let calledUrl: string | undefined;
+ const fakeFetch = (async (url: string | URL | Request): Promise<Response> => {
+ calledUrl = String(url);
+ return new Response(JSON.stringify({ conversations: [] }), { status: 200 });
+ }) as unknown as typeof fetch;
+
+ await fetchConversations(
+ { fetchImpl: fakeFetch },
+ { server: "http://localhost:24203", workspaceId: "proj" },
+ );
+ expect(calledUrl).toBe("http://localhost:24203/conversations?workspaceId=proj");
+ });
+
+ it("combines ?status= and ?workspaceId= when both are given", async () => {
+ let calledUrl: string | undefined;
+ const fakeFetch = (async (url: string | URL | Request): Promise<Response> => {
+ calledUrl = String(url);
+ return new Response(JSON.stringify({ conversations: [] }), { status: 200 });
+ }) as unknown as typeof fetch;
+
+ await fetchConversations(
+ { fetchImpl: fakeFetch },
+ { server: "http://localhost:24203", status: "active,idle", workspaceId: "proj" },
+ );
+ expect(calledUrl).toBe(
+ "http://localhost:24203/conversations?status=active%2Cidle&workspaceId=proj",
+ );
+ });
+
it("throws on non-OK status", async () => {
const fakeFetch = (async (): Promise<Response> =>
new Response("boom", { status: 500 })) as unknown as typeof fetch;
diff --git a/packages/cli/src/http.ts b/packages/cli/src/http.ts
index 42fcfec..e13842a 100644
--- a/packages/cli/src/http.ts
+++ b/packages/cli/src/http.ts
@@ -98,6 +98,7 @@ interface FetchConversationsOpts {
readonly server: string;
readonly query?: string;
readonly status?: string;
+ readonly workspaceId?: string;
}
export async function fetchConversations(
@@ -107,6 +108,7 @@ export async function fetchConversations(
const params = new URLSearchParams();
if (opts.query !== undefined) params.set("q", opts.query);
if (opts.status !== undefined) params.set("status", opts.status);
+ if (opts.workspaceId !== undefined) params.set("workspaceId", opts.workspaceId);
const qs = params.toString();
const url = qs.length > 0 ? `${opts.server}/conversations?${qs}` : `${opts.server}/conversations`;
const res = await deps.fetchImpl(url);
diff --git a/packages/cli/src/main.ts b/packages/cli/src/main.ts
index 9dfc317..cba0de7 100644
--- a/packages/cli/src/main.ts
+++ b/packages/cli/src/main.ts
@@ -24,12 +24,12 @@ import { extractLastText, formatConversationList, renderEvent } from "./render.j
const USAGE = `Usage:
dispatch models [--server <url>]
- dispatch list [<prefix>] [--status <active|idle|closed>] [--all] [--server <url>]
+ dispatch list [<prefix>] [--status <active|idle|closed>] [--workspace <id>] [--all] [--server <url>]
dispatch stop <conversationId> [--server <url>]
dispatch compact <conversationId> [--server <url>]
dispatch read <conversationId> [--server <url>]
dispatch open <conversationId> [--server <url>]
- dispatch send <conversationId> --text "..." [--queue] [--open] [--cwd <dir>] [--effort <level>] [--workspace <id>] [--server <url>]
+ dispatch send <conversationId> --text "..." [--file <path>] [--queue] [--open] [--cwd <dir>] [--effort <level>] [--workspace <id>] [--server <url>]
dispatch <modelName> --text "..." [--file <path>] [--cwd <dir>] [--conversation <id>] [--effort <level>] [--workspace <id>] [--server <url>] [--show-reasoning] [--open]
dispatch --help
@@ -61,6 +61,7 @@ async function main(): Promise<void> {
server: parsed.server,
...(parsed.query !== undefined && { query: parsed.query }),
...(status !== undefined && { status }),
+ ...(parsed.workspaceId !== undefined && { workspaceId: parsed.workspaceId }),
},
);
const table = formatConversationList(result.conversations, Date.now());
@@ -156,10 +157,20 @@ async function main(): Promise<void> {
process.stdout.write(`Signaled frontend to open ${conversationId}\n`);
}
+ let fileContent: string | undefined;
+ if (parsed.file) {
+ fileContent = await readFile(parsed.file, "utf-8");
+ }
+ const message = composeMessage({
+ ...(parsed.text !== undefined && { text: parsed.text }),
+ ...(parsed.file !== undefined && { file: parsed.file }),
+ ...(fileContent !== undefined && { fileContent }),
+ });
+
if (parsed.queue) {
const queued = await enqueueMessage(
{ fetchImpl: globalThis.fetch },
- { server: parsed.server, conversationId, text: parsed.text },
+ { server: parsed.server, conversationId, text: message },
);
const line = queued.startedTurn
? `Started turn for ${conversationId}`
@@ -168,7 +179,7 @@ async function main(): Promise<void> {
} else {
const request = {
conversationId,
- message: parsed.text,
+ message,
...(parsed.cwd !== undefined && { cwd: parsed.cwd }),
...(parsed.reasoningEffort !== undefined && { reasoningEffort: parsed.reasoningEffort }),
...(parsed.workspaceId !== undefined && { workspaceId: parsed.workspaceId }),
diff --git a/packages/kernel/src/contracts/events.ts b/packages/kernel/src/contracts/events.ts
index 6c9652d..dca34c2 100644
--- a/packages/kernel/src/contracts/events.ts
+++ b/packages/kernel/src/contracts/events.ts
@@ -11,6 +11,7 @@ export type {
TurnDoneEvent,
TurnErrorEvent,
TurnInputEvent,
+ TurnProviderRetryEvent,
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts
index c67607b..f3e5bca 100644
--- a/packages/kernel/src/contracts/index.ts
+++ b/packages/kernel/src/contracts/index.ts
@@ -40,6 +40,7 @@ export type {
TurnDoneEvent,
TurnErrorEvent,
TurnInputEvent,
+ TurnProviderRetryEvent,
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
@@ -109,6 +110,7 @@ export type {
export type {
EventEmitter,
FinishReason,
+ RetryStrategy,
RunTurnInput,
RunTurnResult,
} from "./runtime.js";
diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts
index 03e62c2..dc74c84 100644
--- a/packages/kernel/src/contracts/runtime.ts
+++ b/packages/kernel/src/contracts/runtime.ts
@@ -140,6 +140,22 @@ export interface RunTurnInput {
* double-persist them.
*/
readonly onStepComplete?: (messages: readonly ChatMessage[]) => Promise<void> | void;
+
+ /**
+ * Optional injected retry strategy for retryable provider errors (e.g. HTTP
+ * 429 / 5xx "overloaded"). When omitted, a retryable error ends the step
+ * exactly as before (backward-compatible). When provided, the runtime wraps
+ * `provider.stream()` consumption in a retry loop: on a retryable error
+ * (an emitted `error` ProviderEvent with `retryable === true`, OR a thrown
+ * error) — ONLY when no content was emitted yet this step (the safety
+ * invariant — never duplicate partial output) — it asks `retry.delayFor`
+ * for a delay, emits a transient `provider-retry` AgentEvent, sleeps via the
+ * injected `retry.sleep` (abortable), and re-calls `provider.stream()`.
+ *
+ * Injected (not ambient): the kernel imports no timer and owns no schedule.
+ * Mirrors the `now`/`logger` injection pattern — optional + backward-compatible.
+ */
+ readonly retry?: RetryStrategy;
}
/**
@@ -156,3 +172,31 @@ export interface RunTurnResult {
/** Why the turn ended. */
readonly finishReason: FinishReason;
}
+
+/**
+ * Injected retry strategy for retryable provider errors (e.g. HTTP 429 / 5xx).
+ *
+ * The kernel provides the HOOK (this contract + the retry loop in `runTurn`);
+ * the shell (session-orchestrator) provides the POLICY (the concrete schedule)
+ * and the I/O (the actual sleep). The kernel imports no timer — `sleep` is an
+ * injected effect so the runtime stays pure and deterministic in tests.
+ *
+ * Retries are ONLY attempted when NO content was emitted yet this step (the
+ * safety invariant — never duplicate partial output). When omitted on
+ * `RunTurnInput`, no retry happens (backward-compatible: a retryable error ends
+ * the step exactly as before).
+ */
+export interface RetryStrategy {
+ /**
+ * Pure, deterministic decision: given the 0-based attempt index, return the
+ * delay in ms to sleep before the next retry, or `undefined` to stop (budget
+ * exhausted). No I/O, no clock — fully testable.
+ */
+ readonly delayFor: (attempt: number) => number | undefined;
+ /**
+ * Injected effect: actually sleep for the given ms. Must honor the abort
+ * signal — reject when aborted so the turn seals `aborted`. The kernel
+ * imports no timer; the shell provides a `setTimeout`-based implementation.
+ */
+ readonly sleep: (ms: number, signal: AbortSignal) => Promise<void>;
+}
diff --git a/packages/kernel/src/runtime/events.ts b/packages/kernel/src/runtime/events.ts
index b194577..5805e28 100644
--- a/packages/kernel/src/runtime/events.ts
+++ b/packages/kernel/src/runtime/events.ts
@@ -164,3 +164,17 @@ export function errorEvent(
}
return { type: "error", conversationId, turnId, message };
}
+
+export function providerRetryEvent(
+ conversationId: string,
+ turnId: string,
+ attempt: number,
+ delayMs: number,
+ message: string,
+ code?: string,
+): AgentEvent {
+ if (code !== undefined) {
+ return { type: "provider-retry", conversationId, turnId, attempt, delayMs, message, code };
+ }
+ return { type: "provider-retry", conversationId, turnId, attempt, delayMs, message };
+}
diff --git a/packages/kernel/src/runtime/index.ts b/packages/kernel/src/runtime/index.ts
index e1156e3..e0dd656 100644
--- a/packages/kernel/src/runtime/index.ts
+++ b/packages/kernel/src/runtime/index.ts
@@ -2,6 +2,7 @@ export type { StepDispatcher } from "./dispatch.js";
export { createStepDispatcher, executeToolCall } from "./dispatch.js";
export {
errorEvent,
+ providerRetryEvent,
reasoningDeltaEvent,
textDeltaEvent,
toolCallEvent,
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index 08d3055..a9fc3d9 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -2886,4 +2886,539 @@ describe("runTurn", () => {
expect(drainCallCount).toBe(MAX_STEPS - 1);
});
});
+
+ // ── Retry with backoff ──────────────────────────────────────────────────
+ //
+ // PURE tests: a fake `sleep` (records calls, resolves instantly, can abort
+ // on a chosen call) + a pure `delayFor` (the canonical schedule + 8h budget).
+ // A stub `ProviderContract` whose `stream` yields a retryable error N times
+ // then a finish. ZERO mocks of `@dispatch/*` modules — effects injected.
+
+ /** The canonical backoff schedule (matches the orchestrator's concrete strategy). */
+ const RETRY_SCHEDULE_MS = [5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000];
+ const RETRY_TAIL_MS = 1_800_000; // 30m
+ const RETRY_BUDGET_MS = 8 * 60 * 60 * 1000; // 8h
+
+ /** Cumulative scheduled sleep through `attempt` (sum of delay[0..attempt]). */
+ function cumulativeSleepMs(attempt: number): number {
+ let sum = 0;
+ for (let i = 0; i <= attempt; i++) {
+ sum += i < RETRY_SCHEDULE_MS.length ? RETRY_SCHEDULE_MS[i] : RETRY_TAIL_MS;
+ }
+ return sum;
+ }
+
+ /** Pure, deterministic delay decision (no I/O, no clock). */
+ function delayFor(attempt: number): number | undefined {
+ const delay = attempt < RETRY_SCHEDULE_MS.length ? RETRY_SCHEDULE_MS[attempt] : RETRY_TAIL_MS;
+ if (cumulativeSleepMs(attempt) > RETRY_BUDGET_MS) return undefined; // over budget → stop
+ return delay;
+ }
+
+ /** The full schedule delayFor would emit (until budget exhausted). */
+ function fullSchedule(): number[] {
+ const result: number[] = [];
+ let attempt = 0;
+ while (true) {
+ const delay = delayFor(attempt);
+ if (delay === undefined) break;
+ result.push(delay);
+ attempt++;
+ }
+ return result;
+ }
+
+ /**
+ * Fake, controllable `sleep`: records every call's delay, resolves
+ * instantly (no real waiting), and can abort the controller on a chosen
+ * 1-based call index to simulate "abort during sleep".
+ */
+ function createFakeSleep(controller: AbortController): {
+ sleep: (ms: number, signal: AbortSignal) => Promise<void>;
+ calls: number[];
+ abortOnCall: (n: number) => void;
+ } {
+ const calls: number[] = [];
+ let abortAt: number | undefined;
+ const sleep = async (ms: number, _signal: AbortSignal): Promise<void> => {
+ calls.push(ms);
+ if (abortAt !== undefined && calls.length === abortAt) {
+ controller.abort();
+ throw new Error("aborted");
+ }
+ // Otherwise resolve instantly (no real waiting).
+ };
+ return {
+ sleep,
+ calls,
+ abortOnCall: (n: number) => {
+ abortAt = n;
+ },
+ };
+ }
+
+ /** A provider that yields a retryable error `errorCount` times, then success. */
+ function createRetryingProvider(opts: {
+ errorCount: number;
+ error?: { message: string; code?: string; retryable?: boolean };
+ success?: ProviderEvent[];
+ }): { provider: ProviderContract; streamCalls: { value: number } } {
+ const streamCalls = { value: 0 };
+ const error: ProviderEvent = {
+ type: "error",
+ message: opts.error?.message ?? "overloaded",
+ ...(opts.error?.code !== undefined ? { code: opts.error.code } : {}),
+ ...(opts.error?.retryable !== undefined ? { retryable: opts.error.retryable } : {}),
+ };
+ const success = opts.success ?? [
+ { type: "text-delta", delta: "hi" },
+ { type: "finish", reason: "stop" },
+ ];
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ const idx = streamCalls.value++;
+ return (async function* () {
+ if (idx < opts.errorCount) {
+ yield error;
+ return;
+ }
+ for (const event of success) yield event;
+ })();
+ },
+ };
+ return { provider, streamCalls };
+ }
+
+ describe("retry with backoff", () => {
+ it("retries a retryable emitted error on schedule then succeeds", async () => {
+ const { provider } = createRetryingProvider({
+ errorCount: 3,
+ error: { message: "HTTP 429: overloaded", code: "429", retryable: true },
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(result.finishReason).toBe("stop");
+ // 3 retries: 5s, 10s, 30s.
+ expect(fake.calls).toEqual([5_000, 10_000, 30_000]);
+ // 3 provider-retry events (one per sleep), then the successful text.
+ const retryEvents = events.filter((e) => e.type === "provider-retry");
+ expect(retryEvents).toHaveLength(3);
+ if (retryEvents[0]?.type === "provider-retry") {
+ expect(retryEvents[0].attempt).toBe(0);
+ expect(retryEvents[0].delayMs).toBe(5_000);
+ expect(retryEvents[0].message).toBe("HTTP 429: overloaded");
+ expect(retryEvents[0].code).toBe("429");
+ expect(retryEvents[0].conversationId).toBe("conv-1");
+ expect(retryEvents[0].turnId).toBe("turn-1");
+ }
+ if (retryEvents[1]?.type === "provider-retry") {
+ expect(retryEvents[1].attempt).toBe(1);
+ expect(retryEvents[1].delayMs).toBe(10_000);
+ }
+ if (retryEvents[2]?.type === "provider-retry") {
+ expect(retryEvents[2].attempt).toBe(2);
+ expect(retryEvents[2].delayMs).toBe(30_000);
+ }
+ // The error was suppressed (no error event emitted — retry succeeded).
+ expect(events.filter((e) => e.type === "error")).toHaveLength(0);
+ // The successful content still streams.
+ const deltas = events.filter((e) => e.type === "text-delta");
+ expect(deltas).toHaveLength(1);
+ });
+
+ it("sleep is called with the full schedule [5s,10s,30s,60s,5m,10m,15m,30m,30m…]", async () => {
+ // Provider errors forever → retries until budget exhausted → gives up.
+ const { provider } = createRetryingProvider({
+ errorCount: Number.POSITIVE_INFINITY,
+ error: { message: "overloaded", code: "429", retryable: true },
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ // Budget exhausted → give up → error.
+ expect(result.finishReason).toBe("error");
+
+ // The sleep schedule matches the pure delayFor output exactly.
+ expect(fake.calls).toEqual(fullSchedule());
+
+ // Head of the schedule (the 8 stepped delays).
+ expect(fake.calls.slice(0, 8)).toEqual([
+ 5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000,
+ ]);
+ // Tail repeats 30m.
+ expect(fake.calls[8]).toBe(1_800_000);
+ expect(fake.calls.at(-1)).toBe(1_800_000);
+
+ // 8h cumulative budget cap: head (3705s) + 13×30m = ~7h31m, then stop.
+ // 21 retries (attempts 0..20), then delayFor(21) → undefined → give up.
+ expect(fake.calls).toHaveLength(21);
+ const totalSlept = fake.calls.reduce((a, b) => a + b, 0);
+ expect(totalSlept).toBeLessThanOrEqual(RETRY_BUDGET_MS);
+ expect(totalSlept).toBe(3_705_000 + 13 * 1_800_000); // 27_105_000
+
+ // One provider-retry per sleep, plus a final error (give-up).
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(21);
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ const errEvt = events.find((e) => e.type === "error");
+ if (errEvt?.type === "error") {
+ expect(errEvt.message).toBe("overloaded");
+ expect(errEvt.code).toBe("429");
+ }
+ });
+
+ it("does NOT retry after content was emitted (safety invariant)", async () => {
+ // Provider yields text (content) THEN a retryable error. Because content
+ // was emitted, retrying is unsafe (would duplicate partial output).
+ let callCount = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ callCount++;
+ return (async function* () {
+ yield { type: "text-delta", delta: "partial" } as ProviderEvent;
+ yield {
+ type: "error",
+ message: "overloaded",
+ code: "429",
+ retryable: true,
+ } as ProviderEvent;
+ })();
+ },
+ };
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ // No retries: stream called exactly once.
+ expect(callCount).toBe(1);
+ expect(fake.calls).toHaveLength(0);
+ // The error is emitted (give-up) and partial content preserved.
+ expect(result.finishReason).toBe("error");
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0);
+ expect(events.filter((e) => e.type === "text-delta")).toHaveLength(1);
+ });
+
+ it("does NOT retry a non-retryable emitted error (retryable: false)", async () => {
+ const { provider, streamCalls } = createRetryingProvider({
+ errorCount: 1,
+ error: { message: "bad request", code: "400", retryable: false },
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(streamCalls.value).toBe(1); // no retry
+ expect(fake.calls).toHaveLength(0);
+ expect(result.finishReason).toBe("error");
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0);
+ });
+
+ it("does NOT retry a non-retryable emitted error (retryable absent)", async () => {
+ const { provider, streamCalls } = createRetryingProvider({
+ errorCount: 1,
+ error: { message: "bad request", code: "400" }, // no retryable field
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(streamCalls.value).toBe(1); // no retry
+ expect(fake.calls).toHaveLength(0);
+ expect(result.finishReason).toBe("error");
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ });
+
+ it("give-up emits the final error when budget is exhausted", async () => {
+ // Custom delayFor that allows exactly 1 retry then stops.
+ const shortDelayFor = (attempt: number): number | undefined =>
+ attempt === 0 ? 100 : undefined;
+ const { provider } = createRetryingProvider({
+ errorCount: Number.POSITIVE_INFINITY,
+ error: { message: "overloaded", code: "429", retryable: true },
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor: shortDelayFor, sleep: fake.sleep },
+ });
+
+ expect(result.finishReason).toBe("error");
+ expect(fake.calls).toEqual([100]); // one retry, then give up
+ // One provider-retry (attempt 0), then the final error.
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(1);
+ const errs = events.filter((e) => e.type === "error");
+ expect(errs).toHaveLength(1);
+ if (errs[0]?.type === "error") {
+ expect(errs[0].message).toBe("overloaded");
+ expect(errs[0].code).toBe("429");
+ }
+ });
+
+ it("abort during sleep seals the turn aborted", async () => {
+ const { provider } = createRetryingProvider({
+ errorCount: Number.POSITIVE_INFINITY,
+ error: { message: "overloaded", code: "429", retryable: true },
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+ fake.abortOnCall(2); // abort on the 2nd sleep
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(result.finishReason).toBe("aborted");
+ // Two sleeps attempted; the 2nd aborted.
+ expect(fake.calls).toHaveLength(2);
+ // No terminal error emitted (it was an abort, not a give-up).
+ expect(events.filter((e) => e.type === "error")).toHaveLength(0);
+ // One provider-retry before the aborted sleep (attempt 0).
+ const retries = events.filter((e) => e.type === "provider-retry");
+ expect(retries).toHaveLength(2);
+ // The done event carries reason "aborted".
+ const done = events.find((e) => e.type === "done");
+ if (done?.type === "done") {
+ expect(done.reason).toBe("aborted");
+ }
+ });
+
+ it("omitting retry keeps the pre-retry behavior (backward-compatible)", async () => {
+ // A retryable error with no retry configured → ends the step as today.
+ const { provider, streamCalls } = createRetryingProvider({
+ errorCount: 1,
+ error: { message: "overloaded", code: "429", retryable: true },
+ });
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ // no retry field
+ });
+
+ expect(streamCalls.value).toBe(1); // no retry
+ expect(result.finishReason).toBe("error");
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0);
+ });
+
+ it("retries a THROWN error (retryable-by-default when pre-content)", async () => {
+ // A thrown error (no retryable flag) before content is retried.
+ let callCount = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ callCount++;
+ return (async function* () {
+ if (callCount <= 2) {
+ throw new Error("network blip");
+ }
+ yield { type: "text-delta", delta: "hi" } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ })();
+ },
+ };
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(callCount).toBe(3); // 2 throws retried, 3rd succeeds
+ expect(fake.calls).toEqual([5_000, 10_000]);
+ expect(result.finishReason).toBe("stop");
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(2);
+ // Thrown errors have no code.
+ if (events[0]?.type === "provider-retry") {
+ expect(events[0].code).toBeUndefined();
+ expect(events[0].message).toBe("network blip");
+ }
+ expect(events.filter((e) => e.type === "error")).toHaveLength(0);
+ });
+
+ it("does NOT retry a thrown error after content was emitted", async () => {
+ let callCount = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ callCount++;
+ return (async function* () {
+ yield { type: "text-delta", delta: "partial" } as ProviderEvent;
+ throw new Error("network blip");
+ })();
+ },
+ };
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(callCount).toBe(1);
+ expect(fake.calls).toHaveLength(0);
+ expect(result.finishReason).toBe("error");
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ expect(events.filter((e) => e.type === "text-delta")).toHaveLength(1);
+ });
+
+ it("provider-retry events interleave correctly: error → retry-event → sleep → retry", async () => {
+ // Verify ordering: each provider-retry event comes BEFORE its sleep,
+ // and the successful content comes only after the last retry.
+ const { provider } = createRetryingProvider({
+ errorCount: 2,
+ error: { message: "overloaded", code: "429", retryable: true },
+ success: [
+ { type: "text-delta", delta: "ok" },
+ { type: "finish", reason: "stop" },
+ ],
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ const types = events.map((e) => e.type);
+ // turn-start, provider-retry(0), provider-retry(1), text-delta, step-complete, done
+ expect(types[0]).toBe("turn-start");
+ const firstRetryIdx = types.indexOf("provider-retry");
+ const textIdx = types.indexOf("text-delta");
+ expect(firstRetryIdx).toBeGreaterThan(0);
+ expect(textIdx).toBeGreaterThan(firstRetryIdx);
+ // Both retries precede the text.
+ const retryCount = types.filter((t) => t === "provider-retry").length;
+ expect(retryCount).toBe(2);
+ });
+ });
});
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index 940c77f..ac87a1f 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -6,12 +6,18 @@ import type {
ProviderStreamOptions,
Usage,
} from "../contracts/provider.js";
-import type { EventEmitter, RunTurnInput, RunTurnResult } from "../contracts/runtime.js";
+import type {
+ EventEmitter,
+ RetryStrategy,
+ RunTurnInput,
+ RunTurnResult,
+} from "../contracts/runtime.js";
import type { ToolCall, ToolContract } from "../contracts/tool.js";
import { createStepDispatcher, type StepDispatcher } from "./dispatch.js";
import {
doneEvent,
errorEvent,
+ providerRetryEvent,
reasoningDeltaEvent,
stepCompleteEvent,
textDeltaEvent,
@@ -121,6 +127,8 @@ interface StepContext {
readonly now: (() => number) | undefined;
/** Per-turn provider options (model, systemPrompt, …) threaded to stream(). */
readonly providerOpts: ProviderStreamOptions | undefined;
+ /** Optional injected retry strategy (omit = no retry, backward-compatible). */
+ readonly retry: RetryStrategy | undefined;
}
interface TimingState {
@@ -250,12 +258,10 @@ function processEvent(
case "finish":
break;
case "error":
- if (event.code !== undefined) {
- chunks.push({ type: "error", message: event.message, code: event.code });
- } else {
- chunks.push({ type: "error", message: event.message });
- }
- ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, event.message, event.code));
+ // Handled by the retry loop in executeStep (not here): an error event
+ // is intercepted before processEvent so the step can decide whether to
+ // retry (suppressing the error) or give up (emit it). processEvent
+ // never receives an "error" event.
break;
}
}
@@ -316,34 +322,142 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
// Swallow — D7.
}
- try {
- const opts: ProviderStreamOptions = {
- ...ctx.providerOpts,
- ...(ctx.turnSpan !== undefined && stepSpan !== undefined ? { logger: stepSpan.log } : {}),
- };
- const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts);
- for await (const event of stream) {
- if (ctx.signal.aborted) break;
- processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing, toolDispatchTimes);
- if (event.type === "usage") {
- stepUsage = addUsage(stepUsage, event.usage);
+ // Retry loop: wrap provider.stream() consumption. Retries are ONLY
+ // attempted when no content was emitted yet this step (the safety
+ // invariant — never duplicate partial output). On a retryable error —
+ // either an EMITTED `error` ProviderEvent with `retryable === true`, OR a
+ // THROWN error (retryable-by-default when pre-content) — with !hadContent:
+ // ask retry.delayFor(attempt); if it returns a delay → emit a transient
+ // provider-retry AgentEvent, sleep via the injected retry.sleep (abortable),
+ // attempt++, re-call provider.stream(); if it returns undefined (budget
+ // exhausted) → give up. Non-retryable emitted errors (retryable === false or
+ // absent), errors after content, and the no-retry-configured case all fall
+ // through to "give up" — identical to the pre-retry behavior.
+ let hadContent = false;
+ let attempt = 0;
+ while (true) {
+ let errored = false;
+ let wasThrown = false;
+ let errorMessage: string | undefined;
+ let errorCode: string | undefined;
+ let errorRetryable: boolean | undefined;
+ let thrownErr: unknown;
+
+ try {
+ const opts: ProviderStreamOptions = {
+ ...ctx.providerOpts,
+ ...(ctx.turnSpan !== undefined && stepSpan !== undefined ? { logger: stepSpan.log } : {}),
+ };
+ const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts);
+ for await (const event of stream) {
+ if (ctx.signal.aborted) break;
+ if (event.type === "error") {
+ // Intercept: hold for the retry decision — don't push a chunk
+ // or emit yet (a successful retry would leave a stale error).
+ errored = true;
+ errorMessage = event.message;
+ errorCode = event.code;
+ errorRetryable = event.retryable;
+ break;
+ }
+ if (
+ event.type === "text-delta" ||
+ event.type === "reasoning-delta" ||
+ event.type === "tool-call" ||
+ event.type === "usage"
+ ) {
+ hadContent = true;
+ }
+ processEvent(
+ event,
+ chunks,
+ toolCalls,
+ dispatcher,
+ ctx,
+ stepSpan,
+ timing,
+ toolDispatchTimes,
+ );
+ if (event.type === "usage") {
+ stepUsage = addUsage(stepUsage, event.usage);
+ }
+ if (event.type === "finish") {
+ finishReason = event.reason;
+ }
}
- if (event.type === "finish") {
- finishReason = event.reason;
+ } catch (err) {
+ errored = true;
+ wasThrown = true;
+ errorMessage = err instanceof Error ? err.message : String(err);
+ errorCode = undefined;
+ errorRetryable = undefined;
+ thrownErr = err;
+ }
+
+ // Abort (during stream) → stop; the runTurn loop seals aborted.
+ if (ctx.signal.aborted) {
+ break;
+ }
+
+ // No error → step succeeded.
+ if (!errored) {
+ break;
+ }
+
+ // Retryable? A thrown error is retryable-by-default when pre-content;
+ // an emitted error is retryable ONLY when `retryable === true` (absent
+ // or false → not retried, per the contract).
+ const isRetryable = wasThrown ? true : errorRetryable === true;
+ if (ctx.retry !== undefined && !hadContent && isRetryable) {
+ const delay = ctx.retry.delayFor(attempt);
+ if (delay !== undefined) {
+ // Emit the transient provider-retry event BEFORE the sleep so the
+ // UI shows "⚠ retrying in Ns…" immediately. Not persisted as a
+ // chat message — it never pollutes the prompt.
+ ctx.emit(
+ providerRetryEvent(
+ ctx.conversationId,
+ ctx.turnId,
+ attempt,
+ delay,
+ errorMessage ?? "",
+ errorCode,
+ ),
+ );
+ // Abortable sleep. If the signal fires during sleep, the shell's
+ // sleep rejects — we catch it and break so the turn seals aborted.
+ try {
+ await ctx.retry.sleep(delay, ctx.signal);
+ } catch {
+ // Abort during sleep (or unexpected sleep failure).
+ }
+ if (ctx.signal.aborted) {
+ break;
+ }
+ attempt++;
+ continue;
}
+ // delayFor returned undefined → budget exhausted → give up.
+ }
+
+ // Give up: emit the suppressed error and end the step. This is the
+ // single emission point for a terminal provider error (non-retryable,
+ // post-content, budget-exhausted, or no-retry-configured).
+ const message = errorMessage ?? "";
+ if (errorCode !== undefined) {
+ chunks.push({ type: "error", message, code: errorCode });
+ } else {
+ chunks.push({ type: "error", message });
}
- } catch (err) {
- const message = err instanceof Error ? err.message : String(err);
- chunks.push({ type: "error", message });
- ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message));
+ ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message, errorCode));
finishReason = "error";
- // Close step span with error
try {
- stepSpan?.end({ err });
+ stepSpan?.end({ err: thrownErr ?? new Error(message) });
} catch {
// Swallow — D7.
}
stepSpan = undefined;
+ break;
}
// Close timing spans: if no first token was seen, end ttft with firstToken: false
@@ -527,6 +641,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
computerId: input.computerId,
now,
providerOpts: input.providerOpts,
+ retry: input.retry,
});
totalUsage = addUsage(totalUsage, stepResult.usage);
diff --git a/packages/lsp/src/aggregate.test.ts b/packages/lsp/src/aggregate.test.ts
new file mode 100644
index 0000000..4579a0a
--- /dev/null
+++ b/packages/lsp/src/aggregate.test.ts
@@ -0,0 +1,141 @@
+import { describe, expect, it } from "vitest";
+import { type AggregateServer, aggregateDiagnostics } from "./aggregate.js";
+import type { LanguageServerClient } from "./client.js";
+
+/**
+ * A minimal fake client: only `waitForDiagnostics` is exercised by
+ * aggregateDiagnostics, so we stub just that. Cast to the real type (mirrors
+ * tool.test.ts) — no real process, no internal mocks of our own modules.
+ */
+function fakeClient(
+ waitForDiagnostics: LanguageServerClient["waitForDiagnostics"],
+): LanguageServerClient {
+ return { waitForDiagnostics } as unknown as LanguageServerClient;
+}
+
+const SERVER_A: AggregateServer = { id: "a", name: "Ruby-LSP", root: "/p" };
+const SERVER_B: AggregateServer = { id: "b", name: "Steep", root: "/p" };
+
+describe("aggregateDiagnostics", () => {
+ it("returns merged diagnostics from all responding servers, tagged by source", async () => {
+ const clients = new Map<string, LanguageServerClient>([
+ [
+ "a",
+ fakeClient(async () => ({ formatted: "ERROR L1:1: boom", slow: false, timedOut: false })),
+ ],
+ [
+ "b",
+ fakeClient(async () => ({ formatted: "WARNING L2:3: meh", slow: false, timedOut: false })),
+ ],
+ ]);
+
+ const result = await aggregateDiagnostics(
+ (id) => clients.get(id),
+ [SERVER_A, SERVER_B],
+ "/p/x.rb",
+ 10_000,
+ {},
+ );
+
+ expect(result.timedOut).toBe(false);
+ expect(result.formatted).toContain("[Ruby-LSP]");
+ expect(result.formatted).toContain("boom");
+ expect(result.formatted).toContain("[Steep]");
+ expect(result.formatted).toContain("meh");
+ });
+
+ it("skips a server that times out with a raise-to-user notice, and still returns the fast server's result", async () => {
+ // Steep never resolves within the cap → timedOut; ruby-lsp answers fast.
+ const clients = new Map<string, LanguageServerClient>([
+ ["a", fakeClient(async () => ({ formatted: "", slow: false, timedOut: false }))],
+ ["b", fakeClient(async () => ({ formatted: "", slow: false, timedOut: true }))],
+ ]);
+
+ const result = await aggregateDiagnostics(
+ (id) => clients.get(id),
+ [SERVER_A, SERVER_B],
+ "/p/x.rb",
+ 10_000,
+ {},
+ );
+
+ expect(result.timedOut).toBe(true);
+ // The skip notice names the offending server and the cap.
+ expect(result.formatted).toContain("[Steep]");
+ expect(result.formatted).toContain("took too long");
+ expect(result.formatted).toContain(">10s");
+ expect(result.formatted).toContain("raise this to the user");
+ // ruby-lsp answered cleanly (empty diagnostics) → no line for it.
+ expect(result.formatted).not.toContain("[Ruby-LSP]");
+ });
+
+ it("runs servers concurrently: a slow server does not delay a fast one's contribution order", async () => {
+ const callOrder: string[] = [];
+ const clients = new Map<string, LanguageServerClient>([
+ [
+ "a",
+ fakeClient(async () => {
+ callOrder.push("a-start");
+ await new Promise((r) => setTimeout(r, 5));
+ callOrder.push("a-end");
+ return { formatted: "from-a", slow: false, timedOut: false };
+ }),
+ ],
+ [
+ "b",
+ fakeClient(async () => {
+ callOrder.push("b-start");
+ await new Promise((r) => setTimeout(r, 30));
+ callOrder.push("b-end");
+ return { formatted: "from-b", slow: false, timedOut: false };
+ }),
+ ],
+ ]);
+
+ const result = await aggregateDiagnostics(
+ (id) => clients.get(id),
+ [SERVER_A, SERVER_B],
+ "/p/x.rb",
+ 10_000,
+ {},
+ );
+
+ // Both started before either ended → concurrent, not sequential.
+ expect(callOrder.slice(0, 2).sort()).toEqual(["a-start", "b-start"]);
+ expect(result.formatted).toContain("from-a");
+ expect(result.formatted).toContain("from-b");
+ });
+
+ it("a missing client (dead/excluded) contributes nothing and never rejects", async () => {
+ const result = await aggregateDiagnostics(() => undefined, [SERVER_A], "/p/x.rb", 10_000, {});
+ expect(result.formatted).toBe("");
+ expect(result.timedOut).toBe(false);
+ });
+
+ it("forwards text + minSeverity to each client's waitForDiagnostics", async () => {
+ const seen: Array<{ text?: string; minSeverity?: number; timeoutMs: number }> = [];
+ const clients = new Map<string, LanguageServerClient>([
+ [
+ "a",
+ fakeClient(async (_path, opts) => {
+ seen.push({
+ text: opts?.text,
+ minSeverity: opts?.minSeverity,
+ timeoutMs: opts?.timeoutMs ?? -1,
+ });
+ return { formatted: "", slow: false, timedOut: false };
+ }),
+ ],
+ ]);
+
+ await aggregateDiagnostics((id) => clients.get(id), [SERVER_A], "/p/x.rb", 7000, {
+ text: "post-edit buffer",
+ minSeverity: 2,
+ });
+
+ expect(seen).toHaveLength(1);
+ expect(seen[0]?.text).toBe("post-edit buffer");
+ expect(seen[0]?.minSeverity).toBe(2);
+ expect(seen[0]?.timeoutMs).toBe(7000);
+ });
+});
diff --git a/packages/lsp/src/aggregate.ts b/packages/lsp/src/aggregate.ts
new file mode 100644
index 0000000..b044e21
--- /dev/null
+++ b/packages/lsp/src/aggregate.ts
@@ -0,0 +1,80 @@
+/**
+ * Concurrent multi-server diagnostics aggregation.
+ *
+ * Queries every matching language server AT ONCE (not one-at-a-time), each
+ * capped at `timeoutMs`. A server that doesn't push diagnostics within the cap
+ * is SKIPPED with a per-server notice rather than blocking the others — so one
+ * slow/dead server (e.g. a corrupted Steep) can't hold up the fast one's
+ * (ruby-lsp) results for the full timeout on every edit.
+ *
+ * The only I/O here is `client.waitForDiagnostics` (injected via `getClient`),
+ * so this is unit-testable with a fake client and no real process.
+ */
+
+import type { LanguageServerClient } from "./client.js";
+
+export interface AggregateServer {
+ readonly id: string;
+ readonly name: string;
+ readonly root: string;
+}
+
+export interface AggregateOpts {
+ /** Post-edit buffer; when omitted the server reads from disk. */
+ readonly text?: string | undefined;
+ /** Only include diagnostics with severity ≤ this (1=Error, 2=Warning). */
+ readonly minSeverity?: number | undefined;
+}
+
+export interface AggregateResult {
+ /** Merged diagnostics tagged by source + a per-skipped-server notice. */
+ readonly formatted: string;
+ /** True if at least one server was skipped for exceeding the cap. */
+ readonly timedOut: boolean;
+}
+
+/**
+ * Query `servers` concurrently, each capped at `timeoutMs`. Returns merged
+ * diagnostics tagged by source (`[name]\n…`) and, for any server that did not
+ * respond in time, a `⚠️ [name] LSP took too long (>Ns), skipped — please raise
+ * this to the user.` notice. Never rejects: a client error yields an empty
+ * contribution for that server.
+ */
+export async function aggregateDiagnostics(
+ getClient: (id: string, root: string) => LanguageServerClient | undefined,
+ servers: readonly AggregateServer[],
+ absolutePath: string,
+ timeoutMs: number,
+ opts: AggregateOpts,
+): Promise<AggregateResult> {
+ const entries = await Promise.all(
+ servers.map(async (server) => {
+ const client = getClient(server.id, server.root);
+ if (!client) return null;
+ const waitOpts: { text?: string; timeoutMs: number; minSeverity?: number } = { timeoutMs };
+ if (opts.text !== undefined) waitOpts.text = opts.text;
+ if (opts.minSeverity !== undefined) waitOpts.minSeverity = opts.minSeverity;
+ const result = await client.waitForDiagnostics(absolutePath, waitOpts);
+ return { server, result };
+ }),
+ );
+
+ const parts: string[] = [];
+ let timedOut = false;
+ const capSeconds = Math.round(timeoutMs / 1000);
+
+ for (const entry of entries) {
+ if (!entry) continue;
+ const { server, result } = entry;
+ if (result.timedOut) {
+ timedOut = true;
+ parts.push(
+ `⚠️ [${server.name}] LSP took too long (>${capSeconds}s), diagnostics skipped — please raise this to the user.`,
+ );
+ } else if (result.formatted) {
+ parts.push(`[${server.name}]\n${result.formatted}`);
+ }
+ }
+
+ return { formatted: parts.join("\n\n"), timedOut };
+}
diff --git a/packages/lsp/src/client.test.ts b/packages/lsp/src/client.test.ts
index 681860f..338ef0b 100644
--- a/packages/lsp/src/client.test.ts
+++ b/packages/lsp/src/client.test.ts
@@ -3,6 +3,7 @@ import {
type FileWatcher,
type FsAccess,
LanguageServerClient,
+ type ProcessExitHandler,
type SpawnProcess,
} from "./client.js";
import { encode } from "./framing.js";
@@ -288,4 +289,149 @@ describe("client", () => {
client.shutdown();
expect(state.killed).toBe(true);
});
+
+ it("onExit marks the client broken (error) so callers stop querying a corpse", async () => {
+ const state = { killed: false };
+ let exitCb: ProcessExitHandler | null = null;
+ const stdoutHolder: { cb: ((data: Uint8Array) => void) | null } = { cb: null };
+
+ const spawnWithExit: SpawnProcess = () => ({
+ stdin: { write: () => {} },
+ stdout: {
+ on: (_event: string, cb: (data: Uint8Array) => void) => {
+ stdoutHolder.cb = cb;
+ },
+ },
+ pid: 999,
+ kill: () => {
+ state.killed = true;
+ },
+ onExit: (handler) => {
+ exitCb = handler;
+ },
+ });
+
+ const { client } = makeClient({ spawn: spawnWithExit });
+ const startPromise = client.start();
+ await new Promise((r) => setTimeout(r, 50));
+ stdoutHolder.cb?.(
+ encode(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } })),
+ );
+ await startPromise;
+ expect(client.getState()).toBe("connected");
+
+ // Simulate the process dying (user kill / crash).
+ exitCb?.({ code: 1 });
+
+ expect(client.getState()).toBe("error");
+ expect(client.getStateError()).toMatch(/process exited/);
+ // The (still-alive-in-test) process was killed to avoid a zombie.
+ expect(state.killed).toBe(true);
+ });
+
+ it("a dead client is skipped by waitForDiagnostics callers (state !== connected)", async () => {
+ // Build a client, connect, kill via onExit, then assert a diagnostics
+ // query would not block: getState() is "error" so the matching filter
+ // (state === "connected") excludes it. We assert the state guard.
+ const stdoutHolder: { cb: ((data: Uint8Array) => void) | null } = { cb: null };
+ let exitCb: ProcessExitHandler | null = null;
+ const spawnWithExit: SpawnProcess = () => ({
+ stdin: { write: () => {} },
+ stdout: {
+ on: (_e, cb) => {
+ stdoutHolder.cb = cb;
+ },
+ },
+ pid: 1,
+ kill: () => {},
+ onExit: (handler) => {
+ exitCb = handler;
+ },
+ });
+
+ const { client } = makeClient({ spawn: spawnWithExit });
+ const startPromise = client.start();
+ await new Promise((r) => setTimeout(r, 50));
+ stdoutHolder.cb?.(
+ encode(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } })),
+ );
+ await startPromise;
+
+ exitCb?.({ code: null });
+ expect(client.getState()).toBe("error");
+ // The aggregate / getDiagnostics matching filter requires "connected".
+ expect(client.getState() === "connected").toBe(false);
+ });
+
+ it("corruption detector marks the client broken after repeated identical diagnostics despite text changes", async () => {
+ // A healthy server would change diagnostics as the file changes; a
+ // corrupted one re-emits the SAME non-empty set. Drive 5 edits with
+ // different text but identical diagnostics → client flips to error.
+ const { client, serverResponses } = makeClient();
+ const startPromise = client.start();
+ await new Promise((r) => setTimeout(r, 50));
+ serverResponses(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } }));
+ await startPromise;
+
+ const phantom = JSON.stringify({
+ jsonrpc: "2.0",
+ method: "textDocument/publishDiagnostics",
+ params: {
+ uri: "file:///project/game.rb",
+ diagnostics: [
+ {
+ range: { start: { line: 0, character: 27 }, end: { line: 0, character: 28 } },
+ severity: 1,
+ message: "SyntaxError: unexpected token",
+ },
+ ],
+ },
+ });
+
+ const path = "/project/game.rb";
+ // The first call establishes the baseline snapshot (no increment).
+ // Each subsequent call with identical diagnostics + changed text
+ // increments; the 6th call (5th increment) trips the threshold.
+ for (let i = 1; i <= 5; i++) {
+ const p = client.waitForDiagnostics(path, { text: `buf-v${i}`, timeoutMs: 2000 });
+ // Push the identical phantom diagnostics so the poll resolves.
+ await new Promise((r) => setTimeout(r, 30));
+ serverResponses(phantom);
+ await p;
+ expect(client.getState()).toBe("connected");
+ }
+ // 6th identical-across-changed-text repeat trips the threshold.
+ const p6 = client.waitForDiagnostics(path, { text: "buf-v6", timeoutMs: 2000 });
+ await new Promise((r) => setTimeout(r, 30));
+ serverResponses(phantom);
+ await p6;
+
+ expect(client.getState()).toBe("error");
+ expect(client.getStateError()).toMatch(/repeated stale diagnostics/i);
+ });
+
+ it("corruption detector does NOT trip on a clean file (empty diagnostics stay identical)", async () => {
+ const { client, serverResponses } = makeClient();
+ const startPromise = client.start();
+ await new Promise((r) => setTimeout(r, 50));
+ serverResponses(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } }));
+ await startPromise;
+
+ const clean = JSON.stringify({
+ jsonrpc: "2.0",
+ method: "textDocument/publishDiagnostics",
+ params: { uri: "file:///project/game.rb", diagnostics: [] },
+ });
+ const path = "/project/game.rb";
+
+ for (let i = 1; i <= 6; i++) {
+ const p = client.waitForDiagnostics(path, { text: `clean-v${i}`, timeoutMs: 2000 });
+ await new Promise((r) => setTimeout(r, 30));
+ serverResponses(clean);
+ await p;
+ }
+ // Empty diagnostics never count as "stale" — a clean file staying clean
+ // is normal, not corruption.
+ expect(client.getState()).toBe("connected");
+ });
});
diff --git a/packages/lsp/src/client.ts b/packages/lsp/src/client.ts
index 677a22a..ac7d025 100644
--- a/packages/lsp/src/client.ts
+++ b/packages/lsp/src/client.ts
@@ -4,13 +4,22 @@
* FileWatcher, and forwards matching disk changes.
*/
-import { DiagnosticsStore, type PublishDiagnosticsParams } from "./diagnostics.js";
+import { DiagnosticsStore, diagnosticKey, type PublishDiagnosticsParams } from "./diagnostics.js";
import { computeChangeRange } from "./diff.js";
import { FrameDecoder } from "./framing.js";
import { languageId as resolveLanguageId } from "./language.js";
import { JsonRpcConnection, type WriteFn } from "./rpc.js";
import { FileChangeType, WatchedFilesRegistry } from "./watched-files.js";
+/** Info delivered to an `onExit` handler when the child process terminates. */
+export interface ProcessExitInfo {
+ readonly code: number | null;
+ readonly signal?: string;
+}
+
+/** A handler registered to be called when the child process exits. */
+export type ProcessExitHandler = (info: ProcessExitInfo) => void;
+
export interface SpawnedProcess {
readonly stdin: { readonly write: (bytes: Uint8Array) => void };
readonly stdout:
@@ -22,6 +31,13 @@ export interface SpawnedProcess {
| undefined;
readonly pid: number | undefined;
readonly kill: () => void;
+ /**
+ * Register a handler fired when the child process exits (code|signal).
+ * Optional: when absent, death is detected via stdout-end instead. Wires
+ * Bun's `proc.exited` in production; tests invoke it directly to simulate
+ * a crash. Lets the client stop querying a dead server (no per-edit hang).
+ */
+ readonly onExit?: ((handler: ProcessExitHandler) => void) | undefined;
}
export type SpawnProcess = (
@@ -102,6 +118,18 @@ export class LanguageServerClient {
private openDocuments = new Map<string, { version: number; text: string }>();
/** Sync mode captured from the server's initialize capabilities: 1=Full, 2=Incremental. */
private textDocumentChange: 1 | 2 = 1;
+ /**
+ * Corruption detection: the last diagnostic-key set + synced text per URI.
+ * A healthy server's diagnostics change when the file changes; a corrupted
+ * one (e.g. Steep's ~3h phantom-SyntaxError drift) re-emits the identical
+ * non-empty set across edits. `staleRepeat` counts consecutive such repeats
+ * across URIs; at the threshold the client is marked broken (→ respawn).
+ */
+ private lastDiagSnapshot = new Map<string, { keys: Set<string>; text: string }>();
+ private staleRepeat = 0;
+ private static readonly STALE_REPEAT_THRESHOLD = 5;
+ /** Default timeout for outbound requests (hover/definition/references). */
+ private static readonly REQUEST_TIMEOUT_MS = 10_000;
constructor(deps: ClientDeps) {
this.deps = deps;
@@ -128,6 +156,12 @@ export class LanguageServerClient {
}
const proc = this.deps.spawn(this.deps.command as string[], spawnOpts);
this.process = proc;
+ // Detect process death so we stop querying a corpse (fixes the
+ // per-edit hang after a server is killed/crashes). onExit is the
+ // primary signal; stdout-end is the defence-in-depth fallback.
+ if (proc.onExit) {
+ proc.onExit((info) => this.handleExit(info));
+ }
const writeFn: WriteFn = (bytes) => proc.stdin.write(bytes);
const rpc = new JsonRpcConnection(writeFn);
@@ -158,8 +192,11 @@ export class LanguageServerClient {
for await (const chunk of source) {
this.handleBytes(chunk);
}
+ // stdout closed — the process is gone (defence-in-depth alongside onExit,
+ // which some edges never call). Idempotent via handleExit's guard.
+ this.handleExit({ code: null });
} catch {
- // process exited
+ this.handleExit({ code: null });
}
})();
}
@@ -172,6 +209,65 @@ export class LanguageServerClient {
});
}
+ /**
+ * The server process exited (onExit or stdout-end). Transition to a broken
+ * state so callers skip it and the manager re-spawns after backoff — instead
+ * of polling a corpse for the full timeout on every edit. Idempotent.
+ */
+ private handleExit(info: ProcessExitInfo): void {
+ if (this.state === "error" || this.state === "not-started") return;
+ const detail = info.signal !== undefined ? `signal ${info.signal}` : `code ${info.code ?? "?"}`;
+ this.markBroken(`language server process exited (${detail})`);
+ }
+
+ /**
+ * Mark this client permanently broken: kill the process if still alive
+ * (corruption case), dispose the rpc (rejects pending requests), and drop
+ * edge handles. The manager's status() observes state:"error" and re-spawns
+ * after the bounded backoff. Called on process death AND on corruption.
+ */
+ private markBroken(reason: string): void {
+ if (this.state === "error") return;
+ this.state = "error";
+ this.stateError = reason;
+ this.fileWatcherHandle?.close();
+ this.fileWatcherHandle = null;
+ this.process?.kill();
+ this.process = null;
+ this.rpc?.dispose();
+ this.rpc = null;
+ }
+
+ /**
+ * Detect a server stuck re-emitting identical non-empty diagnostics
+ * despite the file content changing between calls — the signature of a
+ * corrupted parse/type-check state (e.g. Steep's ~3h phantom-SyntaxError
+ * drift, where a fresh CLI reports green on the same project). After
+ * STALE_REPEAT_THRESHOLD consecutive such repeats, mark the client broken
+ * so it is skipped + re-spawned. A clean file (empty diagnostics) or a
+ * genuinely changing diagnostic set resets the counter. Note the
+ * tradeoff: a real, unfixed error on an untouched line also "stays the
+ * same across edits", so this can false-positive on a healthy server —
+ * the threshold is set conservatively and the CLI type-check gate remains
+ * authoritative either way.
+ */
+ private detectStaleDiagnostics(uri: string, text: string): void {
+ const merged = this.diagnostics.getMerged(uri);
+ const keys = new Set(merged.map((d) => diagnosticKey(d)));
+ const prev = this.lastDiagSnapshot.get(uri);
+ if (prev && keys.size > 0 && setsEqual(keys, prev.keys) && text !== prev.text) {
+ this.staleRepeat++;
+ } else {
+ this.staleRepeat = 0;
+ }
+ this.lastDiagSnapshot.set(uri, { keys, text });
+ if (this.staleRepeat >= LanguageServerClient.STALE_REPEAT_THRESHOLD) {
+ this.markBroken(
+ "language server emitting repeated stale diagnostics despite file changes — likely corrupted; restarting",
+ );
+ }
+ }
+
private handleBytes(chunk: Uint8Array): void {
const messages = this.decoder.decode(chunk);
for (const msg of messages) {
@@ -403,26 +499,37 @@ export class LanguageServerClient {
await this.open(filePath);
}
- const slowThreshold = 10_000;
const start = Date.now();
- // Poll until the server pushes diagnostics (even empty = done) or timeout.
- return new Promise((resolve) => {
+ // Poll until the server pushes diagnostics (even empty = done) or the
+ // per-server cap elapses (then we skip it — see aggregateDiagnostics).
+ const received = await new Promise<boolean>((resolve) => {
const check = () => {
const elapsed = Date.now() - start;
- const received = this.diagnostics.hasReceivedPush(uri);
- if (received || elapsed >= timeoutMs) {
- resolve({
- formatted: this.diagnostics.formatFiltered(uri, opts?.minSeverity),
- slow: elapsed > slowThreshold,
- timedOut: !received,
- });
+ const got = this.diagnostics.hasReceivedPush(uri);
+ if (got || elapsed >= timeoutMs) {
+ resolve(got);
return;
}
setTimeout(check, 100);
};
check();
});
+
+ // Only a server that actually pushed can be corruption-checked.
+ if (received) {
+ this.detectStaleDiagnostics(uri, opts?.text ?? "");
+ }
+
+ // `slow` is structurally false now: the per-server cap is 10s, so
+ // elapsed can never exceed the old "unusually long" threshold. That
+ // warning is superseded by the timeout→skip notice produced in
+ // aggregateDiagnostics. The field is kept for contract compatibility.
+ return {
+ formatted: this.diagnostics.formatFiltered(uri, opts?.minSeverity),
+ slow: false,
+ timedOut: !received,
+ };
}
getWatchedFilesRegistry(): WatchedFilesRegistry {
@@ -433,11 +540,21 @@ export class LanguageServerClient {
return this.diagnostics;
}
- async request(method: string, params?: unknown): Promise<unknown> {
+ /**
+ * Send a request (hover/definition/references/documentSymbol). Capped at
+ * REQUEST_TIMEOUT_MS so a dead/slow server can't hang the turn — the
+ * initialize handshake bypasses this (it calls rpc.sendRequest directly
+ * with its own 45s race).
+ */
+ async request(
+ method: string,
+ params?: unknown,
+ timeoutMs: number = LanguageServerClient.REQUEST_TIMEOUT_MS,
+ ): Promise<unknown> {
if (!this.rpc || this.state !== "connected") {
throw new Error("Client not connected");
}
- return this.rpc.sendRequest(method, params);
+ return this.rpc.sendRequest(method, params, timeoutMs);
}
shutdown(): void {
@@ -450,3 +567,11 @@ export class LanguageServerClient {
this.state = "not-started";
}
}
+
+function setsEqual<T>(a: Set<T>, b: Set<T>): boolean {
+ if (a.size !== b.size) return false;
+ for (const v of a) {
+ if (!b.has(v)) return false;
+ }
+ return true;
+}
diff --git a/packages/lsp/src/diagnostics.ts b/packages/lsp/src/diagnostics.ts
index bc7ac0a..50beca9 100644
--- a/packages/lsp/src/diagnostics.ts
+++ b/packages/lsp/src/diagnostics.ts
@@ -91,7 +91,7 @@ export class DiagnosticsStore {
}
}
-function diagnosticKey(d: Diagnostic): string {
+export function diagnosticKey(d: Diagnostic): string {
const r = d.range;
return `${r.start.line}:${r.start.character}-${r.end.line}:${r.end.character}:${d.severity ?? 0}:${d.message}`;
}
diff --git a/packages/lsp/src/extension.ts b/packages/lsp/src/extension.ts
index 8e3178a..c0fee44 100644
--- a/packages/lsp/src/extension.ts
+++ b/packages/lsp/src/extension.ts
@@ -8,6 +8,7 @@
import { extname, join } from "node:path";
import type { Extension, HostAPI, ServiceHandle } from "@dispatch/kernel";
import { defineService } from "@dispatch/kernel";
+import { aggregateDiagnostics } from "./aggregate.js";
import type { SpawnedProcess } from "./client.js";
import { LspManager } from "./manager.js";
import { createLspTool } from "./tool.js";
@@ -43,6 +44,14 @@ function realSpawn(
stderr: proc.stderr,
pid: proc.pid,
kill: () => proc.kill(),
+ // Surface process exit so the client can stop querying a dead server
+ // and self-heal (respawn). Bun's Subprocess.exited resolves with the
+ // exit code (or rejects if killed by signal — treat as code:null).
+ onExit: (handler) => {
+ (proc as { exited: Promise<number | null> }).exited
+ .then((code) => handler({ code }))
+ .catch(() => handler({ code: null }));
+ },
};
}
@@ -108,14 +117,20 @@ export const extension: Extension = {
return manager.status(cwd);
},
async getDiagnostics(opts: GetDiagnosticsOpts): Promise<DiagnosticsResult> {
- const timeoutMs = opts.timeoutMs ?? 60_000;
- const slowThreshold = 10_000;
+ // 10s hard ceiling per server, regardless of what the caller
+ // passes (the edit hook still passes 60_000 — clamped here, so
+ // no other-unit edit is needed). A server that doesn't respond
+ // in 10s is skipped with a notice instead of waited out.
+ const PER_SERVER_CAP_MS = 10_000;
+ const timeoutMs = Math.min(opts.timeoutMs ?? PER_SERVER_CAP_MS, PER_SERVER_CAP_MS);
const fileExt = extname(opts.filePath).toLowerCase();
const absolutePath = opts.filePath.startsWith("/")
? opts.filePath
: join(opts.cwd, opts.filePath);
// Get all connected servers matching this file's extension.
+ // A dead/corrupted server has state:"error" and is excluded —
+ // no per-edit hang on a corpse.
const statuses = await manager.status(opts.cwd);
const matching = statuses.filter(
(s) => s.state === "connected" && s.extensions.some((ext) => ext === fileExt),
@@ -125,34 +140,15 @@ export const extension: Extension = {
return { formatted: "", slow: false, timedOut: false };
}
- const parts: string[] = [];
- let anySlow = false;
- let anyTimedOut = false;
- const start = Date.now();
-
- for (const s of matching) {
- const client = manager.getClient(s.id, s.root);
- if (!client) continue;
- const waitOpts: { text?: string; timeoutMs?: number; minSeverity?: number } = {
- timeoutMs,
- };
- if (opts.text !== undefined) waitOpts.text = opts.text;
- if (opts.minSeverity !== undefined) waitOpts.minSeverity = opts.minSeverity;
- const result = await client.waitForDiagnostics(absolutePath, waitOpts);
- if (result.slow) anySlow = true;
- if (result.timedOut) anyTimedOut = true;
- if (result.formatted) {
- parts.push(`[${s.name}]\n${result.formatted}`);
- }
- }
-
- const elapsed = Date.now() - start;
+ const agg = await aggregateDiagnostics(
+ (id, root) => manager.getClient(id, root),
+ matching,
+ absolutePath,
+ timeoutMs,
+ { text: opts.text, minSeverity: opts.minSeverity },
+ );
- return {
- formatted: parts.join("\n\n"),
- slow: anySlow || elapsed > slowThreshold,
- timedOut: anyTimedOut,
- };
+ return { formatted: agg.formatted, slow: false, timedOut: agg.timedOut };
},
};
host.provideService(lspServiceHandle, service);
diff --git a/packages/lsp/src/manager.test.ts b/packages/lsp/src/manager.test.ts
index 1649111..d8cba4f 100644
--- a/packages/lsp/src/manager.test.ts
+++ b/packages/lsp/src/manager.test.ts
@@ -361,4 +361,92 @@ describe("manager", () => {
expect(s[0]?.error).toContain("[from .dispatch/lsp.json]");
expect(s[0]?.error).toContain("spawn failed");
});
+
+ it("a client that dies after connecting is skipped + re-spawned after backoff (no storm, no eternal hang)", async () => {
+ // A spawn that completes the initialize handshake AND lets the test
+ // simulate process death via the captured onExit handler.
+ const exitHandlers: Array<(info: { code: number | null; signal?: string }) => void> = [];
+ let spawnCount = 0;
+ const spawn: SpawnProcess = () => {
+ spawnCount++;
+ let messageHandler: ((data: Uint8Array) => void) | null = null;
+ const proc: SpawnedProcess = {
+ stdin: {
+ write: (bytes: Uint8Array) => {
+ const decoded = new TextDecoder().decode(bytes);
+ const headerEnd = decoded.indexOf("\r\n\r\n");
+ if (headerEnd === -1) return;
+ const json = decoded.slice(headerEnd + 4);
+ try {
+ const msg = JSON.parse(json);
+ if (msg.method === "initialize") {
+ setTimeout(() => {
+ const response = JSON.stringify({
+ jsonrpc: "2.0",
+ id: msg.id,
+ result: { capabilities: {} },
+ });
+ messageHandler?.(encode(response));
+ }, 1);
+ }
+ } catch {
+ // ignore
+ }
+ },
+ },
+ stdout: {
+ on: (_event: string, cb: (data: Uint8Array) => void) => {
+ messageHandler = cb;
+ },
+ },
+ pid: 1000 + spawnCount,
+ kill: () => {},
+ onExit: (handler) => {
+ exitHandlers.push(handler);
+ },
+ };
+ return proc;
+ };
+
+ const clock = { now: 0 };
+ const manager = new LspManager({
+ spawn,
+ fileWatcher: noopFileWatcher(),
+ fs: fakeFs({
+ "/project/.dispatch/lsp.json": JSON.stringify({
+ servers: {
+ steep: {
+ command: ["steep", "--stdio"],
+ extensions: [".rb"],
+ rootMarkers: [],
+ },
+ },
+ }),
+ }),
+ now: () => clock.now,
+ });
+
+ // 1) Connects.
+ const s1 = await manager.status("/project");
+ expect(s1[0]?.state).toBe("connected");
+ expect(spawnCount).toBe(1);
+
+ // 2) Simulate the process dying (user kill / crash) via onExit.
+ exitHandlers[0]?.({ code: 1 });
+ const clientAfterDeath = manager.getClient("steep", "/project");
+ expect(clientAfterDeath?.getState()).toBe("error");
+
+ // 3) status() now reports error (and seeds a broken entry for backoff).
+ // Backoff not elapsed yet (clock frozen at 0) → NOT re-spawned.
+ const s2 = await manager.status("/project");
+ expect(s2[0]?.state).toBe("error");
+ expect(s2[0]?.error).toMatch(/process exited/);
+ expect(spawnCount).toBe(1); // no retry storm before backoff
+
+ // 4) After the backoff elapses, status() re-spawns a fresh server.
+ clock.now = 31_000;
+ const s3 = await manager.status("/project");
+ expect(s3[0]?.state).toBe("connected");
+ expect(spawnCount).toBe(2); // re-spawned exactly once
+ });
});
diff --git a/packages/lsp/src/manager.ts b/packages/lsp/src/manager.ts
index 7153956..bc84479 100644
--- a/packages/lsp/src/manager.ts
+++ b/packages/lsp/src/manager.ts
@@ -134,6 +134,19 @@ export class LspManager {
if (existing) {
const state = existing.client.getState();
const stateError = existing.client.getStateError();
+ // A client that died or corrupted AFTER connecting flipped its
+ // own state to "error" (client.ts handleExit/markBroken). Spawn
+ // succeeded so there's no broken entry yet — seed one so the
+ // bounded-backoff path above re-spawns it, instead of reporting
+ // error forever (and so getDiagnostics' "connected" filter skips
+ // it, avoiding a per-edit hang on the corpse).
+ if (state === "error" && !this.broken.has(key)) {
+ this.broken.set(key, {
+ configFingerprint: configFingerprint(server),
+ brokenAt: this.now(),
+ error: enrichError(server, stateError ?? "server unavailable"),
+ });
+ }
const status: LspServerStatus = {
id: server.id,
name: server.name,
diff --git a/packages/lsp/src/rpc.test.ts b/packages/lsp/src/rpc.test.ts
index 05ce924..7b22ec5 100644
--- a/packages/lsp/src/rpc.test.ts
+++ b/packages/lsp/src/rpc.test.ts
@@ -84,3 +84,29 @@ it("handleMessage does not throw on malformed JSON", async () => {
await expect(conn.handleMessage("")).resolves.toBeUndefined();
await expect(conn.handleMessage("not json at all")).resolves.toBeUndefined();
});
+
+describe("sendRequest timeout", () => {
+ it("rejects with a timeout error when no response arrives within timeoutMs", async () => {
+ const { conn } = makeConnection();
+ const promise = conn.sendRequest("textDocument/hover", {}, 50);
+ await expect(promise).rejects.toThrow(/LSP request timed out after 50ms: textDocument\/hover/);
+ });
+
+ it("clears the timer on a normal response (no unhandled rejection)", async () => {
+ const { conn } = makeConnection();
+ const promise = conn.sendRequest("textDocument/hover", {}, 5000);
+ conn.handleMessage(frameResponse(1, { ok: true }));
+ await expect(promise).resolves.toEqual({ ok: true });
+ // Give the (now-cleared) timer window ample time to prove it never fires.
+ await new Promise((r) => setTimeout(r, 80));
+ });
+
+ it("does not time out when no timeoutMs is given (initialize handshake path)", async () => {
+ const { conn } = makeConnection();
+ const promise = conn.sendRequest("initialize", {});
+ // A late response well past any plausible default still resolves.
+ await new Promise((r) => setTimeout(r, 60));
+ conn.handleMessage(frameResponse(1, { capabilities: {} }));
+ await expect(promise).resolves.toEqual({ capabilities: {} });
+ });
+});
diff --git a/packages/lsp/src/rpc.ts b/packages/lsp/src/rpc.ts
index 6b82624..95157de 100644
--- a/packages/lsp/src/rpc.ts
+++ b/packages/lsp/src/rpc.ts
@@ -39,11 +39,36 @@ export class JsonRpcConnection {
this.write = write;
}
- sendRequest(method: string, params?: unknown): Promise<unknown> {
+ /**
+ * Send a request and await the correlated response. If `timeoutMs` is given,
+ * the promise rejects with a timeout error after that long — so a dead/slow
+ * server can't hang the caller forever (hover/definition/references).
+ * No `timeoutMs` = wait indefinitely (used by the initialize handshake, which
+ * has its own 45s race).
+ */
+ sendRequest(method: string, params?: unknown, timeoutMs?: number): Promise<unknown> {
const id = this.nextId++;
const msg: JsonRpcMessage = { jsonrpc: "2.0", id, method, params };
return new Promise((resolve, reject) => {
- this.pending.set(id, { resolve, reject });
+ let timer: ReturnType<typeof setTimeout> | undefined;
+ // Wrap resolve/reject so the timer is cleared on a normal response
+ // (or on dispose) — no dangling timer after completion.
+ const finish = (fn: () => void): void => {
+ if (timer) clearTimeout(timer);
+ fn();
+ };
+ const entry: PendingRequest = {
+ resolve: (value: unknown) => finish(() => resolve(value)),
+ reject: (reason: unknown) => finish(() => reject(reason)),
+ };
+ if (timeoutMs !== undefined) {
+ timer = setTimeout(() => {
+ if (this.pending.delete(id)) {
+ reject(new Error(`LSP request timed out after ${timeoutMs}ms: ${method}`));
+ }
+ }, timeoutMs);
+ }
+ this.pending.set(id, entry);
this.sendMessage(msg);
});
}
diff --git a/packages/lsp/src/tool.test.ts b/packages/lsp/src/tool.test.ts
index 03787ae..efd1514 100644
--- a/packages/lsp/src/tool.test.ts
+++ b/packages/lsp/src/tool.test.ts
@@ -176,4 +176,101 @@ describe("tool", () => {
expect(result.isError).toBe(true);
expect(result.content).toContain("requires both");
});
+
+ it("diagnostics op: a server that times out is skipped with a raise-to-user notice", async () => {
+ const mockClient = {
+ getState: () => "connected" as const,
+ getStateError: () => undefined,
+ request: async () => null,
+ waitForDiagnostics: async () => ({ formatted: "", slow: false, timedOut: true }),
+ };
+
+ const tool = createLspTool(
+ stubManager({
+ status: async () => [
+ {
+ id: "steep",
+ name: "Steep",
+ root: "/project",
+ extensions: [".rb"],
+ state: "connected",
+ },
+ ],
+ getClient: () => mockClient as never,
+ }),
+ );
+
+ const result = await tool.execute(
+ { operation: "diagnostics", path: "game.rb" },
+ {
+ toolCallId: "test",
+ onOutput: () => {},
+ signal: AbortSignal.timeout(5000),
+ log: {
+ debug: () => {},
+ info: () => {},
+ warn: () => {},
+ error: () => {},
+ child: () => ({}) as never,
+ span: () => ({}) as never,
+ },
+ cwd: "/project",
+ },
+ );
+
+ expect(result.isError).not.toBe(true);
+ expect(result.content).toContain("[Steep]");
+ expect(result.content).toContain("took too long");
+ expect(result.content).toContain(">10s");
+ expect(result.content).toContain("raise this to the user");
+ });
+
+ it("diagnostics op: responding servers' diagnostics are merged, tagged by source", async () => {
+ const mockClient = {
+ getState: () => "connected" as const,
+ getStateError: () => undefined,
+ request: async () => null,
+ waitForDiagnostics: async () => ({
+ formatted: "ERROR L1:1: boom",
+ slow: false,
+ timedOut: false,
+ }),
+ };
+
+ const tool = createLspTool(
+ stubManager({
+ status: async () => [
+ {
+ id: "steep",
+ name: "Steep",
+ root: "/project",
+ extensions: [".rb"],
+ state: "connected",
+ },
+ ],
+ getClient: () => mockClient as never,
+ }),
+ );
+
+ const result = await tool.execute(
+ { operation: "diagnostics", path: "game.rb" },
+ {
+ toolCallId: "test",
+ onOutput: () => {},
+ signal: AbortSignal.timeout(5000),
+ log: {
+ debug: () => {},
+ info: () => {},
+ warn: () => {},
+ error: () => {},
+ child: () => ({}) as never,
+ span: () => ({}) as never,
+ },
+ cwd: "/project",
+ },
+ );
+
+ expect(result.content).toContain("[Steep]");
+ expect(result.content).toContain("boom");
+ });
});
diff --git a/packages/lsp/src/tool.ts b/packages/lsp/src/tool.ts
index 8d282ec..be0d269 100644
--- a/packages/lsp/src/tool.ts
+++ b/packages/lsp/src/tool.ts
@@ -6,6 +6,7 @@
import { extname, resolve } from "node:path";
import type { ToolContract, ToolExecuteContext, ToolResult } from "@dispatch/kernel";
+import { aggregateDiagnostics } from "./aggregate.js";
import type { LspManager } from "./manager.js";
type Operation = "diagnostics" | "hover" | "definition" | "references" | "documentSymbol";
@@ -157,6 +158,8 @@ export function createLspTool(manager: LspManager): ToolContract {
switch (operation) {
case "diagnostics": {
+ // 10s hard ceiling per server (same policy as the edit path).
+ const DIAGNOSTICS_TIMEOUT_MS = 10_000;
// Query ALL connected servers whose extensions match this file.
const matching = statuses.filter(
(s) => s.state === "connected" && s.extensions.some((ext) => ext === fileExt),
@@ -179,31 +182,27 @@ export function createLspTool(manager: LspManager): ToolContract {
if (!client) {
return { content: "Language server client not available.", isError: true };
}
- const result = await client.waitForDiagnostics(absolutePath);
+ const result = await client.waitForDiagnostics(absolutePath, {
+ timeoutMs: DIAGNOSTICS_TIMEOUT_MS,
+ });
+ if (result.timedOut) {
+ return {
+ content: `⚠️ [${connected.name}] LSP took too long (>10s), diagnostics skipped — please raise this to the user.`,
+ };
+ }
return { content: result.formatted || "No diagnostics found." };
}
- // Query each matching server and merge results, tagged by source.
- const parts: string[] = [];
- let anyTimedOut = false;
- for (const s of matching) {
- const client = manager.getClient(s.id, s.root);
- if (!client) continue;
- const result = await client.waitForDiagnostics(absolutePath, { timeoutMs: 60_000 });
- if (result.timedOut) anyTimedOut = true;
- if (result.slow) {
- parts.push(
- `⚠️ LSP is taking unusually long. If this happens more than once, raise it to the user.`,
- );
- }
- if (result.formatted) {
- parts.push(`[${s.name}]\n${result.formatted}`);
- }
- }
- if (anyTimedOut && parts.length === 0) {
- parts.push("Diagnostics timed out (server may still be indexing).");
- }
- return { content: parts.length > 0 ? parts.join("\n\n") : "No diagnostics found." };
+ // Query matching servers concurrently, each capped at 10s;
+ // a non-responding server is skipped with a notice.
+ const agg = await aggregateDiagnostics(
+ (id, root) => manager.getClient(id, root),
+ matching,
+ absolutePath,
+ DIAGNOSTICS_TIMEOUT_MS,
+ {},
+ );
+ return { content: agg.formatted || "No diagnostics found." };
}
case "hover": {
const client = await getFirstMatchingClient(manager, statuses, fileExt);
diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts
index fa8d9e9..aaafb76 100644
--- a/packages/session-orchestrator/src/index.ts
+++ b/packages/session-orchestrator/src/index.ts
@@ -12,6 +12,7 @@ export {
conversationOpened,
conversationStatusChanged,
createCompactionService,
+ createRetryStrategy,
createSessionOrchestrator,
createWarmService,
type EnqueueInput,
@@ -34,8 +35,13 @@ export {
} from "./orchestrator.js";
export {
buildUserMessage,
+ cumulativeSleepMs,
defaultDispatchPolicy,
+ delayFor,
generateTurnId,
+ RETRY_BUDGET_MS,
+ RETRY_SCHEDULE_MS,
+ RETRY_TAIL_MS,
resolveReasoningEffort,
selectFirstProvider,
} from "./pure.js";
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index ae27e59..a1401d6 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -11,6 +11,7 @@ import type {
ProviderEvent,
ProviderStreamOptions,
ReasoningEffort,
+ RetryStrategy,
RunTurnInput,
RunTurnResult,
ToolContract,
@@ -24,6 +25,7 @@ import { createMetricsAccumulator } from "./metrics.js";
import {
buildUserMessage,
defaultDispatchPolicy,
+ delayFor,
generateTurnId,
resolveModelName,
resolveReasoningEffort,
@@ -342,12 +344,45 @@ export interface SessionOrchestratorBundle {
readonly activeConversations: ReadonlySet<string>;
}
+/**
+ * The concrete retry strategy wired into every turn's `RunTurnInput.retry`.
+ *
+ * `delayFor` is the pure schedule (`5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m`,
+ * then repeat 30m until 8h cumulative scheduled sleep) — no I/O, no clock.
+ * `sleep` is the abortable I/O effect: a `setTimeout`-based promise that
+ * rejects when the turn's abort signal fires (so a retry in flight seals the
+ * turn `aborted`). The kernel imports no timer; this is the shell-provided I/O.
+ */
+export function createRetryStrategy(): RetryStrategy {
+ const sleep = (ms: number, signal: AbortSignal): Promise<void> => {
+ return new Promise((resolve, reject) => {
+ if (signal.aborted) {
+ reject(new Error("aborted"));
+ return;
+ }
+ const timer = setTimeout(() => {
+ signal.removeEventListener("abort", onAbort);
+ resolve();
+ }, ms);
+ const onAbort = () => {
+ clearTimeout(timer);
+ reject(new Error("aborted"));
+ };
+ signal.addEventListener("abort", onAbort, { once: true });
+ });
+ };
+ return { delayFor, sleep };
+}
+
export function createSessionOrchestrator(
deps: SessionOrchestratorDeps,
): SessionOrchestratorBundle {
const activeConversations = new Set<string>();
const subscribers = new Map<string, Set<TurnEventListener>>();
const activeTurns = new Map<string, ActiveTurn>();
+ // One stateless retry strategy shared by every turn (delayFor is pure; sleep
+ // is a stateless setTimeout closure). Wired into each RunTurnInput.retry.
+ const retryStrategy = createRetryStrategy();
function emitToHub(conversationId: string, event: AgentEvent): void {
const turn = activeTurns.get(conversationId);
@@ -640,6 +675,7 @@ export function createSessionOrchestrator(
turnId,
signal: controller.signal,
providerOpts,
+ retry: retryStrategy,
...(turnLogger !== undefined ? { logger: turnLogger } : {}),
...(effectiveCwd !== undefined ? { cwd: effectiveCwd } : {}),
...(effectiveComputerId !== undefined ? { computerId: effectiveComputerId } : {}),
diff --git a/packages/session-orchestrator/src/pure.test.ts b/packages/session-orchestrator/src/pure.test.ts
index 9e5d3c4..2cbe15f 100644
--- a/packages/session-orchestrator/src/pure.test.ts
+++ b/packages/session-orchestrator/src/pure.test.ts
@@ -2,8 +2,13 @@ import type { ProviderContract } from "@dispatch/kernel";
import { describe, expect, it } from "vitest";
import {
buildUserMessage,
+ cumulativeSleepMs,
defaultDispatchPolicy,
+ delayFor,
generateTurnId,
+ RETRY_BUDGET_MS,
+ RETRY_SCHEDULE_MS,
+ RETRY_TAIL_MS,
resolveReasoningEffort,
selectFirstProvider,
} from "./pure.js";
@@ -100,3 +105,59 @@ describe("resolveReasoningEffort", () => {
expect(resolveReasoningEffort(undefined, "max")).toBe("max");
});
});
+
+describe("retry backoff schedule (delayFor)", () => {
+ it("emits the stepped head: 5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m", () => {
+ expect(delayFor(0)).toBe(5_000);
+ expect(delayFor(1)).toBe(10_000);
+ expect(delayFor(2)).toBe(30_000);
+ expect(delayFor(3)).toBe(60_000);
+ expect(delayFor(4)).toBe(300_000);
+ expect(delayFor(5)).toBe(600_000);
+ expect(delayFor(6)).toBe(900_000);
+ expect(delayFor(7)).toBe(1_800_000);
+ });
+
+ it("repeats 30m after the head", () => {
+ expect(delayFor(8)).toBe(RETRY_TAIL_MS);
+ expect(delayFor(9)).toBe(RETRY_TAIL_MS);
+ expect(delayFor(20)).toBe(RETRY_TAIL_MS);
+ });
+
+ it("gives up (returns undefined) once cumulative sleep exceeds 8h", () => {
+ // Head sums to 3,705,000 ms; +1,800,000 per extra step. 8h = 28,800,000.
+ // attempt 20 cumulative = 3,705,000 + 13*1,800,000 = 27,105,000 (< 8h) → retry.
+ expect(delayFor(20)).toBe(RETRY_TAIL_MS);
+ // attempt 21 cumulative = 27,105,000 + 1,800,000 = 28,905,000 (> 8h) → stop.
+ expect(delayFor(21)).toBeUndefined();
+ });
+
+ it("cumulativeSleepMs matches the sum of the schedule", () => {
+ expect(cumulativeSleepMs(0)).toBe(5_000);
+ expect(cumulativeSleepMs(1)).toBe(15_000);
+ expect(cumulativeSleepMs(7)).toBe(RETRY_SCHEDULE_MS.reduce((a, b) => a + b, 0));
+ // 8h budget is 28,800,000 ms.
+ expect(RETRY_BUDGET_MS).toBe(8 * 60 * 60 * 1000);
+ // The last retry (attempt 20) keeps cumulative under budget.
+ expect(cumulativeSleepMs(20)).toBeLessThanOrEqual(RETRY_BUDGET_MS);
+ // The next (attempt 21) exceeds it.
+ expect(cumulativeSleepMs(21)).toBeGreaterThan(RETRY_BUDGET_MS);
+ });
+
+ it("the full schedule has 21 retries then stops", () => {
+ const schedule: number[] = [];
+ let attempt = 0;
+ while (true) {
+ const delay = delayFor(attempt);
+ if (delay === undefined) break;
+ schedule.push(delay);
+ attempt++;
+ }
+ expect(schedule).toHaveLength(21);
+ expect(schedule[0]).toBe(5_000);
+ expect(schedule.at(-1)).toBe(RETRY_TAIL_MS);
+ // 8 stepped head + 13 tail repeats.
+ expect(schedule.slice(0, 8)).toEqual([...RETRY_SCHEDULE_MS]);
+ expect(schedule.slice(8).every((d) => d === RETRY_TAIL_MS)).toBe(true);
+ });
+});
diff --git a/packages/session-orchestrator/src/pure.ts b/packages/session-orchestrator/src/pure.ts
index 9a31e17..a028cbe 100644
--- a/packages/session-orchestrator/src/pure.ts
+++ b/packages/session-orchestrator/src/pure.ts
@@ -9,6 +9,53 @@ export function buildUserMessage(text: string): ChatMessage {
return { role: "user", chunks: [{ type: "text", text }] };
}
+// ── Provider-error retry backoff schedule ───────────────────────────────────
+//
+// Pure, deterministic delay decision (no I/O, no clock) for retrying retryable
+// provider errors (HTTP 429 / 5xx "overloaded"). The concrete `sleep` (I/O)
+// is wired in the orchestrator; this owns only the policy.
+
+/**
+ * Stepped backoff schedule (ms): 5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m.
+ * After the head is exhausted, {@link RETRY_TAIL_MS} (30m) repeats.
+ */
+export const RETRY_SCHEDULE_MS = [
+ 5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000,
+] as const;
+
+/** Tail delay (ms) repeated after the stepped head: 30 minutes. */
+export const RETRY_TAIL_MS = 1_800_000;
+
+/** Cumulative scheduled-sleep budget (ms) after which retrying gives up: 8h. */
+export const RETRY_BUDGET_MS = 8 * 60 * 60 * 1000;
+
+/**
+ * Cumulative scheduled sleep through `attempt` (sum of delay[0..attempt]).
+ * Pure — no I/O, no clock.
+ */
+export function cumulativeSleepMs(attempt: number): number {
+ let sum = 0;
+ for (let i = 0; i <= attempt; i++) {
+ sum += i < RETRY_SCHEDULE_MS.length ? (RETRY_SCHEDULE_MS[i] ?? RETRY_TAIL_MS) : RETRY_TAIL_MS;
+ }
+ return sum;
+}
+
+/**
+ * Pure, deterministic delay decision for the retry strategy: given the
+ * 0-based attempt index, return the delay in ms to sleep before the next
+ * retry, or `undefined` to stop (cumulative budget exhausted). No I/O, no
+ * clock — fully testable. Matches the plan's schedule:
+ * `5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m`, then repeat 30m until 8h of
+ * cumulative scheduled sleep is reached, then give up.
+ */
+export function delayFor(attempt: number): number | undefined {
+ const scheduled = RETRY_SCHEDULE_MS[attempt];
+ const delay = scheduled !== undefined ? scheduled : RETRY_TAIL_MS;
+ if (cumulativeSleepMs(attempt) > RETRY_BUDGET_MS) return undefined; // over budget → stop
+ return delay;
+}
+
/**
* Resolve the reasoning-effort level for a turn:
* per-turn override → persisted per-conversation value → default `"high"`.
diff --git a/packages/tool-edit-file/src/extension.ts b/packages/tool-edit-file/src/extension.ts
index 2eaa0e9..9dbebda 100644
--- a/packages/tool-edit-file/src/extension.ts
+++ b/packages/tool-edit-file/src/extension.ts
@@ -41,7 +41,10 @@ export const extension: Extension = {
filePath: opts.filePath,
text: opts.text,
cwd: opts.cwd,
- timeoutMs: 60_000,
+ // 10s matches the LSP service's per-server cap (see packages/lsp).
+ // The service clamps this anyway; stated explicitly so the call
+ // site is honest about the effective live-diagnostics budget.
+ timeoutMs: 10_000,
minSeverity: 2, // errors + warnings only
});
};
diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts
index f6a95cf..8dc3a72 100644
--- a/packages/wire/src/index.ts
+++ b/packages/wire/src/index.ts
@@ -273,6 +273,7 @@ export type AgentEvent =
| TurnUsageEvent
| TurnStepCompleteEvent
| TurnErrorEvent
+ | TurnProviderRetryEvent
| TurnDoneEvent
| TurnSealedEvent
| TurnSteeringEvent;
@@ -429,6 +430,31 @@ export interface TurnErrorEvent {
readonly code?: string;
}
+/**
+ * A retryable provider error is being retried with backoff. Emitted once per
+ * scheduled retry, BEFORE the sleep, so the UI can show "⚠ Server overloaded —
+ * retrying in 5s…" immediately. TRANSIENT: emitted to the frontend but NOT
+ * persisted into the model's message history (it never pollutes the prompt).
+ *
+ * When the retry budget is exhausted, the existing `error` event is emitted and
+ * the turn seals — so the final failure is still a persisted error. `attempt` is
+ * 0-based (the Nth retry about to happen); `delayMs` is the scheduled sleep
+ * before that retry fires.
+ */
+export interface TurnProviderRetryEvent {
+ readonly type: "provider-retry";
+ readonly conversationId: string;
+ readonly turnId: string;
+ /** 0-based: this is the Nth retry about to happen. */
+ readonly attempt: number;
+ /** ms the client should expect to wait before the retry fires. */
+ readonly delayMs: number;
+ /** The endpoint's error verbatim (e.g. "HTTP 429: {…overloaded_error…}"). */
+ readonly message: string;
+ /** The HTTP code when known (e.g. "429"). */
+ readonly code?: string;
+}
+
/** The turn has completed (model finished generating). */
export interface TurnDoneEvent {
readonly type: "done";
diff --git a/tasks.md b/tasks.md
index b14645c..d1d55e7 100644
--- a/tasks.md
+++ b/tasks.md
@@ -5,10 +5,36 @@
> Keep this lean and current; do not let it re-accrete a step-by-step changelog.
## Status (current)
-`tsc -b` EXIT 0 · biome clean · **1549 vitest** green. (worktree `feature/ssh-support`;
-baseline re-verified after `bun install`.)
-
-## SSH support — transparent remote execution (IN PROGRESS)
+`tsc -b` EXIT 0 · biome clean · **1730 vitest** pass (+6 sshd-integration skipped). (worktree `feature/ssh-support`;
+merged `dev` — brings retry-with-backoff (`provider-retry` AgentEvent) + the LSP-dead-server fix alongside the
+SSH waves below.)
+
+## Retry with backoff on retryable provider errors (DONE — from dev)
+When the upstream LLM API returns a retryable error (HTTP 429 / 5xx "overloaded"),
+the kernel now retries `provider.stream()` with a stepped backoff, visibly, until
+the 8h cumulative-sleep budget is exhausted — then emits the final error and
+seals the turn. Retries fire ONLY when no content was emitted yet this step (the
+safety invariant — never duplicate partial output). Plan:
+`notes/retry-with-backoff-plan.md`; report: `reports/retry-with-backoff.md`.
+- **Architecture (kernel hook + shell policy/I/O):** kernel provides the hook
+ (`RetryStrategy` contract + the retry loop in `runTurn`); the shell
+ (session-orchestrator) provides the policy (the schedule) + the I/O (an
+ abortable `setTimeout` sleep). Kernel imports no timer. `retry?` is optional
+ → omit = no retry (backward-compatible).
+- **New transient `AgentEvent` variant** `provider-retry` (`@dispatch/wire`),
+ emitted once per scheduled retry BEFORE the sleep so the UI can show
+ "⚠ retrying in Ns…" immediately; NOT persisted to model history (never
+ pollutes the prompt). Final failure is still a persisted `error` + seal.
+- **Schedule:** `5s,10s,30s,60s,5m,10m,15m,30m`, then repeat 30m until 8h of
+ cumulative scheduled sleep → ~21 retries then give up. Pure `delayFor(attempt)`.
+- **Retry trigger:** emitted `error` with `retryable===true` → retry;
+ `retryable` false/absent → give up; a THROWN error → retryable-by-default
+ ONLY when pre-content. All gated on `!hadContent` (text/reasoning/tool-call/usage).
+- **Frontend handoff (5d3f, separate repo `../dispatch-web`):** render
+ `provider-retry` as a yellow warning system-message bubble showing `message`
+ (+`code`) with the `delayMs` countdown.
+
+## SSH support — transparent remote execution (DONE — waves 0-5c)
Plan: `notes/ssh-support-plan.md` (decisions locked in §0.5/§13). Orchestrated in
waves (ORCHESTRATOR.md §2a — pre-author the contract seam, then parallel
owner-agents on disjoint packages).
@@ -29,8 +55,6 @@ owner-agents on disjoint packages).
+ remote tool-drop filter: drops `lsp` + `__`-namespaced MCP tools when
remote) + `transport-contract` (ChatRequest.computerId + computer endpoint
API types). `tsc -b` EXIT 0, biome clean, **1620 vitest** (was 1599).
- CR-1 (non-blocking): MCP filter doesn't preserve `computerId` on
- ToolAssembly — fix folded into wave 4.
- [x] **Wave 4** (parallel): `transport-http` (computer endpoints + `/chat`
threading + the `ComputerService` seam the ssh package will provide) +
`transport-ws` (computerId through chat.send/queue) + `mcp` (CR-1: preserve
@@ -50,6 +74,9 @@ owner-agents on disjoint packages).
computerServiceHandle. orchestrator added missing @dispatch/exec-backend dep to
host-bin + bun install. **LIVE-VERIFIED**: server boots clean ("Dispatch booted",
no disabled extensions). tsc -b EXIT 0, biome clean, 1690 vitest (+6 sshd skipped).
+- [x] **Merge dev**: brought retry-with-backoff (`provider-retry` AgentEvent — what
+ the FE consumes) + LSP-dead-server fix into the SSH branch. All code files
+ auto-merged cleanly; only `tasks.md` conflicted (orchestrator-resolved).
- [ ] **DEFERRED — CR-6 usageCount**: `listComputers()` returns `usageCount: 0` until a
conversation-store count-by-alias helper + host-bin wiring is added (non-blocking —
discovery/connect/execute all work; only the count badge shows 0). Follow-up.
@@ -63,7 +90,6 @@ Key decisions: ssh2 + ssh-config (project-local deps); key-only auth from
`~/.ssh/config` (no CRUD entity); computerId persisted per-conversation; LSP/MCP
silently dropped on remote turns; edit_file works w/o diagnostics remotely.
-
## Per-edit LSP diagnostics auto-append (DONE)
After a successful `edit_file`, the extension now calls LSP `getDiagnostics` on the
post-edit buffer and appends any errors/warnings (severity ≤ 2) to the tool result —