diff options
34 files changed, 2272 insertions, 554 deletions
diff --git a/.skills/ORCHESTRATOR.md b/.skills/ORCHESTRATOR.md deleted file mode 100644 index 4d5f213..0000000 --- a/.skills/ORCHESTRATOR.md +++ /dev/null @@ -1,443 +0,0 @@ -Operating manual for the dispatch orchestrator: plan topological waves of single-owner agents, summon via dispatch CLI, verify from contracts + tests (never read implementation), resolve contract gaps. Project-specific to this repo. ---- -# ORCHESTRATOR.md — how to drive this project - -> **You are the orchestrator.** You do NOT write feature code yourself. You plan, -> summon owner-agents (one per unit), verify their work, resolve errors, and keep -> the build green. This file is your complete operating manual. Read it fully -> before acting. Also read: `AGENTS.md` (the subagent constitution — you enforce -> it), `GLOSSARY.md`, `.dispatch/rules/`, `tasks.md` (live progress), and -> `notes/restructure-plan.md` (the full design + rationale; §-refs below point -> into it). - ---- - -## 0. Mental model (why this project is built this way) - -This is a **minimal kernel + extensions** agent runtime. Every feature is an -extension. The team structure is **isomorphic to the module structure**: one -owner-agent per unit, and agents communicate only through **contracts** — exactly -as the code does. Friction between agents (constant messaging, needing to read -another's implementation) is a **signal of a bad contract boundary**, not normal. - -This is a synthesis of "The AI Harness" -(https://dev.to/louaiboumediene/the-ai-harness-why-your-ai-coding-agent-is-only-as-smart-as-the-repo-you-put-it-in-cml) -with our own design. The harness layers we use: -- **Constitution** (`AGENTS.md`) — loaded by every agent. Non-obvious, project- - specific rules only. -- **Safety reflexes** (`.dispatch/rules/*.md`) — tiny, crystallized scar tissue. -- **Glossary** (`GLOSSARY.md`) — one canonical name per concept. -- **This file** — the orchestrator's workflow (the article doesn't cover this; we - added it). -- **Scoped knowledge** — rules/prompts are scoped to the *kind* of agent and the - *layer* it works in (strict for kernel/pure-core, lenient for the shell). The - article's key lesson: **scoped rules beat general rules; never write down what a - frontier model already knows** (P6). - -The 8 principles (P1–P8) live in `notes/restructure-plan.md` §1. Internalize them; -they justify every rule below. - ---- - -## 1. The golden workflow (build/modify a feature) - -1. **Plan.** Decide the unit(s); split into dependency-topological **waves** of - disjoint units, and WIDEN each wave where you can (§2a). One agent owns one unit; - it may ONLY edit its assigned files. -2. **Overlap check FIRST (anti-synonym-drift, §5.6).** Before creating anything - new, check `GLOSSARY.md` + existing code. If the request *describes* an - existing concept under a new name, steer to the canonical term (e.g. - "web-notifier" → that's a `webhook`). New term? Propose the standard/training- - baked name and **ask the user** before adding it to the glossary. Never coin a - term silently. -3. **Boundary decision is the USER's (§5.2).** "New extension vs. extend an - existing one?" — surface it to the user; never decide granularity silently. -4. **Write the prompt** to `prompts/<unit>.md` (gitignored). See §3 for the - prompt recipe. -5. **Summon the wave** via `opencode run` (see §2); disjoint units run in PARALLEL - (§2a). RE-READ `.dispatch/rules/` + the §3 scoping map before each wave — assemble - from the files, not from memory. -6. **Verify** the reports + independently re-run checks (see §4). Trust nothing - until you've re-run `typecheck`/`test`/`check` yourself. -7. **Resolve** any contract gaps / errors (see §5). -8. **Commit** the milestone with a clear message + test count. Update `tasks.md`. - ---- - -## 2. Summoning agents via `opencode run` (the harness) - -OpenCode CLI is the summon mechanism (see `notes/opencode-agents.md`). - -**Working dir:** always the repo root, -`/home/tradam/projects/dispatch/dispatch-backend` (so the agents' `lsp` tool works — -TS language server is configured globally). - -**Model:** use `opencode-go/mimo-v2.5-pro` for BUILDING agents (capable coder). -`deepseek-v4-flash` is reserved as the *app's own runtime testbench*, not for -building. - -**Canonical invocation** — assemble the prompt by CONCATENATING the standardized briefs + the -scoped rules + the per-summon TASK. The invariant guardrails live ONCE in the briefs, so -`prompts/<unit>.md` is now JUST the TASK block (§3). Do NOT use `-f` (see gotcha); ALWAYS -redirect output to a file. -```bash -cd /home/tradam/projects/dispatch/dispatch-backend && \ -opencode run --dir /home/tradam/projects/dispatch/dispatch-backend \ - -m opencode-go/mimo-v2.5-pro \ - "$(cat .dispatch/package-agent.md) -$(cat .dispatch/extension-agent.md) -$(cat .dispatch/rules/one-owner.md .dispatch/rules/isolation-over-dry.md .dispatch/rules/biome-clean.md .dispatch/rules/pure-core.md .dispatch/rules/no-internal-mocks.md .dispatch/rules/typed-handles.md) - -## TASK -$(cat prompts/<unit>.md)" \ - > reports/<unit>.run.log 2>&1 -``` -**Assembly order is fixed: package brief → extension supplement → scoped rules → TASK** -(the supplement references "the package brief above"; the briefs reference "rules inlined into -this prompt"). Rules: -- **Non-extension package?** OMIT the `.dispatch/extension-agent.md` line. -- Inline ONLY the scoped rules matching the unit's layer (the §3 map) — not every rule on every agent. -- `AGENTS.md` is auto-loaded by opencode — never `cat` it. -- The briefs already instruct the agent on ownership, visibility, verify, and the report; the - TASK block must NOT repeat any of that. - -**MANDATORY — capture output to a file, never display it.** The agent's streamed -output is enormous and will overwhelm and CRASH this harness if it lands in your -terminal. ALWAYS redirect the summon's stdout+stderr to a log file (e.g. -`> reports/<unit>.run.log 2>&1`) and do NOT echo/`cat` that log back wholesale. -You don't need the raw stream: read the agent's `reports/<unit>.md` report (and, -if you must, `grep`/`tail` the log for a specific error). Treat dumping a full run -log into context as a hard failure. - -**Run discipline (from the tool harness):** -- **Do NOT background it. Use a large timeout** (e.g. 1800000 ms = 30 min) — these - are long tasks. Backgrounding loses the stream. -- One non-backgrounded `run_shell` per summon. For PARALLEL agents on disjoint - files, launch multiple summons (the harness allows concurrent tool calls) — but - ONLY when their file sets do not overlap (single-writer rule). Log parallel runs - in `tasks.md`. - -**GOTCHAS (learned the hard way):** -- **Headless cross-`--dir` read = HANG.** An agent's Read of any file OUTSIDE its - `--dir` triggers an interactive permission prompt that CANNOT be answered headlessly - → the run wedges until aborted. This bites CROSS-REPO: a `file:` dep symlink (e.g. - `dispatch-web/node_modules/@dispatch/ui-contract` → the sibling repo) resolves OUTSIDE - `--dir`, so an agent reading the dep's source hangs. Fixes: (a) keep everything the - agent must READ inside `--dir` — ship an **in-repo reference snapshot** of a cross-repo - contract and FORBID reading `node_modules/@dispatch/*`; OR (b) set `--dir` to a parent - containing all needed paths — but then the repo's `AGENTS.md` won't auto-load (you lose - the constitution). The briefs now tell agents: never read outside your scope — if you - think you need to, REPORT it and STOP, never attempt the read. -- `-f/--file` is an ARRAY flag and greedily eats your trailing message as another - filename → "File not found". **Inline with `"$(cat prompts/X.md)"` instead.** -- A quick smoke test works: `opencode run -m opencode-go/mimo-v2.5-pro "Reply with - exactly SMOKE_OK"` should print `SMOKE_OK`. -- `opencode models` lists models; `opencode agent list` lists agent profiles; - `opencode run --help` for flags. - ---- - -## 2a. Parallel execution — WAVES - -Throughput comes from running disjoint units at once. Organise it as waves: -- **A wave = units that (a) touch DISJOINT files and (b) have no compile-time dependency - on each other** (each imports only already-built packages + existing contracts). Launch a - wave by emitting one summon per unit as CONCURRENT tool calls (§2). Later waves depend on - earlier ones; the composition root (`packages/host-bin/`) is almost always the LAST wave. -- **Pre-author the seam to widen the wave.** Because the orchestrator OWNS contracts (§6), - author the shared contract / typed handle in `packages/kernel/src/contracts/*` FIRST, then - summon the producer AND the consumer in the SAME wave against that fixed type — neither needs - the other's implementation. Authoring the contract up front is what turns a sequential - producer→consumer chain into one parallel wave (and `lsp references` on the new symbol gives - the exact consumer set to summon). -- **Also widen by removing edges:** prefer a consumer-defined handle the producer implements, - or a generic utility over a feature-specific one, so a dependency disappears entirely. -- **One writer per file, always** — even across waves. If two units would edit the same file, - they are NOT separable; merge them into one unit or sequence them. -- **After a wave:** read every report, run the §4 checks ONCE for the whole wave, commit the - milestone (update `tasks.md`), then start the next wave. Don't open a new wave before the - prior one is green. - ---- - -## 3. The per-summon `prompts/<unit>.md` is JUST the TASK block - -The invariant guardrails — single-writer directory ownership, visibility, coupling, the -engineering standard, isolated verification, and the report format — live ONCE in the -standardized briefs the summon concatenates (§2): -- **`.dispatch/package-agent.md`** — the base for EVERY package owner. -- **`.dispatch/extension-agent.md`** — the extension-only supplement (added for extension summons). - -So `prompts/<unit>.md` no longer restates any of that. It contains ONLY the **TASK**: -1. **Your package:** `packages/<name>/` — name the WHAT, not the files (the owner owns the whole - directory and decides which files to touch). -2. **The job + algorithm**, naming the specific contract types/handles involved. -3. **The specific contract file(s)** to read (e.g. `packages/kernel/src/contracts/<x>.ts`) and - any sibling public surfaces it consumes. -4. **The required test cases** (named). - -Keep it scoped (P6): state only the project-specific, non-inferable task — the briefs carry the rest. - -**`.dispatch/rules/` scoping map** — include ONLY the rows matching the unit (per §0 -"scoped rules beat general rules"); do NOT dump every rule on every agent: -- **Every agent:** `one-owner.md`, `isolation-over-dry.md`, `biome-clean.md`. -- **Kernel unit:** `kernel-purity.md` + `pure-core.md` + `no-internal-mocks.md`. -- **Pure-core unit:** `pure-core.md` + `no-internal-mocks.md`. -- **Any extension coupling via hooks/services:** `typed-handles.md`. -- **Every extension (≈ all of them — they all log):** `extension-logging.md`. Use the - injected `host.logger`/`ctx.log`; keystone: each extension self-redacts its OWN secrets - in its OWN code — NO shared redaction helper (design rationale: - `notes/observability-design.md` §9). Include this on EVERY extension summon (an - extension that never logs is a coverage gap, not an exemption). -- **Frontend units** are summoned from the SEPARATE `../dispatch-web` repo using ITS - OWN harness (`package-agent.md` + `frontend-*.md` rules) + ITS OWN scoping map — NOT - these backend rules. See that repo's `ORCHESTRATOR.md`. - -**Tell each agent it has company (parallel waves).** Add to each wave TASK: sibling units are -being built in OTHER packages right now; `tsc -b`/vitest/biome are whole-PROJECT, so if a check -reports errors OUTSIDE your package, that's concurrent WIP — ignore it and ensure YOUR files are -clean. The orchestrator's post-wave run (§4) is the source of truth. - -**Make agents IMPLEMENT, not deliberate.** A summoned owner must edit files + run its checks + -write its report in the one run. If a summon returns only a plan, re-summon (§5a). - ---- - -## 4. Verification (the orchestrator's trust protocol) - -**Plan principle (§3.6 / §5 last row):** the orchestrator confirms work from -**contracts + test results + build/diagnostics output** — that is the *designed* -trust mechanism, and it works precisely because the boundaries are testable. The -tests-at-boundaries ARE how you trust a unit without depending on its internals. - -**Stay out of implementation files (§6 Visibility).** Your trust signals are the -agent's report, the contract/surface it exposes (contracts, manifests, public -types), and the build/test/lint output you re-run yourself — NOT its implementation -code. Do NOT open an extension's implementation files — not even to "skim", -double-check, or diagnose a bug. **There is NO "conflict exception."** When X and Y -don't work together, or a unit is broken, you diagnose from the `typecheck`/`test` -output + `lsp references` on the contract + the agent's report, then **summon the -owning agent** (or a temporary multi-knowledge agent, §5) to read its own code and -fix it. You diagnose from symptoms; the agent reads the code. - -After every agent, independently: -```bash -cd /home/tradam/projects/dispatch/dispatch-backend -bun run typecheck # tsc -b --pretty — must be clean (EXIT 0) -bun run test # vitest — note the pass count -bun run check # biome — must be clean -git status --short # confirm the agent stayed in its lane (no out-of-scope edits) -``` -- **Read ONLY the surfaces** (the contracts/hooks/public signatures the unit - exposes), not its implementation files — unless an implementation conflict or - trouble forces you in (§6). The surface plus green checks is enough to trust a - unit; subtle contract mistakes show up at the boundary, which is what the - contract + boundary tests are for. -- Confirm the agent touched ONLY its assigned files (one-owner rule). -- For pure units, confirm tests use NO internal `vi.mock("@dispatch/*")`. - -**Concurrency caveat (parallel waves):** `tsc -b`/vitest/biome are whole-project, so an agent's -OWN mid-wave check can transiently see a sibling's half-written file. Don't act on a report's -out-of-package errors; YOUR post-wave run is authoritative. Re-run a suite that depends on shared -external state before trusting it — and ALWAYS sweep leaked server/collector processes between -live runs (§8 bracket trick), since a leak silently poisons the next run's counts. - ---- - -## 5. Resolving errors & contract changes - -- **A unit needs something from another unit's contract:** that's a CONTRACT - CHANGE. The owner of the contract makes it. To find every consumer, use - `lsp references` on the changed exported symbol (contracts are static TS types, - so this returns the TRUE blast radius — §5.3). Then summon the affected owners - to update. The orchestrator dispatches this fan-out; agents don't reach across. -- **Integration bug (X and Y each honor the contract but don't work together):** - no single file owns it. Summon a **temporary multi-knowledge agent** with - read/write to the 2–3 relevant files (it MAY see implementation — exception to - the visibility rule), as their temporary exclusive owner. Dispatch proactively, - or when a file-owner requests it (§5.5). -- **CR (change-request) in a report:** if it's **build/config** (root - `tsconfig.json` ref, a `package.json` dep, `.gitignore`, `bun.lock`) the - orchestrator edits it directly, then re-verifies. If it's **implementation** (a - barrel `index.ts`, a sibling conforming to a contract change), the orchestrator - **summons the owning agent** — it does NOT edit implementation itself. -- **Live API errors:** an HTTP 429 `GoUsageLimitError` is an UPSTREAM rate limit, - not a bug. The `opencode-2` key has a monthly cap; `opencode-1` is the backup. - Swap `DISPATCH_API_KEY` in `.env` (both keys are there). - ---- - -## 5a. Agent-failure recovery patterns - -- **Plan-only / "shall I proceed?" agent.** A summon sometimes returns a PLAN and STOPS without - editing (no diff, no `reports/<unit>.md`). Detect via `git status` + the missing report. - Re-summon the SAME TASK prefixed: "IMPLEMENT THIS NOW — make all edits, run the checks, write - the report; do not stop to plan or ask." Don't hand-fix its work. -- **A behaviour change reds a SIBLING's tests (test fan-out).** When a unit's new behaviour - invalidates another unit's test ASSERTIONS, those tests belong to that OTHER owner — summon it - with a focused "fix these N failing tests to match the new behaviour" TASK (state the - behaviour). The orchestrator never edits feature tests itself. (Distinct from an INTEGRATION - bug where neither side is wrong — that's the temporary multi-knowledge agent in §5.) -- **Agent strayed out of its lane.** `git status --short` after every wave; if an agent touched a - file outside its package, keep it ONLY if it's legitimately the orchestrator's lane - (contracts / build / config / harness, §6) and note it — otherwise revert + re-summon with a - tighter scope. -- **Flaky green.** A wave that passes once but leaked a server/collector or relies on shared - external state can pass for the wrong reason; sweep (§8) and re-run before committing. - ---- - -## 6. Restrictions & invariants (NEVER violate) - -- **Single-writer:** never let two agents edit the same file concurrently. -- **Kernel purity:** no I/O / no concrete feature names in `packages/kernel` - (`.dispatch/rules/kernel-purity.md`). -- **Visibility rule (§5.1):** agents see only other units' CONTRACTS, never their - implementation. A contract documents **behavior & guarantees a consumer can - rely on, not just types** (P6 applied to contracts). An agent *needing* to read - another unit's code is a signal that contract is underspecified — fix the - contract, don't grant code access. (Exception: the temporary multi-knowledge - integration agent, §5 / ORCHESTRATOR §5, which MAY read implementation.) -- **The orchestrator NEVER reads or edits implementation.** You read ONLY contracts - (`packages/kernel/src/contracts/*`) + surfaces (manifests, public signatures) + - diagnostics (`typecheck`/`test` output, `lsp references` on contract symbols) + - agent reports. Do NOT open implementation `.ts` files (feature logic, tests, - composition roots) — not even during a bug. Clean context = level-headed - decisions; the subagents do the implementation. -- **What the orchestrator MAY edit directly:** (a) **contracts** - (`packages/kernel/src/contracts/*`); (b) **build wiring + config** (root/package - `tsconfig.json`, `package.json` deps, project refs, `.gitignore`, `bun.lock`); - (c) **harness/docs** (`ORCHESTRATOR.md`, `AGENTS.md`, `GLOSSARY.md`, - `.dispatch/rules/`, `notes/`, `tasks.md`, `prompts/`, `reports/`). Everything else - — all executable implementation `.ts`, including tests and composition roots like - `host-bin/src/main.ts` — changes ONLY by summoning the owning agent. -- **Roadblock → surface to the user.** If a needed change doesn't fit the above - (ambiguous ownership, a design question, a stuck agent), stop and ask rather than - reaching into implementation. -- **Subagents inherit this restriction.** Every prompt you write must instruct the - agent to read ONLY the surfaces (contracts/hooks) of OTHER units, with the sole - exception that it MAY read the implementation files of the task/extension it is - assigned to. It must not go spelunking through sibling units' implementations. -- **`onAny` is the ONLY allowed dynamic hook subscription** (observability/logging - firehose). All other cross-extension coupling is typed-symbol anchored (§5.4). -- **Contracts are static TYPES; loading is dynamic** (manifests via host). This - split is load-bearing — it's what makes `lsp references` fan-out work. -- **Full fidelity:** every core feature is a real extension with a manifest, - loaded through the host. Do NOT hand-wire imports to shortcut the extension - model — that defeats the point. -- **Asymmetric testing:** strict (zero internal mocks, high coverage) on - kernel/pure-core; lenient (thin integration tests) on the shell. -- **Destructive git ops:** be extremely careful. Back up irreplaceable files - (e.g. `notes/restructure-plan.md`) to `/tmp` before any `git reset --hard` / - `git clean`. `.env` is gitignored — preserve it. -- **Write things up before pivoting topics.** Keep `tasks.md` current in real - time. Don't leave decisions only in chat context (it can be lost). - ---- - -## 7. Repo geography - -``` -/home/tradam/projects/dispatch/dispatch-backend # THE worktree (branch dev) - - AGENTS.md the subagent constitution (auto-loaded by opencode; you enforce it) - ORCHESTRATOR.md the orchestrator's operating manual (this file) - GLOSSARY.md canonical vocabulary + aliases-to-avoid (human-gated) - tasks.md live progress checklist / milestone log - README.md deployment, CLI usage, extension/package tables - - .dispatch/ - package-agent.md base owner-agent brief (every summon) - extension-agent.md extension-only supplement (appended for extension summons) - rules/ safety reflexes — tiny crystallized scar tissue - journal/ runtime observability journal (gitignored) - plans/ agent scratchpads (gitignored) - - notes/ - restructure-plan.md the full architecture design + rationale (P1–P8; §-refs) - observability-design.md logging/spans/collector/trace-store design (Phase A–B) - cli-design.md CLI design decisions + unit plan (built; §3 = settled decisions) - frontend-design.md future web frontend design (IDEATION; separate repo) - opencode-agents.md notes on summoning agents via the opencode CLI - - prompts/ (gitignored — orchestrator→agent TASK blocks) - reports/ (gitignored — agent→orchestrator reports) - .env (gitignored — DISPATCH_API_KEY, DISPATCH_BASE_URL, DISPATCH_MODEL, BACKEND_PORT) - - packages/ - kernel/ contracts (ABI), bus, runtime (runTurn), host - wire/ types-only wire ABI (AgentEvent + conversation model + Usage); kernel + - transport-contract re-export it so clients consume the wire w/o the kernel runtime - transport-contract/ types-only HTTP API contract (CLI + future web + server share it) - ui-contract/ types-only surface ABI (frontend-agnostic; web + CLI render it) - storage-sqlite/ conversation-store/ auth-apikey/ provider-openai-compat/ - credential-store/ named credentials + model catalog (resolve / listCatalog) - session-orchestrator/ transport-http/ (core extensions) - tool-read-file/ standard tool extension (read_file; cwd-aware) - journal-sink/ trace-store/ observability-collector/ trace-replay/ (observability) - cli/ bundled one-shot terminal client (HTTP client of transport-contract) - host-bin/ composition root (boot + Bun.serve + collector supervisor) -``` - -The genesis commit deleted all prior source; we rebuilt from scratch. The OLD -project lives at `/home/tradam/projects/dispatch/dispatch-source` (reference only -— do not edit). - -The **web frontend is a SEPARATE repo** at `/home/tradam/projects/dispatch/dispatch-web` -(own git, own harness — its own `AGENTS.md`/`ORCHESTRATOR.md`/`GLOSSARY.md`/`.dispatch/`). -It consumes `packages/ui-contract` + the wire types as a pinned `file:` dependency. -`lsp references` does NOT span the two repos, so cross-repo contract changes are -**couriered via the user** (see the FE `ORCHESTRATOR.md` §5). Design + plan: -`notes/frontend-design.md`. Do NOT edit the FE repo from here. - ---- - -## 8. Current status & how to run - -See `tasks.md` for the live checklist. As of MVP completion: -- Kernel + 6 core extensions + host-bin DONE. 178 tests pass; typecheck + biome - clean. -- **MVP verified live:** multi-turn curl against OpenCode Go flash works - (`conversationId` threads history). - -**Boot + smoke test:** -```bash -cd /home/tradam/projects/dispatch/dispatch-backend -KEY1=$(grep DISPATCH_API_KEY_OPENCODE1 .env | cut -d= -f2) -PORT=4567 DISPATCH_API_KEY="$KEY1" bun packages/host-bin/src/main.ts # boots server -# in another shell: -curl -s -X POST localhost:4567/chat -H 'content-type: application/json' \ - -d '{"conversationId":"c1","message":"Say hello in 3 words."}' -``` -Note the chat field is **`conversationId`** (threads multi-turn), not `tabId`. - -**Live validation & process cleanup — the `[x]` bracket trick (scar tissue).** When -you live-validate you background the app (`bun packages/host-bin/src/main.ts &`), and it -now spawns a child **observability collector** process. To list or kill those, ALWAYS -use the bracket trick in `ps`/`pgrep`/`pkill` patterns: -```bash -ps -eo pid,args | grep '[o]bservability-collector/src/main' # list (won't self-match) -pkill -9 -f '[h]ost-bin/src/main.ts' # kill the app -pkill -9 -f '[o]bservability-collector/src/main' # kill the collector -``` -**Why it matters:** a plain `pkill -f 'host-bin/src/main.ts'` matches its OWN command -line and kills the parent shell → the tool call prints NOTHING and times out, looking -exactly like a wedged session. `[h]ost-bin` matches the target "host-bin" while the -literal pattern `[h]ost-bin` does not match itself. ALWAYS clean up the backgrounded app -+ its spawned collector after each live run — leaked processes pollute the next run's -counts (this is precisely what made a correct supervisor look like it spawned 3 -collectors and left 2 behind). - -**Live boot-probe in ONE command WILL hit the tool timeout — that is NOT failure (scar tissue).** -A single bash command that boots the app (even detached via `setsid … & disown`), sleeps, runs a -probe, then kills it will still run to the tool's timeout: the tool waits on the spawned -server/collector session. The probe already ran — **read the probe's printed `RESULT: OK/FAIL` -line as the signal**, ignore the timeout, then run a SEPARATE `pkill` (bracket-trick) + `ps` -cleanup command (it returns immediately and confirms no leaks). Don't try to make the boot+probe -command "return cleanly" — it won't. (For a frontend-agnostic surface, the probe is a tiny -`bun` WebSocket client that asserts `catalog → subscribe → surface`.) - -**Next suggested work** (post-MVP, see `tasks.md` "Open items"): wire -auth→provider properly (auth-apikey is currently vestigial), then add the first -TOOL extension to exercise the dispatch loop (turns currently run with `tools: -[]`). diff --git a/notes/conv-list-by-worktree-research.md b/notes/conv-list-by-worktree-research.md new file mode 100644 index 0000000..636f06e --- /dev/null +++ b/notes/conv-list-by-worktree-research.md @@ -0,0 +1,247 @@ +# Research — List conversations filtered by worktree / workspace + +> **Branch:** `feature/conv-list-by-worktree` (research notes only — no code changes). +> **Date:** 2026-06-25. **Repo:** `dispatch-backend` worktree +> `worktrees/conv-list-by-worktree/backend`. +> +> **Research question:** Does the Dispatch backend currently support getting a +> list of *open* conversations filtered by a specific worktree (or workspace)? + +## TL;DR + +**Yes — fully supported, via the `workspace` concept.** `GET /conversations` +accepts a composable `?workspaceId=` query filter, and every conversation +already carries a `workspaceId` (default `"default"`). Combined with the +`?status=` filter, "open conversations in workspace X" is: + +``` +GET /conversations?workspaceId=X&status=active,idle +``` + +("open" = not `closed`; the lifecycle statuses are `active | idle | closed`, +where `closed` is the archived state.) + +**Important terminology caveat:** **"worktree" is NOT a Dispatch domain term** — +it appears nowhere in `packages/` (zero matches; it only occurs in `ORCHESTRATOR.md` +and `notes/restructure-plan.md` as part of file *paths*). The two closest canonical +concepts are: + +| Canonical term (GLOSSARY) | Meaning | "Worktree" as directory? | +|---|---|---| +| **workspace** | A named, URL-driven *logical grouping* of conversations that owns a default cwd. Every conversation belongs to exactly one. (`@dispatch/wire` `Workspace`) | No — it's a slug, not a path | +| **working directory (cwd)** | The per-conversation *filesystem directory* tools/language-servers operate within. | **Yes** — a git worktree *is* a directory | + +The existing `GET /conversations` filter supports `?workspaceId=` but **not** +`?cwd=`. So if the intent is "conversations whose working directory is a *specific +git worktree path*", that is NOT supported today (see §3b). If the intent is the +logical grouping, it's fully supported (see §1–§2). + +--- + +## 1. Is there an existing endpoint? — YES + +`GET /conversations` lists all known conversations and supports three +**composable** query filters: + +| Filter | Values | Effect | +|---|---|---| +| `?workspaceId=<slug>` | workspace slug | Restrict to conversations in that workspace | +| `?status=<csv>` | `active`,`idle`,`closed` | Restrict to those lifecycle statuses | +| `?q=<prefix>` | id prefix | Short-id resolution; id-prefix match (applied in-memory) | + +**Open conversations in a workspace** = `?status=active,idle` (excludes `closed`) +combined with `?workspaceId=`. + +There is also a `GET /workspaces` endpoint that lists workspaces (each with a +`conversationCount`), and per-workspace CRUD — see §4. + +--- + +## 2. How it works (endpoint, parameters, response shape) + +### 2a. The route — `packages/transport-http/src/app.ts:775-814` + +```ts +app.get("/conversations", async (c) => { + // ?status= comma-separated (e.g. "active,idle"). Default: all. Invalid dropped. + const statusFilter = parseStatusFilter(c.req.query("status")); + // ?workspaceId= . Missing/empty/whitespace → ignored (all workspaces). + const workspaceId = rawWorkspaceId !== undefined && rawWorkspaceId.trim().length > 0 + ? rawWorkspaceId.trim() : undefined; + const filter = (statusFilter !== undefined || workspaceId !== undefined) + ? { ...(statusFilter ? { status: statusFilter } : {}), + ...(workspaceId ? { workspaceId } : {}) } + : undefined; + const all = await opts.conversationStore.listConversations(filter); + // ?q= prefix filter applied in-memory on the result. + const conversations = q.length > 0 ? all.filter((m) => m.id.startsWith(q)) : all; + return c.json({ conversations }, 200); // 500 on store error +}); +``` + +- `?status=` parsing: `parseStatusFilter` (`packages/transport-http/src/logic.ts:24-38`). + Valid values are `"active" | "idle" | "closed"` (`VALID_STATUSES`, logic.ts:16). + Invalid values are silently dropped; if *all* values are invalid → no filter + (returns all). +- `?workspaceId=` is whitespace-trimmed; empty/whitespace-only is ignored. +- The `filter` object passed to the store is `{ status?, workspaceId? }` — either + or both optional. + +### 2b. Response shape + +`200` → `ConversationListResponse` (`packages/transport-contract/src/index.ts:710-713`, +re-exported from `@dispatch/wire`): + +```ts +interface ConversationListResponse { + readonly conversations: readonly ConversationMeta[]; +} + +interface ConversationMeta { // @dispatch/wire, wire/src/index.ts:518-536 + readonly id: string; + readonly createdAt: number; // epoch-ms, set on first write + readonly lastActivityAt: number; // epoch-ms, updated on every append + readonly title: string; // first user message (truncated 80) or PUT /title + readonly status: ConversationStatus; // "active" | "idle" | "closed" + readonly workspaceId: string; // always present; "default" fallback + readonly compactedFrom?: string; // present iff post-compaction +} +``` + +- Sorted by `lastActivityAt` **descending** (most recent first); stable sort keeps + first-seen (index) order for ties. +- `workspaceId` is always present on the response — conversations never assigned + read as `"default"` (see `toMeta`, `store.ts:329-337`). +- Errors: store failure → `500 { error: "Failed to list conversations" }`. + +### 2c. The store filter — `packages/conversation-store/src/store.ts` + +`ConversationStore.listConversations` filter type (`store.ts:91-94`): + +```ts +readonly listConversations: (filter?: { + readonly status?: readonly ConversationStatus[]; + readonly workspaceId?: string; +}) => Promise<readonly ConversationMeta[]>; +``` + +Implementation (`store.ts:695-733`): reads the conversation index, dedups +(first-seen order), reads each conversation's meta row, applies both filters, sorts +desc by `lastActivityAt`. Specifically: + +- `statusFilter` → membership test against `row.status`. +- `workspaceFilter` → equality against `row.workspaceId ?? DEFAULT_WORKSPACE_ID` + (so legacy rows with no stored workspaceId match `"default"`). + +`DEFAULT_WORKSPACE_ID` is `"default"`. + +--- + +## 3. "What would it take to add it?" + +### 3a. If "worktree" means the logical **workspace** → already done. Nothing to add. + +The capability, data model, contract types, and tests all exist. Pin +`@dispatch/transport-contract` (currently `0.22.0`) and `@dispatch/wire` +(`0.12.0`) and call `GET /conversations?workspaceId=<slug>&status=active,idle`. + +### 3b. If "worktree" literally means a **filesystem directory / git worktree path** → +NOT supported today; small, well-contained change. + +The directory concept maps to **working directory (cwd)**, which is per-conversation +(`conversation-store` `getCwd`/`setCwd`, keyed per conversation; `GET/PUT +/conversations/:id/cwd`). The list endpoint does NOT support a `?cwd=` filter, and +`ConversationMeta` does NOT carry a `cwd` field (confirmed: `wire/src/index.ts:518-536` +has no `cwd`; filter type `store.ts:91-94` has no `cwd`). + +To add a `?cwd=` filter (filter conversations by their working directory), the change +touches three layers, all additive: + +1. **Contract (`@dispatch/wire`)** — (optional) add `readonly cwd?: string | null` to + `ConversationMeta` so the caller can see each conversation's directory. Additive + type bump (e.g. `0.12.0 → 0.13.0`). Not strictly required if filtering is + server-side-only, but useful for the FE to render. +2. **Store (`conversation-store/src/store.ts`)**: + - Widen the `listConversations` filter to `{ status?, workspaceId?, cwd? }`. + - In the scan loop (`store.ts:718-728`), for each candidate call `getCwd(id)` + (or the effective cwd via `getEffectiveCwd`) and compare. **Cost note:** this is + an extra storage read per conversation (cwd is stored in its own key, not in the + meta row) — fine for typical counts; if scale matters, add a `cwd` field to + `ConversationMetaRow` (populated on `setCwd`) so the filter is a row comparison + with no extra reads. The latter is the better design and mirrors how + `workspaceId` was added to the meta row. + - Decide exact-match (path equality) vs. prefix/normalized match (e.g. a worktree + and its subdirs). Equality is simplest and probably sufficient. +3. **Transport (`transport-http/src/app.ts` + `logic.ts`)** — parse `?cwd=` (trim; + empty → ignore, mirroring `?workspaceId=`) and pass `cwd` into the store filter. + Update `ConversationListResponse` doc. + +The worktree-as-directory case is the only one that requires new code; the +workspace case requires none. + +--- + +## 4. Related workspace capabilities (for context) + +The workspace model is fully built out (courier doc `frontend-workspaces-handoff.md` +→ implemented in `@dispatch/[email protected]` / `@dispatch/transport-contract`): + +- `GET /workspaces` → `WorkspaceListResponse` (workspaces sorted by `lastActivityAt` + desc, each with `conversationCount`). The `"default"` workspace is always + synthesized/present. +- `PUT /workspaces/:id` (create-on-miss, idempotent), `GET /workspaces/:id` (pure + read, 404 if missing), `PUT /workspaces/:id/title`, `PUT /workspaces/:id/default-cwd`, + `DELETE /workspaces/:id` (closes all its conversations → `status="closed"`, + reassigns them to `"default"`, returns `closedCount`; 409 for `"default"`). +- Workspace slug regex: `^[a-z0-9](?:[a-z0-9-]{0,38}[a-z0-9])?$` (1–40, lowercase, + digits, internal hyphens). Validated by `isValidWorkspaceSlug`. +- `workspaceId` is auto-created on turn start if missing (`title = id`, + `defaultCwd = null`). Auto-assigned on `/chat` and `POST /conversations/:id/queue`. +- `DELETE /conversations/:id/cwd` clears the explicit per-conversation cwd + (falls back to the workspace `defaultCwd`, then the server default). +- cwd resolution: explicit per-conversation cwd → workspace `defaultCwd` → server + default (`process.cwd()`). `GET /conversations/:id/cwd` returns the explicit cwd + only; `GET /conversations/:id/lsp` roots at the *effective* cwd. + +`Workspace` type (`wire/src/index.ts:566-577`): `{ id, title, defaultCwd: string|null, +createdAt, lastActivityAt }`. `WorkspaceEntry extends Workspace { conversationCount }`. + +--- + +## 5. Test coverage (verified green in-repo) + +- `conversation-store/src/store-workspace.test.ts:369` — "listConversations filtered + by workspaceId" (asserts work-a returns `[a1,a2]`, work-b returns `[b1]`). +- `conversation-store/src/store.test.ts:1463` — "listConversations filters by status", + incl. `{ status: ["active","idle"] }` (= "open") returns `[conv1, conv3]` + (excludes the `closed` conv2). Also `store.test.ts:1485` — status persists across a + fresh store instance. +- `transport-http/src/app.test.ts:3696` — "GET /conversations?workspaceId= filters" + (asserts the store receives `{ workspaceId: "proj" }` and responds `200`). +- `transport-http/src/app.test.ts` (q-filter block ~3133-3156) — `?q=` prefix + + empty/whitespace handling; 500 on store throw. + +The two filters are independently testable and composable at the store level +(same `filter` object carries both); HTTP-level composition of `?status=…&workspaceId=…` +is not asserted by an explicit combined test, but the handler builds one merged +`filter` object so composition is structural. + +--- + +## 6. Verdict & recommendation + +- **For the workspace interpretation (the canonical reading of "grouping of + conversations"): the feature already exists.** Use + `GET /conversations?workspaceId=<slug>&status=active,idle`. No code changes, no + data-model changes, no contract bumps needed. +- **For the literal "git worktree as a directory" interpretation: not supported** as a + list filter; the closest concept is per-conversation **cwd**. Adding a `?cwd=` filter + is a small additive change across `@dispatch/wire` (optional `cwd` on + `ConversationMeta`), `conversation-store` (`listConversations` filter + ideally a + `cwd` column on the meta row for cheap filtering), and `transport-http` (parse + `?cwd=`). See §3b. +- **Terminology:** recommend the user confirm which concept they mean. If they mean + the logical grouping, "workspace" is already the canonical GLOSSARY term — no new + vocabulary. If they truly need directory-based grouping, that is a distinct feature + from workspaces and should be scoped as such (it overlaps with, but is not, a + workspace; a single directory could be shared by multiple workspaces or none). diff --git a/notes/retry-with-backoff-plan.md b/notes/retry-with-backoff-plan.md new file mode 100644 index 0000000..99f6f5d --- /dev/null +++ b/notes/retry-with-backoff-plan.md @@ -0,0 +1,138 @@ +# Plan — Retry-with-backoff on retryable provider errors (FINALIZED) + +**Goal:** When the upstream LLM API returns a retryable error (e.g. "server +overloaded"), retry the request with a stepped backoff, visibly, until the +budget is exhausted. + +## The error (from the prod DB) — detection is already done + +- **HTTP 429** (46×) and **HTTP 502** (1×), **no 503s**. +- Body: `{"error":{"type":"overloaded_error","message":"The service is temporarily overloaded. Please retry."}}` +- `packages/openai-stream/src/stream.ts:201` **already sets** + `retryable: response.status >= 500 || response.status === 429` on the error + event, and `ProviderErrorEvent` (`kernel/contracts/provider.ts:72`) **already + declares `retryable?: boolean`**. The kernel's `processEvent` just ignores it. +- The error is **emitted (not thrown) and before any content** → retrying + `provider.stream()` is safe (no partial chunks to roll back). + +## Decision 1 — the backoff schedule + +`5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m`, then **repeat 30m** until **8h of +cumulative retry-wait** is reached, then give up (emit the final error + seal). + +Pure function of the attempt index (0 = first retry): +```ts +const SCHEDULE_MS = [5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000]; +const TAIL_MS = 1_800_000; // 30m +const BUDGET_MS = 8 * 60 * 60 * 1000; // 8h + +// pure, deterministic, no I/O +function delayFor(attempt: number): number | undefined { + const delay = attempt < SCHEDULE_MS.length ? SCHEDULE_MS[attempt] : TAIL_MS; + if (cumulativeSleepMs(attempt) > BUDGET_MS) return undefined; // over budget → stop + return delay; +} +``` +- `cumulativeSleepMs(attempt)` = sum of delay[0..attempt]; head (8 steps) sums to + 3,705s, then +1,800s per extra step. 8h (28,800s) is reached at attempt ~21 + → ~21 retries, ~7h32m of sleeping, then give up. +- Budget = cumulative *scheduled sleep* (pure/testable). If you prefer wall-clock + since first error, it switches to using the injected `now` — easy change. + +## Decision 2 — visible (yellow system-message warning) + 5d3f handoff + +Add a new **transient** `AgentEvent` variant (emitted to the frontend, NOT +persisted into the model's message history — so it never pollutes the prompt): + +```ts +// @dispatch/wire (AgentEvent union gains this member) +export interface TurnProviderRetryEvent { + readonly type: "provider-retry"; + readonly conversationId: string; + readonly turnId: string; + /** 0-based: this is the Nth retry about to happen. */ + readonly attempt: number; + /** ms the client should expect to wait before the retry fires. */ + readonly delayMs: number; + /** The endpoint's error verbatim (e.g. "HTTP 429: {…overloaded_error…}"). */ + readonly message: string; + /** The HTTP code when known (e.g. "429"). */ + readonly code?: string; +} +``` +- Emitted once per scheduled retry, BEFORE the sleep, so the UI shows + "⚠ Server overloaded — retrying in 5s…" immediately. +- When retries are exhausted (8h), the existing `error` event is emitted (as + today) and the turn seals — so the final failure is still a persisted error. + +**Frontend handoff to 5d3f:** render `provider-retry` as a yellow warning +system-message bubble showing `message` (+ `code`), with the countdown. (I do +the backend; 5d3f does the renderer — handoff via dispatch CLI.) + +## Decision 3 — retry ANY retryable error + +Retry trigger (both paths), **only when no content has been emitted yet** +(the safety invariant — never duplicate partial output): + +- **Emitted** `error` ProviderEvent with `retryable === true` → retry. (429/502/5xx + network fetch errors — all pre-content.) +- **Thrown** error (mid-stream, caught in `executeStep`'s `catch`) → treated as **retryable-by-default when pre-content** (most mid-stream throws are transient network/SSE issues). A thrown error after content is emitted is NOT retried (can't safely). + +So "if it's retryable, retry it" = the `retryable` flag drives emitted errors; +thrown errors default to retryable when nothing was streamed yet. Non-retryable +emitted errors (`retryable: false`/absent) end the step as today. + +## Architecture — kernel provides the HOOK, shell provides POLICY + I/O + +(Constitution: kernel touches no I/O; effects injected; decision pure.) + +### Kernel contract (`kernel/src/contracts/runtime.ts`) — add to `RunTurnInput`: +```ts +export interface RetryStrategy { + /** Pure: attempt → delay ms, or undefined to stop (budget exhausted). */ + readonly delayFor: (attempt: number) => number | undefined; + /** Injected effect: actually sleep. Kernel imports no timer. Abortable. */ + readonly sleep: (ms: number, signal: AbortSignal) => Promise<void>; +} +export interface RunTurnInput { + // …existing… + /** Optional injected retry. Omit = no retry (backward-compatible). */ + readonly retry?: RetryStrategy; +} +``` + +### Kernel loop (`kernel/src/runtime/run-turn.ts`, `executeStep`): +Wrap stream consumption in a retry loop: +- track `hadContent` (any text/reasoning/tool-call/usage seen); +- on a retryable error (emitted `retryable:true` OR thrown) with `!hadContent`: + - `delay = retry.delayFor(attempt)`; if `undefined` → give up (emit the + suppressed error, end step); + - else emit `providerRetryEvent(attempt, delay, message, code)`, `await + retry.sleep(delay, signal)`, `attempt++`, re-call `provider.stream()`; +- on abort during sleep → reject, seal turn `aborted` (existing flow). + +### Shell wiring (`session-orchestrator/src/orchestrator.ts`): +- Provide the concrete `RetryStrategy`: `delayFor` = the schedule + 8h budget + above; `sleep` = abortable `setTimeout`-based promise. +- Pass `retry` into the `RunTurnInput` it builds (line 589). + +## Build breakdown by unit (execution) + +| Unit (owner) | Change | +|---|---| +| `@dispatch/wire` | add `TurnProviderRetryEvent` to `AgentEvent` union | +| `kernel` contracts | add `RetryStrategy` + `retry?` on `RunTurnInput` | +| `kernel` events.ts | `providerRetryEvent(...)` constructor | +| `kernel` run-turn.ts | retry loop in `executeStep` (the core logic) | +| `kernel` run-turn.test.ts | pure tests: fake `sleep` + pure `delayFor`; assert schedule, no-after-content retry, give-up emits error, abort-during-sleep | +| `session-orchestrator` | wire concrete schedule + real `setTimeout` sleep | +| `transport-ws` | if it has an exhaustive `switch(event.type)`, add the `provider-retry` case | +| `transport-http` (mine) | **no change** — `serializeEventLine` is generic `JSON.stringify` | +| frontend (5d3f) | render `provider-retry` as a yellow warning system message | + +## Open items +- **8h budget = cumulative scheduled sleep** (pure). Confirm OK vs wall-clock. +- **Thrown errors default retryable-when-pre-content.** Confirm (vs only the + flagged emitted path). +- **Execution mode:** this spans kernel + wire + orchestrator (outside my + transport-http unit). Build it directly across units, or dispatch each slice + to its unit owner via the dispatch CLI? diff --git a/packages/cli/src/args.test.ts b/packages/cli/src/args.test.ts index 3d07c96..e613f31 100644 --- a/packages/cli/src/args.test.ts +++ b/packages/cli/src/args.test.ts @@ -254,6 +254,41 @@ describe("parseArgs", () => { }); }); + it("parses 'list' with --workspace", () => { + expect(parseArgs(["list", "--workspace", "proj"], { defaultServer })).toEqual({ + kind: "list", + server: "http://localhost:24203", + workspaceId: "proj", + all: false, + }); + }); + + it("parses 'list' with -w shorthand", () => { + const result = parseArgs(["list", "-w", "ws"], { defaultServer }); + expect(result.kind).toBe("list"); + if (result.kind === "list") expect(result.workspaceId).toBe("ws"); + }); + + it("parses 'list' with --workspace, --status, and a prefix together", () => { + const result = parseArgs(["list", "abc", "--status", "active", "--workspace", "proj"], { + defaultServer, + }); + expect(result).toEqual({ + kind: "list", + server: "http://localhost:24203", + query: "abc", + status: "active", + workspaceId: "proj", + all: false, + }); + }); + + it("errors when --workspace has no value (list)", () => { + const result = parseArgs(["list", "--workspace"], { defaultServer }); + expect(result.kind).toBe("error"); + if (result.kind === "error") expect(result.message).toContain("--workspace requires a value"); + }); + it("parses 'list' with --all", () => { expect(parseArgs(["list", "--all"], { defaultServer })).toEqual({ kind: "list", @@ -320,11 +355,31 @@ describe("parseArgs", () => { server: "http://localhost:24203", conversationId: "deadbeef", text: "hi", + file: undefined, + queue: false, + open: false, + }); + }); + + it("parses 'send' with --file", () => { + expect(parseArgs(["send", "deadbeef", "--file", "foo.txt"], { defaultServer })).toEqual({ + kind: "send", + server: "http://localhost:24203", + conversationId: "deadbeef", + text: undefined, + file: "foo.txt", queue: false, open: false, }); }); + it("parses 'send' with both --text and --file", () => { + const result = parseArgs(["send", "deadbeef", "--text", "hi", "--file", "f.txt"], { + defaultServer, + }); + expect(result).toMatchObject({ kind: "send", text: "hi", file: "f.txt" }); + }); + it("parses 'send' with --queue", () => { const result = parseArgs(["send", "deadbeef", "--text", "hi", "--queue"], { defaultServer, @@ -334,6 +389,7 @@ describe("parseArgs", () => { server: "http://localhost:24203", conversationId: "deadbeef", text: "hi", + file: undefined, queue: true, open: false, }); @@ -348,6 +404,7 @@ describe("parseArgs", () => { server: "http://localhost:24203", conversationId: "deadbeef", text: "hi", + file: undefined, queue: false, open: true, }); @@ -363,6 +420,7 @@ describe("parseArgs", () => { server: "http://localhost:24203", conversationId: "deadbeef", text: "hi", + file: undefined, queue: false, open: false, cwd: "/tmp", @@ -370,10 +428,10 @@ describe("parseArgs", () => { }); }); - it("requires --text", () => { + it("errors when --text and --file are both missing", () => { const result = parseArgs(["send", "deadbeef"], { defaultServer }); expect(result.kind).toBe("error"); - if (result.kind === "error") expect(result.message).toContain("--text"); + if (result.kind === "error") expect(result.message).toContain("--text or --file"); }); it("requires a conversation id", () => { @@ -386,6 +444,12 @@ describe("parseArgs", () => { const result = parseArgs(["send", "deadbeef", "--text"], { defaultServer }); expect(result.kind).toBe("error"); }); + + it("errors when --file has no value", () => { + const result = parseArgs(["send", "deadbeef", "--file"], { defaultServer }); + expect(result.kind).toBe("error"); + if (result.kind === "error") expect(result.message).toContain("--file requires a value"); + }); }); describe("open", () => { diff --git a/packages/cli/src/args.ts b/packages/cli/src/args.ts index 8a63777..74cc56a 100644 --- a/packages/cli/src/args.ts +++ b/packages/cli/src/args.ts @@ -33,6 +33,7 @@ export type ParsedCommand = readonly server: string; readonly query?: string; readonly status?: string; + readonly workspaceId?: string; readonly all: boolean; } | { readonly kind: "compact"; readonly server: string; readonly conversationId: string } @@ -42,7 +43,8 @@ export type ParsedCommand = readonly kind: "send"; readonly server: string; readonly conversationId: string; - readonly text: string; + readonly text?: string | undefined; + readonly file?: string | undefined; readonly queue: boolean; readonly open: boolean; readonly cwd?: string; @@ -84,6 +86,7 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma let server = opts.defaultServer; let query: string | undefined; let status: string | undefined; + let workspaceId: string | undefined; let all = false; for (let i = 1; i < argv.length; i++) { const arg = argv[i] as string; @@ -93,6 +96,9 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma } else if (arg === "--status") { if (i + 1 >= argv.length) return { kind: "error", message: "--status requires a value" }; status = argv[++i]; + } else if (arg === "--workspace" || arg === "-w") { + if (i + 1 >= argv.length) return { kind: "error", message: "--workspace requires a value" }; + workspaceId = argv[++i]; } else if (arg === "--all") { all = true; } else if (arg.startsWith("--")) { @@ -108,6 +114,7 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma server, ...(query !== undefined && { query }), ...(status !== undefined && { status }), + ...(workspaceId !== undefined && { workspaceId }), all, }; } @@ -204,6 +211,7 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma let server = opts.defaultServer; let conversationId: string | undefined; let text: string | undefined; + let file: string | undefined; let queue = false; let open = false; let cwd: string | undefined; @@ -221,6 +229,10 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma if (i + 1 >= argv.length) return { kind: "error", message: "--text requires a value" }; text = argv[++i]; break; + case "--file": + if (i + 1 >= argv.length) return { kind: "error", message: "--file requires a value" }; + file = argv[++i]; + break; case "--queue": queue = true; break; @@ -263,8 +275,11 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma if (conversationId === undefined) { return { kind: "error", message: "'send' requires a conversation id" }; } - if (text === undefined) { - return { kind: "error", message: "'send' requires --text" }; + if (!text && !file) { + return { + kind: "error", + message: "At least one of --text or --file is required for 'send'", + }; } return { @@ -272,6 +287,7 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma server, conversationId, text, + file, queue, open, ...(cwd !== undefined && { cwd }), diff --git a/packages/cli/src/http.test.ts b/packages/cli/src/http.test.ts index 2aa61e9..ab39813 100644 --- a/packages/cli/src/http.test.ts +++ b/packages/cli/src/http.test.ts @@ -289,6 +289,36 @@ describe("fetchConversations", () => { expect(calledUrl).toBe("http://localhost:24203/conversations?q=abc+def"); }); + it("appends ?workspaceId=<value> when a workspaceId is given", async () => { + let calledUrl: string | undefined; + const fakeFetch = (async (url: string | URL | Request): Promise<Response> => { + calledUrl = String(url); + return new Response(JSON.stringify({ conversations: [] }), { status: 200 }); + }) as unknown as typeof fetch; + + await fetchConversations( + { fetchImpl: fakeFetch }, + { server: "http://localhost:24203", workspaceId: "proj" }, + ); + expect(calledUrl).toBe("http://localhost:24203/conversations?workspaceId=proj"); + }); + + it("combines ?status= and ?workspaceId= when both are given", async () => { + let calledUrl: string | undefined; + const fakeFetch = (async (url: string | URL | Request): Promise<Response> => { + calledUrl = String(url); + return new Response(JSON.stringify({ conversations: [] }), { status: 200 }); + }) as unknown as typeof fetch; + + await fetchConversations( + { fetchImpl: fakeFetch }, + { server: "http://localhost:24203", status: "active,idle", workspaceId: "proj" }, + ); + expect(calledUrl).toBe( + "http://localhost:24203/conversations?status=active%2Cidle&workspaceId=proj", + ); + }); + it("throws on non-OK status", async () => { const fakeFetch = (async (): Promise<Response> => new Response("boom", { status: 500 })) as unknown as typeof fetch; diff --git a/packages/cli/src/http.ts b/packages/cli/src/http.ts index 42fcfec..e13842a 100644 --- a/packages/cli/src/http.ts +++ b/packages/cli/src/http.ts @@ -98,6 +98,7 @@ interface FetchConversationsOpts { readonly server: string; readonly query?: string; readonly status?: string; + readonly workspaceId?: string; } export async function fetchConversations( @@ -107,6 +108,7 @@ export async function fetchConversations( const params = new URLSearchParams(); if (opts.query !== undefined) params.set("q", opts.query); if (opts.status !== undefined) params.set("status", opts.status); + if (opts.workspaceId !== undefined) params.set("workspaceId", opts.workspaceId); const qs = params.toString(); const url = qs.length > 0 ? `${opts.server}/conversations?${qs}` : `${opts.server}/conversations`; const res = await deps.fetchImpl(url); diff --git a/packages/cli/src/main.ts b/packages/cli/src/main.ts index 9dfc317..cba0de7 100644 --- a/packages/cli/src/main.ts +++ b/packages/cli/src/main.ts @@ -24,12 +24,12 @@ import { extractLastText, formatConversationList, renderEvent } from "./render.j const USAGE = `Usage: dispatch models [--server <url>] - dispatch list [<prefix>] [--status <active|idle|closed>] [--all] [--server <url>] + dispatch list [<prefix>] [--status <active|idle|closed>] [--workspace <id>] [--all] [--server <url>] dispatch stop <conversationId> [--server <url>] dispatch compact <conversationId> [--server <url>] dispatch read <conversationId> [--server <url>] dispatch open <conversationId> [--server <url>] - dispatch send <conversationId> --text "..." [--queue] [--open] [--cwd <dir>] [--effort <level>] [--workspace <id>] [--server <url>] + dispatch send <conversationId> --text "..." [--file <path>] [--queue] [--open] [--cwd <dir>] [--effort <level>] [--workspace <id>] [--server <url>] dispatch <modelName> --text "..." [--file <path>] [--cwd <dir>] [--conversation <id>] [--effort <level>] [--workspace <id>] [--server <url>] [--show-reasoning] [--open] dispatch --help @@ -61,6 +61,7 @@ async function main(): Promise<void> { server: parsed.server, ...(parsed.query !== undefined && { query: parsed.query }), ...(status !== undefined && { status }), + ...(parsed.workspaceId !== undefined && { workspaceId: parsed.workspaceId }), }, ); const table = formatConversationList(result.conversations, Date.now()); @@ -156,10 +157,20 @@ async function main(): Promise<void> { process.stdout.write(`Signaled frontend to open ${conversationId}\n`); } + let fileContent: string | undefined; + if (parsed.file) { + fileContent = await readFile(parsed.file, "utf-8"); + } + const message = composeMessage({ + ...(parsed.text !== undefined && { text: parsed.text }), + ...(parsed.file !== undefined && { file: parsed.file }), + ...(fileContent !== undefined && { fileContent }), + }); + if (parsed.queue) { const queued = await enqueueMessage( { fetchImpl: globalThis.fetch }, - { server: parsed.server, conversationId, text: parsed.text }, + { server: parsed.server, conversationId, text: message }, ); const line = queued.startedTurn ? `Started turn for ${conversationId}` @@ -168,7 +179,7 @@ async function main(): Promise<void> { } else { const request = { conversationId, - message: parsed.text, + message, ...(parsed.cwd !== undefined && { cwd: parsed.cwd }), ...(parsed.reasoningEffort !== undefined && { reasoningEffort: parsed.reasoningEffort }), ...(parsed.workspaceId !== undefined && { workspaceId: parsed.workspaceId }), diff --git a/packages/kernel/src/contracts/events.ts b/packages/kernel/src/contracts/events.ts index 6c9652d..dca34c2 100644 --- a/packages/kernel/src/contracts/events.ts +++ b/packages/kernel/src/contracts/events.ts @@ -11,6 +11,7 @@ export type { TurnDoneEvent, TurnErrorEvent, TurnInputEvent, + TurnProviderRetryEvent, TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts index c67607b..f3e5bca 100644 --- a/packages/kernel/src/contracts/index.ts +++ b/packages/kernel/src/contracts/index.ts @@ -40,6 +40,7 @@ export type { TurnDoneEvent, TurnErrorEvent, TurnInputEvent, + TurnProviderRetryEvent, TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, @@ -109,6 +110,7 @@ export type { export type { EventEmitter, FinishReason, + RetryStrategy, RunTurnInput, RunTurnResult, } from "./runtime.js"; diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts index 03e62c2..dc74c84 100644 --- a/packages/kernel/src/contracts/runtime.ts +++ b/packages/kernel/src/contracts/runtime.ts @@ -140,6 +140,22 @@ export interface RunTurnInput { * double-persist them. */ readonly onStepComplete?: (messages: readonly ChatMessage[]) => Promise<void> | void; + + /** + * Optional injected retry strategy for retryable provider errors (e.g. HTTP + * 429 / 5xx "overloaded"). When omitted, a retryable error ends the step + * exactly as before (backward-compatible). When provided, the runtime wraps + * `provider.stream()` consumption in a retry loop: on a retryable error + * (an emitted `error` ProviderEvent with `retryable === true`, OR a thrown + * error) — ONLY when no content was emitted yet this step (the safety + * invariant — never duplicate partial output) — it asks `retry.delayFor` + * for a delay, emits a transient `provider-retry` AgentEvent, sleeps via the + * injected `retry.sleep` (abortable), and re-calls `provider.stream()`. + * + * Injected (not ambient): the kernel imports no timer and owns no schedule. + * Mirrors the `now`/`logger` injection pattern — optional + backward-compatible. + */ + readonly retry?: RetryStrategy; } /** @@ -156,3 +172,31 @@ export interface RunTurnResult { /** Why the turn ended. */ readonly finishReason: FinishReason; } + +/** + * Injected retry strategy for retryable provider errors (e.g. HTTP 429 / 5xx). + * + * The kernel provides the HOOK (this contract + the retry loop in `runTurn`); + * the shell (session-orchestrator) provides the POLICY (the concrete schedule) + * and the I/O (the actual sleep). The kernel imports no timer — `sleep` is an + * injected effect so the runtime stays pure and deterministic in tests. + * + * Retries are ONLY attempted when NO content was emitted yet this step (the + * safety invariant — never duplicate partial output). When omitted on + * `RunTurnInput`, no retry happens (backward-compatible: a retryable error ends + * the step exactly as before). + */ +export interface RetryStrategy { + /** + * Pure, deterministic decision: given the 0-based attempt index, return the + * delay in ms to sleep before the next retry, or `undefined` to stop (budget + * exhausted). No I/O, no clock — fully testable. + */ + readonly delayFor: (attempt: number) => number | undefined; + /** + * Injected effect: actually sleep for the given ms. Must honor the abort + * signal — reject when aborted so the turn seals `aborted`. The kernel + * imports no timer; the shell provides a `setTimeout`-based implementation. + */ + readonly sleep: (ms: number, signal: AbortSignal) => Promise<void>; +} diff --git a/packages/kernel/src/runtime/events.ts b/packages/kernel/src/runtime/events.ts index b194577..5805e28 100644 --- a/packages/kernel/src/runtime/events.ts +++ b/packages/kernel/src/runtime/events.ts @@ -164,3 +164,17 @@ export function errorEvent( } return { type: "error", conversationId, turnId, message }; } + +export function providerRetryEvent( + conversationId: string, + turnId: string, + attempt: number, + delayMs: number, + message: string, + code?: string, +): AgentEvent { + if (code !== undefined) { + return { type: "provider-retry", conversationId, turnId, attempt, delayMs, message, code }; + } + return { type: "provider-retry", conversationId, turnId, attempt, delayMs, message }; +} diff --git a/packages/kernel/src/runtime/index.ts b/packages/kernel/src/runtime/index.ts index e1156e3..e0dd656 100644 --- a/packages/kernel/src/runtime/index.ts +++ b/packages/kernel/src/runtime/index.ts @@ -2,6 +2,7 @@ export type { StepDispatcher } from "./dispatch.js"; export { createStepDispatcher, executeToolCall } from "./dispatch.js"; export { errorEvent, + providerRetryEvent, reasoningDeltaEvent, textDeltaEvent, toolCallEvent, diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts index 08d3055..a9fc3d9 100644 --- a/packages/kernel/src/runtime/run-turn.test.ts +++ b/packages/kernel/src/runtime/run-turn.test.ts @@ -2886,4 +2886,539 @@ describe("runTurn", () => { expect(drainCallCount).toBe(MAX_STEPS - 1); }); }); + + // ── Retry with backoff ────────────────────────────────────────────────── + // + // PURE tests: a fake `sleep` (records calls, resolves instantly, can abort + // on a chosen call) + a pure `delayFor` (the canonical schedule + 8h budget). + // A stub `ProviderContract` whose `stream` yields a retryable error N times + // then a finish. ZERO mocks of `@dispatch/*` modules — effects injected. + + /** The canonical backoff schedule (matches the orchestrator's concrete strategy). */ + const RETRY_SCHEDULE_MS = [5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000]; + const RETRY_TAIL_MS = 1_800_000; // 30m + const RETRY_BUDGET_MS = 8 * 60 * 60 * 1000; // 8h + + /** Cumulative scheduled sleep through `attempt` (sum of delay[0..attempt]). */ + function cumulativeSleepMs(attempt: number): number { + let sum = 0; + for (let i = 0; i <= attempt; i++) { + sum += i < RETRY_SCHEDULE_MS.length ? RETRY_SCHEDULE_MS[i] : RETRY_TAIL_MS; + } + return sum; + } + + /** Pure, deterministic delay decision (no I/O, no clock). */ + function delayFor(attempt: number): number | undefined { + const delay = attempt < RETRY_SCHEDULE_MS.length ? RETRY_SCHEDULE_MS[attempt] : RETRY_TAIL_MS; + if (cumulativeSleepMs(attempt) > RETRY_BUDGET_MS) return undefined; // over budget → stop + return delay; + } + + /** The full schedule delayFor would emit (until budget exhausted). */ + function fullSchedule(): number[] { + const result: number[] = []; + let attempt = 0; + while (true) { + const delay = delayFor(attempt); + if (delay === undefined) break; + result.push(delay); + attempt++; + } + return result; + } + + /** + * Fake, controllable `sleep`: records every call's delay, resolves + * instantly (no real waiting), and can abort the controller on a chosen + * 1-based call index to simulate "abort during sleep". + */ + function createFakeSleep(controller: AbortController): { + sleep: (ms: number, signal: AbortSignal) => Promise<void>; + calls: number[]; + abortOnCall: (n: number) => void; + } { + const calls: number[] = []; + let abortAt: number | undefined; + const sleep = async (ms: number, _signal: AbortSignal): Promise<void> => { + calls.push(ms); + if (abortAt !== undefined && calls.length === abortAt) { + controller.abort(); + throw new Error("aborted"); + } + // Otherwise resolve instantly (no real waiting). + }; + return { + sleep, + calls, + abortOnCall: (n: number) => { + abortAt = n; + }, + }; + } + + /** A provider that yields a retryable error `errorCount` times, then success. */ + function createRetryingProvider(opts: { + errorCount: number; + error?: { message: string; code?: string; retryable?: boolean }; + success?: ProviderEvent[]; + }): { provider: ProviderContract; streamCalls: { value: number } } { + const streamCalls = { value: 0 }; + const error: ProviderEvent = { + type: "error", + message: opts.error?.message ?? "overloaded", + ...(opts.error?.code !== undefined ? { code: opts.error.code } : {}), + ...(opts.error?.retryable !== undefined ? { retryable: opts.error.retryable } : {}), + }; + const success = opts.success ?? [ + { type: "text-delta", delta: "hi" }, + { type: "finish", reason: "stop" }, + ]; + const provider: ProviderContract = { + id: "fake", + stream() { + const idx = streamCalls.value++; + return (async function* () { + if (idx < opts.errorCount) { + yield error; + return; + } + for (const event of success) yield event; + })(); + }, + }; + return { provider, streamCalls }; + } + + describe("retry with backoff", () => { + it("retries a retryable emitted error on schedule then succeeds", async () => { + const { provider } = createRetryingProvider({ + errorCount: 3, + error: { message: "HTTP 429: overloaded", code: "429", retryable: true }, + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(result.finishReason).toBe("stop"); + // 3 retries: 5s, 10s, 30s. + expect(fake.calls).toEqual([5_000, 10_000, 30_000]); + // 3 provider-retry events (one per sleep), then the successful text. + const retryEvents = events.filter((e) => e.type === "provider-retry"); + expect(retryEvents).toHaveLength(3); + if (retryEvents[0]?.type === "provider-retry") { + expect(retryEvents[0].attempt).toBe(0); + expect(retryEvents[0].delayMs).toBe(5_000); + expect(retryEvents[0].message).toBe("HTTP 429: overloaded"); + expect(retryEvents[0].code).toBe("429"); + expect(retryEvents[0].conversationId).toBe("conv-1"); + expect(retryEvents[0].turnId).toBe("turn-1"); + } + if (retryEvents[1]?.type === "provider-retry") { + expect(retryEvents[1].attempt).toBe(1); + expect(retryEvents[1].delayMs).toBe(10_000); + } + if (retryEvents[2]?.type === "provider-retry") { + expect(retryEvents[2].attempt).toBe(2); + expect(retryEvents[2].delayMs).toBe(30_000); + } + // The error was suppressed (no error event emitted — retry succeeded). + expect(events.filter((e) => e.type === "error")).toHaveLength(0); + // The successful content still streams. + const deltas = events.filter((e) => e.type === "text-delta"); + expect(deltas).toHaveLength(1); + }); + + it("sleep is called with the full schedule [5s,10s,30s,60s,5m,10m,15m,30m,30m…]", async () => { + // Provider errors forever → retries until budget exhausted → gives up. + const { provider } = createRetryingProvider({ + errorCount: Number.POSITIVE_INFINITY, + error: { message: "overloaded", code: "429", retryable: true }, + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + // Budget exhausted → give up → error. + expect(result.finishReason).toBe("error"); + + // The sleep schedule matches the pure delayFor output exactly. + expect(fake.calls).toEqual(fullSchedule()); + + // Head of the schedule (the 8 stepped delays). + expect(fake.calls.slice(0, 8)).toEqual([ + 5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000, + ]); + // Tail repeats 30m. + expect(fake.calls[8]).toBe(1_800_000); + expect(fake.calls.at(-1)).toBe(1_800_000); + + // 8h cumulative budget cap: head (3705s) + 13×30m = ~7h31m, then stop. + // 21 retries (attempts 0..20), then delayFor(21) → undefined → give up. + expect(fake.calls).toHaveLength(21); + const totalSlept = fake.calls.reduce((a, b) => a + b, 0); + expect(totalSlept).toBeLessThanOrEqual(RETRY_BUDGET_MS); + expect(totalSlept).toBe(3_705_000 + 13 * 1_800_000); // 27_105_000 + + // One provider-retry per sleep, plus a final error (give-up). + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(21); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + const errEvt = events.find((e) => e.type === "error"); + if (errEvt?.type === "error") { + expect(errEvt.message).toBe("overloaded"); + expect(errEvt.code).toBe("429"); + } + }); + + it("does NOT retry after content was emitted (safety invariant)", async () => { + // Provider yields text (content) THEN a retryable error. Because content + // was emitted, retrying is unsafe (would duplicate partial output). + let callCount = 0; + const provider: ProviderContract = { + id: "fake", + stream() { + callCount++; + return (async function* () { + yield { type: "text-delta", delta: "partial" } as ProviderEvent; + yield { + type: "error", + message: "overloaded", + code: "429", + retryable: true, + } as ProviderEvent; + })(); + }, + }; + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + // No retries: stream called exactly once. + expect(callCount).toBe(1); + expect(fake.calls).toHaveLength(0); + // The error is emitted (give-up) and partial content preserved. + expect(result.finishReason).toBe("error"); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0); + expect(events.filter((e) => e.type === "text-delta")).toHaveLength(1); + }); + + it("does NOT retry a non-retryable emitted error (retryable: false)", async () => { + const { provider, streamCalls } = createRetryingProvider({ + errorCount: 1, + error: { message: "bad request", code: "400", retryable: false }, + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(streamCalls.value).toBe(1); // no retry + expect(fake.calls).toHaveLength(0); + expect(result.finishReason).toBe("error"); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0); + }); + + it("does NOT retry a non-retryable emitted error (retryable absent)", async () => { + const { provider, streamCalls } = createRetryingProvider({ + errorCount: 1, + error: { message: "bad request", code: "400" }, // no retryable field + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(streamCalls.value).toBe(1); // no retry + expect(fake.calls).toHaveLength(0); + expect(result.finishReason).toBe("error"); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + }); + + it("give-up emits the final error when budget is exhausted", async () => { + // Custom delayFor that allows exactly 1 retry then stops. + const shortDelayFor = (attempt: number): number | undefined => + attempt === 0 ? 100 : undefined; + const { provider } = createRetryingProvider({ + errorCount: Number.POSITIVE_INFINITY, + error: { message: "overloaded", code: "429", retryable: true }, + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor: shortDelayFor, sleep: fake.sleep }, + }); + + expect(result.finishReason).toBe("error"); + expect(fake.calls).toEqual([100]); // one retry, then give up + // One provider-retry (attempt 0), then the final error. + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(1); + const errs = events.filter((e) => e.type === "error"); + expect(errs).toHaveLength(1); + if (errs[0]?.type === "error") { + expect(errs[0].message).toBe("overloaded"); + expect(errs[0].code).toBe("429"); + } + }); + + it("abort during sleep seals the turn aborted", async () => { + const { provider } = createRetryingProvider({ + errorCount: Number.POSITIVE_INFINITY, + error: { message: "overloaded", code: "429", retryable: true }, + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + fake.abortOnCall(2); // abort on the 2nd sleep + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(result.finishReason).toBe("aborted"); + // Two sleeps attempted; the 2nd aborted. + expect(fake.calls).toHaveLength(2); + // No terminal error emitted (it was an abort, not a give-up). + expect(events.filter((e) => e.type === "error")).toHaveLength(0); + // One provider-retry before the aborted sleep (attempt 0). + const retries = events.filter((e) => e.type === "provider-retry"); + expect(retries).toHaveLength(2); + // The done event carries reason "aborted". + const done = events.find((e) => e.type === "done"); + if (done?.type === "done") { + expect(done.reason).toBe("aborted"); + } + }); + + it("omitting retry keeps the pre-retry behavior (backward-compatible)", async () => { + // A retryable error with no retry configured → ends the step as today. + const { provider, streamCalls } = createRetryingProvider({ + errorCount: 1, + error: { message: "overloaded", code: "429", retryable: true }, + }); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + // no retry field + }); + + expect(streamCalls.value).toBe(1); // no retry + expect(result.finishReason).toBe("error"); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0); + }); + + it("retries a THROWN error (retryable-by-default when pre-content)", async () => { + // A thrown error (no retryable flag) before content is retried. + let callCount = 0; + const provider: ProviderContract = { + id: "fake", + stream() { + callCount++; + return (async function* () { + if (callCount <= 2) { + throw new Error("network blip"); + } + yield { type: "text-delta", delta: "hi" } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(callCount).toBe(3); // 2 throws retried, 3rd succeeds + expect(fake.calls).toEqual([5_000, 10_000]); + expect(result.finishReason).toBe("stop"); + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(2); + // Thrown errors have no code. + if (events[0]?.type === "provider-retry") { + expect(events[0].code).toBeUndefined(); + expect(events[0].message).toBe("network blip"); + } + expect(events.filter((e) => e.type === "error")).toHaveLength(0); + }); + + it("does NOT retry a thrown error after content was emitted", async () => { + let callCount = 0; + const provider: ProviderContract = { + id: "fake", + stream() { + callCount++; + return (async function* () { + yield { type: "text-delta", delta: "partial" } as ProviderEvent; + throw new Error("network blip"); + })(); + }, + }; + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(callCount).toBe(1); + expect(fake.calls).toHaveLength(0); + expect(result.finishReason).toBe("error"); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + expect(events.filter((e) => e.type === "text-delta")).toHaveLength(1); + }); + + it("provider-retry events interleave correctly: error → retry-event → sleep → retry", async () => { + // Verify ordering: each provider-retry event comes BEFORE its sleep, + // and the successful content comes only after the last retry. + const { provider } = createRetryingProvider({ + errorCount: 2, + error: { message: "overloaded", code: "429", retryable: true }, + success: [ + { type: "text-delta", delta: "ok" }, + { type: "finish", reason: "stop" }, + ], + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + const types = events.map((e) => e.type); + // turn-start, provider-retry(0), provider-retry(1), text-delta, step-complete, done + expect(types[0]).toBe("turn-start"); + const firstRetryIdx = types.indexOf("provider-retry"); + const textIdx = types.indexOf("text-delta"); + expect(firstRetryIdx).toBeGreaterThan(0); + expect(textIdx).toBeGreaterThan(firstRetryIdx); + // Both retries precede the text. + const retryCount = types.filter((t) => t === "provider-retry").length; + expect(retryCount).toBe(2); + }); + }); }); diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index 940c77f..ac87a1f 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -6,12 +6,18 @@ import type { ProviderStreamOptions, Usage, } from "../contracts/provider.js"; -import type { EventEmitter, RunTurnInput, RunTurnResult } from "../contracts/runtime.js"; +import type { + EventEmitter, + RetryStrategy, + RunTurnInput, + RunTurnResult, +} from "../contracts/runtime.js"; import type { ToolCall, ToolContract } from "../contracts/tool.js"; import { createStepDispatcher, type StepDispatcher } from "./dispatch.js"; import { doneEvent, errorEvent, + providerRetryEvent, reasoningDeltaEvent, stepCompleteEvent, textDeltaEvent, @@ -121,6 +127,8 @@ interface StepContext { readonly now: (() => number) | undefined; /** Per-turn provider options (model, systemPrompt, …) threaded to stream(). */ readonly providerOpts: ProviderStreamOptions | undefined; + /** Optional injected retry strategy (omit = no retry, backward-compatible). */ + readonly retry: RetryStrategy | undefined; } interface TimingState { @@ -250,12 +258,10 @@ function processEvent( case "finish": break; case "error": - if (event.code !== undefined) { - chunks.push({ type: "error", message: event.message, code: event.code }); - } else { - chunks.push({ type: "error", message: event.message }); - } - ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, event.message, event.code)); + // Handled by the retry loop in executeStep (not here): an error event + // is intercepted before processEvent so the step can decide whether to + // retry (suppressing the error) or give up (emit it). processEvent + // never receives an "error" event. break; } } @@ -316,34 +322,142 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { // Swallow — D7. } - try { - const opts: ProviderStreamOptions = { - ...ctx.providerOpts, - ...(ctx.turnSpan !== undefined && stepSpan !== undefined ? { logger: stepSpan.log } : {}), - }; - const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts); - for await (const event of stream) { - if (ctx.signal.aborted) break; - processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing, toolDispatchTimes); - if (event.type === "usage") { - stepUsage = addUsage(stepUsage, event.usage); + // Retry loop: wrap provider.stream() consumption. Retries are ONLY + // attempted when no content was emitted yet this step (the safety + // invariant — never duplicate partial output). On a retryable error — + // either an EMITTED `error` ProviderEvent with `retryable === true`, OR a + // THROWN error (retryable-by-default when pre-content) — with !hadContent: + // ask retry.delayFor(attempt); if it returns a delay → emit a transient + // provider-retry AgentEvent, sleep via the injected retry.sleep (abortable), + // attempt++, re-call provider.stream(); if it returns undefined (budget + // exhausted) → give up. Non-retryable emitted errors (retryable === false or + // absent), errors after content, and the no-retry-configured case all fall + // through to "give up" — identical to the pre-retry behavior. + let hadContent = false; + let attempt = 0; + while (true) { + let errored = false; + let wasThrown = false; + let errorMessage: string | undefined; + let errorCode: string | undefined; + let errorRetryable: boolean | undefined; + let thrownErr: unknown; + + try { + const opts: ProviderStreamOptions = { + ...ctx.providerOpts, + ...(ctx.turnSpan !== undefined && stepSpan !== undefined ? { logger: stepSpan.log } : {}), + }; + const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts); + for await (const event of stream) { + if (ctx.signal.aborted) break; + if (event.type === "error") { + // Intercept: hold for the retry decision — don't push a chunk + // or emit yet (a successful retry would leave a stale error). + errored = true; + errorMessage = event.message; + errorCode = event.code; + errorRetryable = event.retryable; + break; + } + if ( + event.type === "text-delta" || + event.type === "reasoning-delta" || + event.type === "tool-call" || + event.type === "usage" + ) { + hadContent = true; + } + processEvent( + event, + chunks, + toolCalls, + dispatcher, + ctx, + stepSpan, + timing, + toolDispatchTimes, + ); + if (event.type === "usage") { + stepUsage = addUsage(stepUsage, event.usage); + } + if (event.type === "finish") { + finishReason = event.reason; + } } - if (event.type === "finish") { - finishReason = event.reason; + } catch (err) { + errored = true; + wasThrown = true; + errorMessage = err instanceof Error ? err.message : String(err); + errorCode = undefined; + errorRetryable = undefined; + thrownErr = err; + } + + // Abort (during stream) → stop; the runTurn loop seals aborted. + if (ctx.signal.aborted) { + break; + } + + // No error → step succeeded. + if (!errored) { + break; + } + + // Retryable? A thrown error is retryable-by-default when pre-content; + // an emitted error is retryable ONLY when `retryable === true` (absent + // or false → not retried, per the contract). + const isRetryable = wasThrown ? true : errorRetryable === true; + if (ctx.retry !== undefined && !hadContent && isRetryable) { + const delay = ctx.retry.delayFor(attempt); + if (delay !== undefined) { + // Emit the transient provider-retry event BEFORE the sleep so the + // UI shows "⚠ retrying in Ns…" immediately. Not persisted as a + // chat message — it never pollutes the prompt. + ctx.emit( + providerRetryEvent( + ctx.conversationId, + ctx.turnId, + attempt, + delay, + errorMessage ?? "", + errorCode, + ), + ); + // Abortable sleep. If the signal fires during sleep, the shell's + // sleep rejects — we catch it and break so the turn seals aborted. + try { + await ctx.retry.sleep(delay, ctx.signal); + } catch { + // Abort during sleep (or unexpected sleep failure). + } + if (ctx.signal.aborted) { + break; + } + attempt++; + continue; } + // delayFor returned undefined → budget exhausted → give up. + } + + // Give up: emit the suppressed error and end the step. This is the + // single emission point for a terminal provider error (non-retryable, + // post-content, budget-exhausted, or no-retry-configured). + const message = errorMessage ?? ""; + if (errorCode !== undefined) { + chunks.push({ type: "error", message, code: errorCode }); + } else { + chunks.push({ type: "error", message }); } - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - chunks.push({ type: "error", message }); - ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message)); + ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message, errorCode)); finishReason = "error"; - // Close step span with error try { - stepSpan?.end({ err }); + stepSpan?.end({ err: thrownErr ?? new Error(message) }); } catch { // Swallow — D7. } stepSpan = undefined; + break; } // Close timing spans: if no first token was seen, end ttft with firstToken: false @@ -527,6 +641,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { computerId: input.computerId, now, providerOpts: input.providerOpts, + retry: input.retry, }); totalUsage = addUsage(totalUsage, stepResult.usage); diff --git a/packages/lsp/src/aggregate.test.ts b/packages/lsp/src/aggregate.test.ts new file mode 100644 index 0000000..4579a0a --- /dev/null +++ b/packages/lsp/src/aggregate.test.ts @@ -0,0 +1,141 @@ +import { describe, expect, it } from "vitest"; +import { type AggregateServer, aggregateDiagnostics } from "./aggregate.js"; +import type { LanguageServerClient } from "./client.js"; + +/** + * A minimal fake client: only `waitForDiagnostics` is exercised by + * aggregateDiagnostics, so we stub just that. Cast to the real type (mirrors + * tool.test.ts) — no real process, no internal mocks of our own modules. + */ +function fakeClient( + waitForDiagnostics: LanguageServerClient["waitForDiagnostics"], +): LanguageServerClient { + return { waitForDiagnostics } as unknown as LanguageServerClient; +} + +const SERVER_A: AggregateServer = { id: "a", name: "Ruby-LSP", root: "/p" }; +const SERVER_B: AggregateServer = { id: "b", name: "Steep", root: "/p" }; + +describe("aggregateDiagnostics", () => { + it("returns merged diagnostics from all responding servers, tagged by source", async () => { + const clients = new Map<string, LanguageServerClient>([ + [ + "a", + fakeClient(async () => ({ formatted: "ERROR L1:1: boom", slow: false, timedOut: false })), + ], + [ + "b", + fakeClient(async () => ({ formatted: "WARNING L2:3: meh", slow: false, timedOut: false })), + ], + ]); + + const result = await aggregateDiagnostics( + (id) => clients.get(id), + [SERVER_A, SERVER_B], + "/p/x.rb", + 10_000, + {}, + ); + + expect(result.timedOut).toBe(false); + expect(result.formatted).toContain("[Ruby-LSP]"); + expect(result.formatted).toContain("boom"); + expect(result.formatted).toContain("[Steep]"); + expect(result.formatted).toContain("meh"); + }); + + it("skips a server that times out with a raise-to-user notice, and still returns the fast server's result", async () => { + // Steep never resolves within the cap → timedOut; ruby-lsp answers fast. + const clients = new Map<string, LanguageServerClient>([ + ["a", fakeClient(async () => ({ formatted: "", slow: false, timedOut: false }))], + ["b", fakeClient(async () => ({ formatted: "", slow: false, timedOut: true }))], + ]); + + const result = await aggregateDiagnostics( + (id) => clients.get(id), + [SERVER_A, SERVER_B], + "/p/x.rb", + 10_000, + {}, + ); + + expect(result.timedOut).toBe(true); + // The skip notice names the offending server and the cap. + expect(result.formatted).toContain("[Steep]"); + expect(result.formatted).toContain("took too long"); + expect(result.formatted).toContain(">10s"); + expect(result.formatted).toContain("raise this to the user"); + // ruby-lsp answered cleanly (empty diagnostics) → no line for it. + expect(result.formatted).not.toContain("[Ruby-LSP]"); + }); + + it("runs servers concurrently: a slow server does not delay a fast one's contribution order", async () => { + const callOrder: string[] = []; + const clients = new Map<string, LanguageServerClient>([ + [ + "a", + fakeClient(async () => { + callOrder.push("a-start"); + await new Promise((r) => setTimeout(r, 5)); + callOrder.push("a-end"); + return { formatted: "from-a", slow: false, timedOut: false }; + }), + ], + [ + "b", + fakeClient(async () => { + callOrder.push("b-start"); + await new Promise((r) => setTimeout(r, 30)); + callOrder.push("b-end"); + return { formatted: "from-b", slow: false, timedOut: false }; + }), + ], + ]); + + const result = await aggregateDiagnostics( + (id) => clients.get(id), + [SERVER_A, SERVER_B], + "/p/x.rb", + 10_000, + {}, + ); + + // Both started before either ended → concurrent, not sequential. + expect(callOrder.slice(0, 2).sort()).toEqual(["a-start", "b-start"]); + expect(result.formatted).toContain("from-a"); + expect(result.formatted).toContain("from-b"); + }); + + it("a missing client (dead/excluded) contributes nothing and never rejects", async () => { + const result = await aggregateDiagnostics(() => undefined, [SERVER_A], "/p/x.rb", 10_000, {}); + expect(result.formatted).toBe(""); + expect(result.timedOut).toBe(false); + }); + + it("forwards text + minSeverity to each client's waitForDiagnostics", async () => { + const seen: Array<{ text?: string; minSeverity?: number; timeoutMs: number }> = []; + const clients = new Map<string, LanguageServerClient>([ + [ + "a", + fakeClient(async (_path, opts) => { + seen.push({ + text: opts?.text, + minSeverity: opts?.minSeverity, + timeoutMs: opts?.timeoutMs ?? -1, + }); + return { formatted: "", slow: false, timedOut: false }; + }), + ], + ]); + + await aggregateDiagnostics((id) => clients.get(id), [SERVER_A], "/p/x.rb", 7000, { + text: "post-edit buffer", + minSeverity: 2, + }); + + expect(seen).toHaveLength(1); + expect(seen[0]?.text).toBe("post-edit buffer"); + expect(seen[0]?.minSeverity).toBe(2); + expect(seen[0]?.timeoutMs).toBe(7000); + }); +}); diff --git a/packages/lsp/src/aggregate.ts b/packages/lsp/src/aggregate.ts new file mode 100644 index 0000000..b044e21 --- /dev/null +++ b/packages/lsp/src/aggregate.ts @@ -0,0 +1,80 @@ +/** + * Concurrent multi-server diagnostics aggregation. + * + * Queries every matching language server AT ONCE (not one-at-a-time), each + * capped at `timeoutMs`. A server that doesn't push diagnostics within the cap + * is SKIPPED with a per-server notice rather than blocking the others — so one + * slow/dead server (e.g. a corrupted Steep) can't hold up the fast one's + * (ruby-lsp) results for the full timeout on every edit. + * + * The only I/O here is `client.waitForDiagnostics` (injected via `getClient`), + * so this is unit-testable with a fake client and no real process. + */ + +import type { LanguageServerClient } from "./client.js"; + +export interface AggregateServer { + readonly id: string; + readonly name: string; + readonly root: string; +} + +export interface AggregateOpts { + /** Post-edit buffer; when omitted the server reads from disk. */ + readonly text?: string | undefined; + /** Only include diagnostics with severity ≤ this (1=Error, 2=Warning). */ + readonly minSeverity?: number | undefined; +} + +export interface AggregateResult { + /** Merged diagnostics tagged by source + a per-skipped-server notice. */ + readonly formatted: string; + /** True if at least one server was skipped for exceeding the cap. */ + readonly timedOut: boolean; +} + +/** + * Query `servers` concurrently, each capped at `timeoutMs`. Returns merged + * diagnostics tagged by source (`[name]\n…`) and, for any server that did not + * respond in time, a `⚠️ [name] LSP took too long (>Ns), skipped — please raise + * this to the user.` notice. Never rejects: a client error yields an empty + * contribution for that server. + */ +export async function aggregateDiagnostics( + getClient: (id: string, root: string) => LanguageServerClient | undefined, + servers: readonly AggregateServer[], + absolutePath: string, + timeoutMs: number, + opts: AggregateOpts, +): Promise<AggregateResult> { + const entries = await Promise.all( + servers.map(async (server) => { + const client = getClient(server.id, server.root); + if (!client) return null; + const waitOpts: { text?: string; timeoutMs: number; minSeverity?: number } = { timeoutMs }; + if (opts.text !== undefined) waitOpts.text = opts.text; + if (opts.minSeverity !== undefined) waitOpts.minSeverity = opts.minSeverity; + const result = await client.waitForDiagnostics(absolutePath, waitOpts); + return { server, result }; + }), + ); + + const parts: string[] = []; + let timedOut = false; + const capSeconds = Math.round(timeoutMs / 1000); + + for (const entry of entries) { + if (!entry) continue; + const { server, result } = entry; + if (result.timedOut) { + timedOut = true; + parts.push( + `⚠️ [${server.name}] LSP took too long (>${capSeconds}s), diagnostics skipped — please raise this to the user.`, + ); + } else if (result.formatted) { + parts.push(`[${server.name}]\n${result.formatted}`); + } + } + + return { formatted: parts.join("\n\n"), timedOut }; +} diff --git a/packages/lsp/src/client.test.ts b/packages/lsp/src/client.test.ts index 681860f..338ef0b 100644 --- a/packages/lsp/src/client.test.ts +++ b/packages/lsp/src/client.test.ts @@ -3,6 +3,7 @@ import { type FileWatcher, type FsAccess, LanguageServerClient, + type ProcessExitHandler, type SpawnProcess, } from "./client.js"; import { encode } from "./framing.js"; @@ -288,4 +289,149 @@ describe("client", () => { client.shutdown(); expect(state.killed).toBe(true); }); + + it("onExit marks the client broken (error) so callers stop querying a corpse", async () => { + const state = { killed: false }; + let exitCb: ProcessExitHandler | null = null; + const stdoutHolder: { cb: ((data: Uint8Array) => void) | null } = { cb: null }; + + const spawnWithExit: SpawnProcess = () => ({ + stdin: { write: () => {} }, + stdout: { + on: (_event: string, cb: (data: Uint8Array) => void) => { + stdoutHolder.cb = cb; + }, + }, + pid: 999, + kill: () => { + state.killed = true; + }, + onExit: (handler) => { + exitCb = handler; + }, + }); + + const { client } = makeClient({ spawn: spawnWithExit }); + const startPromise = client.start(); + await new Promise((r) => setTimeout(r, 50)); + stdoutHolder.cb?.( + encode(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } })), + ); + await startPromise; + expect(client.getState()).toBe("connected"); + + // Simulate the process dying (user kill / crash). + exitCb?.({ code: 1 }); + + expect(client.getState()).toBe("error"); + expect(client.getStateError()).toMatch(/process exited/); + // The (still-alive-in-test) process was killed to avoid a zombie. + expect(state.killed).toBe(true); + }); + + it("a dead client is skipped by waitForDiagnostics callers (state !== connected)", async () => { + // Build a client, connect, kill via onExit, then assert a diagnostics + // query would not block: getState() is "error" so the matching filter + // (state === "connected") excludes it. We assert the state guard. + const stdoutHolder: { cb: ((data: Uint8Array) => void) | null } = { cb: null }; + let exitCb: ProcessExitHandler | null = null; + const spawnWithExit: SpawnProcess = () => ({ + stdin: { write: () => {} }, + stdout: { + on: (_e, cb) => { + stdoutHolder.cb = cb; + }, + }, + pid: 1, + kill: () => {}, + onExit: (handler) => { + exitCb = handler; + }, + }); + + const { client } = makeClient({ spawn: spawnWithExit }); + const startPromise = client.start(); + await new Promise((r) => setTimeout(r, 50)); + stdoutHolder.cb?.( + encode(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } })), + ); + await startPromise; + + exitCb?.({ code: null }); + expect(client.getState()).toBe("error"); + // The aggregate / getDiagnostics matching filter requires "connected". + expect(client.getState() === "connected").toBe(false); + }); + + it("corruption detector marks the client broken after repeated identical diagnostics despite text changes", async () => { + // A healthy server would change diagnostics as the file changes; a + // corrupted one re-emits the SAME non-empty set. Drive 5 edits with + // different text but identical diagnostics → client flips to error. + const { client, serverResponses } = makeClient(); + const startPromise = client.start(); + await new Promise((r) => setTimeout(r, 50)); + serverResponses(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } })); + await startPromise; + + const phantom = JSON.stringify({ + jsonrpc: "2.0", + method: "textDocument/publishDiagnostics", + params: { + uri: "file:///project/game.rb", + diagnostics: [ + { + range: { start: { line: 0, character: 27 }, end: { line: 0, character: 28 } }, + severity: 1, + message: "SyntaxError: unexpected token", + }, + ], + }, + }); + + const path = "/project/game.rb"; + // The first call establishes the baseline snapshot (no increment). + // Each subsequent call with identical diagnostics + changed text + // increments; the 6th call (5th increment) trips the threshold. + for (let i = 1; i <= 5; i++) { + const p = client.waitForDiagnostics(path, { text: `buf-v${i}`, timeoutMs: 2000 }); + // Push the identical phantom diagnostics so the poll resolves. + await new Promise((r) => setTimeout(r, 30)); + serverResponses(phantom); + await p; + expect(client.getState()).toBe("connected"); + } + // 6th identical-across-changed-text repeat trips the threshold. + const p6 = client.waitForDiagnostics(path, { text: "buf-v6", timeoutMs: 2000 }); + await new Promise((r) => setTimeout(r, 30)); + serverResponses(phantom); + await p6; + + expect(client.getState()).toBe("error"); + expect(client.getStateError()).toMatch(/repeated stale diagnostics/i); + }); + + it("corruption detector does NOT trip on a clean file (empty diagnostics stay identical)", async () => { + const { client, serverResponses } = makeClient(); + const startPromise = client.start(); + await new Promise((r) => setTimeout(r, 50)); + serverResponses(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } })); + await startPromise; + + const clean = JSON.stringify({ + jsonrpc: "2.0", + method: "textDocument/publishDiagnostics", + params: { uri: "file:///project/game.rb", diagnostics: [] }, + }); + const path = "/project/game.rb"; + + for (let i = 1; i <= 6; i++) { + const p = client.waitForDiagnostics(path, { text: `clean-v${i}`, timeoutMs: 2000 }); + await new Promise((r) => setTimeout(r, 30)); + serverResponses(clean); + await p; + } + // Empty diagnostics never count as "stale" — a clean file staying clean + // is normal, not corruption. + expect(client.getState()).toBe("connected"); + }); }); diff --git a/packages/lsp/src/client.ts b/packages/lsp/src/client.ts index 677a22a..ac7d025 100644 --- a/packages/lsp/src/client.ts +++ b/packages/lsp/src/client.ts @@ -4,13 +4,22 @@ * FileWatcher, and forwards matching disk changes. */ -import { DiagnosticsStore, type PublishDiagnosticsParams } from "./diagnostics.js"; +import { DiagnosticsStore, diagnosticKey, type PublishDiagnosticsParams } from "./diagnostics.js"; import { computeChangeRange } from "./diff.js"; import { FrameDecoder } from "./framing.js"; import { languageId as resolveLanguageId } from "./language.js"; import { JsonRpcConnection, type WriteFn } from "./rpc.js"; import { FileChangeType, WatchedFilesRegistry } from "./watched-files.js"; +/** Info delivered to an `onExit` handler when the child process terminates. */ +export interface ProcessExitInfo { + readonly code: number | null; + readonly signal?: string; +} + +/** A handler registered to be called when the child process exits. */ +export type ProcessExitHandler = (info: ProcessExitInfo) => void; + export interface SpawnedProcess { readonly stdin: { readonly write: (bytes: Uint8Array) => void }; readonly stdout: @@ -22,6 +31,13 @@ export interface SpawnedProcess { | undefined; readonly pid: number | undefined; readonly kill: () => void; + /** + * Register a handler fired when the child process exits (code|signal). + * Optional: when absent, death is detected via stdout-end instead. Wires + * Bun's `proc.exited` in production; tests invoke it directly to simulate + * a crash. Lets the client stop querying a dead server (no per-edit hang). + */ + readonly onExit?: ((handler: ProcessExitHandler) => void) | undefined; } export type SpawnProcess = ( @@ -102,6 +118,18 @@ export class LanguageServerClient { private openDocuments = new Map<string, { version: number; text: string }>(); /** Sync mode captured from the server's initialize capabilities: 1=Full, 2=Incremental. */ private textDocumentChange: 1 | 2 = 1; + /** + * Corruption detection: the last diagnostic-key set + synced text per URI. + * A healthy server's diagnostics change when the file changes; a corrupted + * one (e.g. Steep's ~3h phantom-SyntaxError drift) re-emits the identical + * non-empty set across edits. `staleRepeat` counts consecutive such repeats + * across URIs; at the threshold the client is marked broken (→ respawn). + */ + private lastDiagSnapshot = new Map<string, { keys: Set<string>; text: string }>(); + private staleRepeat = 0; + private static readonly STALE_REPEAT_THRESHOLD = 5; + /** Default timeout for outbound requests (hover/definition/references). */ + private static readonly REQUEST_TIMEOUT_MS = 10_000; constructor(deps: ClientDeps) { this.deps = deps; @@ -128,6 +156,12 @@ export class LanguageServerClient { } const proc = this.deps.spawn(this.deps.command as string[], spawnOpts); this.process = proc; + // Detect process death so we stop querying a corpse (fixes the + // per-edit hang after a server is killed/crashes). onExit is the + // primary signal; stdout-end is the defence-in-depth fallback. + if (proc.onExit) { + proc.onExit((info) => this.handleExit(info)); + } const writeFn: WriteFn = (bytes) => proc.stdin.write(bytes); const rpc = new JsonRpcConnection(writeFn); @@ -158,8 +192,11 @@ export class LanguageServerClient { for await (const chunk of source) { this.handleBytes(chunk); } + // stdout closed — the process is gone (defence-in-depth alongside onExit, + // which some edges never call). Idempotent via handleExit's guard. + this.handleExit({ code: null }); } catch { - // process exited + this.handleExit({ code: null }); } })(); } @@ -172,6 +209,65 @@ export class LanguageServerClient { }); } + /** + * The server process exited (onExit or stdout-end). Transition to a broken + * state so callers skip it and the manager re-spawns after backoff — instead + * of polling a corpse for the full timeout on every edit. Idempotent. + */ + private handleExit(info: ProcessExitInfo): void { + if (this.state === "error" || this.state === "not-started") return; + const detail = info.signal !== undefined ? `signal ${info.signal}` : `code ${info.code ?? "?"}`; + this.markBroken(`language server process exited (${detail})`); + } + + /** + * Mark this client permanently broken: kill the process if still alive + * (corruption case), dispose the rpc (rejects pending requests), and drop + * edge handles. The manager's status() observes state:"error" and re-spawns + * after the bounded backoff. Called on process death AND on corruption. + */ + private markBroken(reason: string): void { + if (this.state === "error") return; + this.state = "error"; + this.stateError = reason; + this.fileWatcherHandle?.close(); + this.fileWatcherHandle = null; + this.process?.kill(); + this.process = null; + this.rpc?.dispose(); + this.rpc = null; + } + + /** + * Detect a server stuck re-emitting identical non-empty diagnostics + * despite the file content changing between calls — the signature of a + * corrupted parse/type-check state (e.g. Steep's ~3h phantom-SyntaxError + * drift, where a fresh CLI reports green on the same project). After + * STALE_REPEAT_THRESHOLD consecutive such repeats, mark the client broken + * so it is skipped + re-spawned. A clean file (empty diagnostics) or a + * genuinely changing diagnostic set resets the counter. Note the + * tradeoff: a real, unfixed error on an untouched line also "stays the + * same across edits", so this can false-positive on a healthy server — + * the threshold is set conservatively and the CLI type-check gate remains + * authoritative either way. + */ + private detectStaleDiagnostics(uri: string, text: string): void { + const merged = this.diagnostics.getMerged(uri); + const keys = new Set(merged.map((d) => diagnosticKey(d))); + const prev = this.lastDiagSnapshot.get(uri); + if (prev && keys.size > 0 && setsEqual(keys, prev.keys) && text !== prev.text) { + this.staleRepeat++; + } else { + this.staleRepeat = 0; + } + this.lastDiagSnapshot.set(uri, { keys, text }); + if (this.staleRepeat >= LanguageServerClient.STALE_REPEAT_THRESHOLD) { + this.markBroken( + "language server emitting repeated stale diagnostics despite file changes — likely corrupted; restarting", + ); + } + } + private handleBytes(chunk: Uint8Array): void { const messages = this.decoder.decode(chunk); for (const msg of messages) { @@ -403,26 +499,37 @@ export class LanguageServerClient { await this.open(filePath); } - const slowThreshold = 10_000; const start = Date.now(); - // Poll until the server pushes diagnostics (even empty = done) or timeout. - return new Promise((resolve) => { + // Poll until the server pushes diagnostics (even empty = done) or the + // per-server cap elapses (then we skip it — see aggregateDiagnostics). + const received = await new Promise<boolean>((resolve) => { const check = () => { const elapsed = Date.now() - start; - const received = this.diagnostics.hasReceivedPush(uri); - if (received || elapsed >= timeoutMs) { - resolve({ - formatted: this.diagnostics.formatFiltered(uri, opts?.minSeverity), - slow: elapsed > slowThreshold, - timedOut: !received, - }); + const got = this.diagnostics.hasReceivedPush(uri); + if (got || elapsed >= timeoutMs) { + resolve(got); return; } setTimeout(check, 100); }; check(); }); + + // Only a server that actually pushed can be corruption-checked. + if (received) { + this.detectStaleDiagnostics(uri, opts?.text ?? ""); + } + + // `slow` is structurally false now: the per-server cap is 10s, so + // elapsed can never exceed the old "unusually long" threshold. That + // warning is superseded by the timeout→skip notice produced in + // aggregateDiagnostics. The field is kept for contract compatibility. + return { + formatted: this.diagnostics.formatFiltered(uri, opts?.minSeverity), + slow: false, + timedOut: !received, + }; } getWatchedFilesRegistry(): WatchedFilesRegistry { @@ -433,11 +540,21 @@ export class LanguageServerClient { return this.diagnostics; } - async request(method: string, params?: unknown): Promise<unknown> { + /** + * Send a request (hover/definition/references/documentSymbol). Capped at + * REQUEST_TIMEOUT_MS so a dead/slow server can't hang the turn — the + * initialize handshake bypasses this (it calls rpc.sendRequest directly + * with its own 45s race). + */ + async request( + method: string, + params?: unknown, + timeoutMs: number = LanguageServerClient.REQUEST_TIMEOUT_MS, + ): Promise<unknown> { if (!this.rpc || this.state !== "connected") { throw new Error("Client not connected"); } - return this.rpc.sendRequest(method, params); + return this.rpc.sendRequest(method, params, timeoutMs); } shutdown(): void { @@ -450,3 +567,11 @@ export class LanguageServerClient { this.state = "not-started"; } } + +function setsEqual<T>(a: Set<T>, b: Set<T>): boolean { + if (a.size !== b.size) return false; + for (const v of a) { + if (!b.has(v)) return false; + } + return true; +} diff --git a/packages/lsp/src/diagnostics.ts b/packages/lsp/src/diagnostics.ts index bc7ac0a..50beca9 100644 --- a/packages/lsp/src/diagnostics.ts +++ b/packages/lsp/src/diagnostics.ts @@ -91,7 +91,7 @@ export class DiagnosticsStore { } } -function diagnosticKey(d: Diagnostic): string { +export function diagnosticKey(d: Diagnostic): string { const r = d.range; return `${r.start.line}:${r.start.character}-${r.end.line}:${r.end.character}:${d.severity ?? 0}:${d.message}`; } diff --git a/packages/lsp/src/extension.ts b/packages/lsp/src/extension.ts index 8e3178a..c0fee44 100644 --- a/packages/lsp/src/extension.ts +++ b/packages/lsp/src/extension.ts @@ -8,6 +8,7 @@ import { extname, join } from "node:path"; import type { Extension, HostAPI, ServiceHandle } from "@dispatch/kernel"; import { defineService } from "@dispatch/kernel"; +import { aggregateDiagnostics } from "./aggregate.js"; import type { SpawnedProcess } from "./client.js"; import { LspManager } from "./manager.js"; import { createLspTool } from "./tool.js"; @@ -43,6 +44,14 @@ function realSpawn( stderr: proc.stderr, pid: proc.pid, kill: () => proc.kill(), + // Surface process exit so the client can stop querying a dead server + // and self-heal (respawn). Bun's Subprocess.exited resolves with the + // exit code (or rejects if killed by signal — treat as code:null). + onExit: (handler) => { + (proc as { exited: Promise<number | null> }).exited + .then((code) => handler({ code })) + .catch(() => handler({ code: null })); + }, }; } @@ -108,14 +117,20 @@ export const extension: Extension = { return manager.status(cwd); }, async getDiagnostics(opts: GetDiagnosticsOpts): Promise<DiagnosticsResult> { - const timeoutMs = opts.timeoutMs ?? 60_000; - const slowThreshold = 10_000; + // 10s hard ceiling per server, regardless of what the caller + // passes (the edit hook still passes 60_000 — clamped here, so + // no other-unit edit is needed). A server that doesn't respond + // in 10s is skipped with a notice instead of waited out. + const PER_SERVER_CAP_MS = 10_000; + const timeoutMs = Math.min(opts.timeoutMs ?? PER_SERVER_CAP_MS, PER_SERVER_CAP_MS); const fileExt = extname(opts.filePath).toLowerCase(); const absolutePath = opts.filePath.startsWith("/") ? opts.filePath : join(opts.cwd, opts.filePath); // Get all connected servers matching this file's extension. + // A dead/corrupted server has state:"error" and is excluded — + // no per-edit hang on a corpse. const statuses = await manager.status(opts.cwd); const matching = statuses.filter( (s) => s.state === "connected" && s.extensions.some((ext) => ext === fileExt), @@ -125,34 +140,15 @@ export const extension: Extension = { return { formatted: "", slow: false, timedOut: false }; } - const parts: string[] = []; - let anySlow = false; - let anyTimedOut = false; - const start = Date.now(); - - for (const s of matching) { - const client = manager.getClient(s.id, s.root); - if (!client) continue; - const waitOpts: { text?: string; timeoutMs?: number; minSeverity?: number } = { - timeoutMs, - }; - if (opts.text !== undefined) waitOpts.text = opts.text; - if (opts.minSeverity !== undefined) waitOpts.minSeverity = opts.minSeverity; - const result = await client.waitForDiagnostics(absolutePath, waitOpts); - if (result.slow) anySlow = true; - if (result.timedOut) anyTimedOut = true; - if (result.formatted) { - parts.push(`[${s.name}]\n${result.formatted}`); - } - } - - const elapsed = Date.now() - start; + const agg = await aggregateDiagnostics( + (id, root) => manager.getClient(id, root), + matching, + absolutePath, + timeoutMs, + { text: opts.text, minSeverity: opts.minSeverity }, + ); - return { - formatted: parts.join("\n\n"), - slow: anySlow || elapsed > slowThreshold, - timedOut: anyTimedOut, - }; + return { formatted: agg.formatted, slow: false, timedOut: agg.timedOut }; }, }; host.provideService(lspServiceHandle, service); diff --git a/packages/lsp/src/manager.test.ts b/packages/lsp/src/manager.test.ts index 1649111..d8cba4f 100644 --- a/packages/lsp/src/manager.test.ts +++ b/packages/lsp/src/manager.test.ts @@ -361,4 +361,92 @@ describe("manager", () => { expect(s[0]?.error).toContain("[from .dispatch/lsp.json]"); expect(s[0]?.error).toContain("spawn failed"); }); + + it("a client that dies after connecting is skipped + re-spawned after backoff (no storm, no eternal hang)", async () => { + // A spawn that completes the initialize handshake AND lets the test + // simulate process death via the captured onExit handler. + const exitHandlers: Array<(info: { code: number | null; signal?: string }) => void> = []; + let spawnCount = 0; + const spawn: SpawnProcess = () => { + spawnCount++; + let messageHandler: ((data: Uint8Array) => void) | null = null; + const proc: SpawnedProcess = { + stdin: { + write: (bytes: Uint8Array) => { + const decoded = new TextDecoder().decode(bytes); + const headerEnd = decoded.indexOf("\r\n\r\n"); + if (headerEnd === -1) return; + const json = decoded.slice(headerEnd + 4); + try { + const msg = JSON.parse(json); + if (msg.method === "initialize") { + setTimeout(() => { + const response = JSON.stringify({ + jsonrpc: "2.0", + id: msg.id, + result: { capabilities: {} }, + }); + messageHandler?.(encode(response)); + }, 1); + } + } catch { + // ignore + } + }, + }, + stdout: { + on: (_event: string, cb: (data: Uint8Array) => void) => { + messageHandler = cb; + }, + }, + pid: 1000 + spawnCount, + kill: () => {}, + onExit: (handler) => { + exitHandlers.push(handler); + }, + }; + return proc; + }; + + const clock = { now: 0 }; + const manager = new LspManager({ + spawn, + fileWatcher: noopFileWatcher(), + fs: fakeFs({ + "/project/.dispatch/lsp.json": JSON.stringify({ + servers: { + steep: { + command: ["steep", "--stdio"], + extensions: [".rb"], + rootMarkers: [], + }, + }, + }), + }), + now: () => clock.now, + }); + + // 1) Connects. + const s1 = await manager.status("/project"); + expect(s1[0]?.state).toBe("connected"); + expect(spawnCount).toBe(1); + + // 2) Simulate the process dying (user kill / crash) via onExit. + exitHandlers[0]?.({ code: 1 }); + const clientAfterDeath = manager.getClient("steep", "/project"); + expect(clientAfterDeath?.getState()).toBe("error"); + + // 3) status() now reports error (and seeds a broken entry for backoff). + // Backoff not elapsed yet (clock frozen at 0) → NOT re-spawned. + const s2 = await manager.status("/project"); + expect(s2[0]?.state).toBe("error"); + expect(s2[0]?.error).toMatch(/process exited/); + expect(spawnCount).toBe(1); // no retry storm before backoff + + // 4) After the backoff elapses, status() re-spawns a fresh server. + clock.now = 31_000; + const s3 = await manager.status("/project"); + expect(s3[0]?.state).toBe("connected"); + expect(spawnCount).toBe(2); // re-spawned exactly once + }); }); diff --git a/packages/lsp/src/manager.ts b/packages/lsp/src/manager.ts index 7153956..bc84479 100644 --- a/packages/lsp/src/manager.ts +++ b/packages/lsp/src/manager.ts @@ -134,6 +134,19 @@ export class LspManager { if (existing) { const state = existing.client.getState(); const stateError = existing.client.getStateError(); + // A client that died or corrupted AFTER connecting flipped its + // own state to "error" (client.ts handleExit/markBroken). Spawn + // succeeded so there's no broken entry yet — seed one so the + // bounded-backoff path above re-spawns it, instead of reporting + // error forever (and so getDiagnostics' "connected" filter skips + // it, avoiding a per-edit hang on the corpse). + if (state === "error" && !this.broken.has(key)) { + this.broken.set(key, { + configFingerprint: configFingerprint(server), + brokenAt: this.now(), + error: enrichError(server, stateError ?? "server unavailable"), + }); + } const status: LspServerStatus = { id: server.id, name: server.name, diff --git a/packages/lsp/src/rpc.test.ts b/packages/lsp/src/rpc.test.ts index 05ce924..7b22ec5 100644 --- a/packages/lsp/src/rpc.test.ts +++ b/packages/lsp/src/rpc.test.ts @@ -84,3 +84,29 @@ it("handleMessage does not throw on malformed JSON", async () => { await expect(conn.handleMessage("")).resolves.toBeUndefined(); await expect(conn.handleMessage("not json at all")).resolves.toBeUndefined(); }); + +describe("sendRequest timeout", () => { + it("rejects with a timeout error when no response arrives within timeoutMs", async () => { + const { conn } = makeConnection(); + const promise = conn.sendRequest("textDocument/hover", {}, 50); + await expect(promise).rejects.toThrow(/LSP request timed out after 50ms: textDocument\/hover/); + }); + + it("clears the timer on a normal response (no unhandled rejection)", async () => { + const { conn } = makeConnection(); + const promise = conn.sendRequest("textDocument/hover", {}, 5000); + conn.handleMessage(frameResponse(1, { ok: true })); + await expect(promise).resolves.toEqual({ ok: true }); + // Give the (now-cleared) timer window ample time to prove it never fires. + await new Promise((r) => setTimeout(r, 80)); + }); + + it("does not time out when no timeoutMs is given (initialize handshake path)", async () => { + const { conn } = makeConnection(); + const promise = conn.sendRequest("initialize", {}); + // A late response well past any plausible default still resolves. + await new Promise((r) => setTimeout(r, 60)); + conn.handleMessage(frameResponse(1, { capabilities: {} })); + await expect(promise).resolves.toEqual({ capabilities: {} }); + }); +}); diff --git a/packages/lsp/src/rpc.ts b/packages/lsp/src/rpc.ts index 6b82624..95157de 100644 --- a/packages/lsp/src/rpc.ts +++ b/packages/lsp/src/rpc.ts @@ -39,11 +39,36 @@ export class JsonRpcConnection { this.write = write; } - sendRequest(method: string, params?: unknown): Promise<unknown> { + /** + * Send a request and await the correlated response. If `timeoutMs` is given, + * the promise rejects with a timeout error after that long — so a dead/slow + * server can't hang the caller forever (hover/definition/references). + * No `timeoutMs` = wait indefinitely (used by the initialize handshake, which + * has its own 45s race). + */ + sendRequest(method: string, params?: unknown, timeoutMs?: number): Promise<unknown> { const id = this.nextId++; const msg: JsonRpcMessage = { jsonrpc: "2.0", id, method, params }; return new Promise((resolve, reject) => { - this.pending.set(id, { resolve, reject }); + let timer: ReturnType<typeof setTimeout> | undefined; + // Wrap resolve/reject so the timer is cleared on a normal response + // (or on dispose) — no dangling timer after completion. + const finish = (fn: () => void): void => { + if (timer) clearTimeout(timer); + fn(); + }; + const entry: PendingRequest = { + resolve: (value: unknown) => finish(() => resolve(value)), + reject: (reason: unknown) => finish(() => reject(reason)), + }; + if (timeoutMs !== undefined) { + timer = setTimeout(() => { + if (this.pending.delete(id)) { + reject(new Error(`LSP request timed out after ${timeoutMs}ms: ${method}`)); + } + }, timeoutMs); + } + this.pending.set(id, entry); this.sendMessage(msg); }); } diff --git a/packages/lsp/src/tool.test.ts b/packages/lsp/src/tool.test.ts index 03787ae..efd1514 100644 --- a/packages/lsp/src/tool.test.ts +++ b/packages/lsp/src/tool.test.ts @@ -176,4 +176,101 @@ describe("tool", () => { expect(result.isError).toBe(true); expect(result.content).toContain("requires both"); }); + + it("diagnostics op: a server that times out is skipped with a raise-to-user notice", async () => { + const mockClient = { + getState: () => "connected" as const, + getStateError: () => undefined, + request: async () => null, + waitForDiagnostics: async () => ({ formatted: "", slow: false, timedOut: true }), + }; + + const tool = createLspTool( + stubManager({ + status: async () => [ + { + id: "steep", + name: "Steep", + root: "/project", + extensions: [".rb"], + state: "connected", + }, + ], + getClient: () => mockClient as never, + }), + ); + + const result = await tool.execute( + { operation: "diagnostics", path: "game.rb" }, + { + toolCallId: "test", + onOutput: () => {}, + signal: AbortSignal.timeout(5000), + log: { + debug: () => {}, + info: () => {}, + warn: () => {}, + error: () => {}, + child: () => ({}) as never, + span: () => ({}) as never, + }, + cwd: "/project", + }, + ); + + expect(result.isError).not.toBe(true); + expect(result.content).toContain("[Steep]"); + expect(result.content).toContain("took too long"); + expect(result.content).toContain(">10s"); + expect(result.content).toContain("raise this to the user"); + }); + + it("diagnostics op: responding servers' diagnostics are merged, tagged by source", async () => { + const mockClient = { + getState: () => "connected" as const, + getStateError: () => undefined, + request: async () => null, + waitForDiagnostics: async () => ({ + formatted: "ERROR L1:1: boom", + slow: false, + timedOut: false, + }), + }; + + const tool = createLspTool( + stubManager({ + status: async () => [ + { + id: "steep", + name: "Steep", + root: "/project", + extensions: [".rb"], + state: "connected", + }, + ], + getClient: () => mockClient as never, + }), + ); + + const result = await tool.execute( + { operation: "diagnostics", path: "game.rb" }, + { + toolCallId: "test", + onOutput: () => {}, + signal: AbortSignal.timeout(5000), + log: { + debug: () => {}, + info: () => {}, + warn: () => {}, + error: () => {}, + child: () => ({}) as never, + span: () => ({}) as never, + }, + cwd: "/project", + }, + ); + + expect(result.content).toContain("[Steep]"); + expect(result.content).toContain("boom"); + }); }); diff --git a/packages/lsp/src/tool.ts b/packages/lsp/src/tool.ts index 8d282ec..be0d269 100644 --- a/packages/lsp/src/tool.ts +++ b/packages/lsp/src/tool.ts @@ -6,6 +6,7 @@ import { extname, resolve } from "node:path"; import type { ToolContract, ToolExecuteContext, ToolResult } from "@dispatch/kernel"; +import { aggregateDiagnostics } from "./aggregate.js"; import type { LspManager } from "./manager.js"; type Operation = "diagnostics" | "hover" | "definition" | "references" | "documentSymbol"; @@ -157,6 +158,8 @@ export function createLspTool(manager: LspManager): ToolContract { switch (operation) { case "diagnostics": { + // 10s hard ceiling per server (same policy as the edit path). + const DIAGNOSTICS_TIMEOUT_MS = 10_000; // Query ALL connected servers whose extensions match this file. const matching = statuses.filter( (s) => s.state === "connected" && s.extensions.some((ext) => ext === fileExt), @@ -179,31 +182,27 @@ export function createLspTool(manager: LspManager): ToolContract { if (!client) { return { content: "Language server client not available.", isError: true }; } - const result = await client.waitForDiagnostics(absolutePath); + const result = await client.waitForDiagnostics(absolutePath, { + timeoutMs: DIAGNOSTICS_TIMEOUT_MS, + }); + if (result.timedOut) { + return { + content: `⚠️ [${connected.name}] LSP took too long (>10s), diagnostics skipped — please raise this to the user.`, + }; + } return { content: result.formatted || "No diagnostics found." }; } - // Query each matching server and merge results, tagged by source. - const parts: string[] = []; - let anyTimedOut = false; - for (const s of matching) { - const client = manager.getClient(s.id, s.root); - if (!client) continue; - const result = await client.waitForDiagnostics(absolutePath, { timeoutMs: 60_000 }); - if (result.timedOut) anyTimedOut = true; - if (result.slow) { - parts.push( - `⚠️ LSP is taking unusually long. If this happens more than once, raise it to the user.`, - ); - } - if (result.formatted) { - parts.push(`[${s.name}]\n${result.formatted}`); - } - } - if (anyTimedOut && parts.length === 0) { - parts.push("Diagnostics timed out (server may still be indexing)."); - } - return { content: parts.length > 0 ? parts.join("\n\n") : "No diagnostics found." }; + // Query matching servers concurrently, each capped at 10s; + // a non-responding server is skipped with a notice. + const agg = await aggregateDiagnostics( + (id, root) => manager.getClient(id, root), + matching, + absolutePath, + DIAGNOSTICS_TIMEOUT_MS, + {}, + ); + return { content: agg.formatted || "No diagnostics found." }; } case "hover": { const client = await getFirstMatchingClient(manager, statuses, fileExt); diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts index fa8d9e9..aaafb76 100644 --- a/packages/session-orchestrator/src/index.ts +++ b/packages/session-orchestrator/src/index.ts @@ -12,6 +12,7 @@ export { conversationOpened, conversationStatusChanged, createCompactionService, + createRetryStrategy, createSessionOrchestrator, createWarmService, type EnqueueInput, @@ -34,8 +35,13 @@ export { } from "./orchestrator.js"; export { buildUserMessage, + cumulativeSleepMs, defaultDispatchPolicy, + delayFor, generateTurnId, + RETRY_BUDGET_MS, + RETRY_SCHEDULE_MS, + RETRY_TAIL_MS, resolveReasoningEffort, selectFirstProvider, } from "./pure.js"; diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index ae27e59..a1401d6 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -11,6 +11,7 @@ import type { ProviderEvent, ProviderStreamOptions, ReasoningEffort, + RetryStrategy, RunTurnInput, RunTurnResult, ToolContract, @@ -24,6 +25,7 @@ import { createMetricsAccumulator } from "./metrics.js"; import { buildUserMessage, defaultDispatchPolicy, + delayFor, generateTurnId, resolveModelName, resolveReasoningEffort, @@ -342,12 +344,45 @@ export interface SessionOrchestratorBundle { readonly activeConversations: ReadonlySet<string>; } +/** + * The concrete retry strategy wired into every turn's `RunTurnInput.retry`. + * + * `delayFor` is the pure schedule (`5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m`, + * then repeat 30m until 8h cumulative scheduled sleep) — no I/O, no clock. + * `sleep` is the abortable I/O effect: a `setTimeout`-based promise that + * rejects when the turn's abort signal fires (so a retry in flight seals the + * turn `aborted`). The kernel imports no timer; this is the shell-provided I/O. + */ +export function createRetryStrategy(): RetryStrategy { + const sleep = (ms: number, signal: AbortSignal): Promise<void> => { + return new Promise((resolve, reject) => { + if (signal.aborted) { + reject(new Error("aborted")); + return; + } + const timer = setTimeout(() => { + signal.removeEventListener("abort", onAbort); + resolve(); + }, ms); + const onAbort = () => { + clearTimeout(timer); + reject(new Error("aborted")); + }; + signal.addEventListener("abort", onAbort, { once: true }); + }); + }; + return { delayFor, sleep }; +} + export function createSessionOrchestrator( deps: SessionOrchestratorDeps, ): SessionOrchestratorBundle { const activeConversations = new Set<string>(); const subscribers = new Map<string, Set<TurnEventListener>>(); const activeTurns = new Map<string, ActiveTurn>(); + // One stateless retry strategy shared by every turn (delayFor is pure; sleep + // is a stateless setTimeout closure). Wired into each RunTurnInput.retry. + const retryStrategy = createRetryStrategy(); function emitToHub(conversationId: string, event: AgentEvent): void { const turn = activeTurns.get(conversationId); @@ -640,6 +675,7 @@ export function createSessionOrchestrator( turnId, signal: controller.signal, providerOpts, + retry: retryStrategy, ...(turnLogger !== undefined ? { logger: turnLogger } : {}), ...(effectiveCwd !== undefined ? { cwd: effectiveCwd } : {}), ...(effectiveComputerId !== undefined ? { computerId: effectiveComputerId } : {}), diff --git a/packages/session-orchestrator/src/pure.test.ts b/packages/session-orchestrator/src/pure.test.ts index 9e5d3c4..2cbe15f 100644 --- a/packages/session-orchestrator/src/pure.test.ts +++ b/packages/session-orchestrator/src/pure.test.ts @@ -2,8 +2,13 @@ import type { ProviderContract } from "@dispatch/kernel"; import { describe, expect, it } from "vitest"; import { buildUserMessage, + cumulativeSleepMs, defaultDispatchPolicy, + delayFor, generateTurnId, + RETRY_BUDGET_MS, + RETRY_SCHEDULE_MS, + RETRY_TAIL_MS, resolveReasoningEffort, selectFirstProvider, } from "./pure.js"; @@ -100,3 +105,59 @@ describe("resolveReasoningEffort", () => { expect(resolveReasoningEffort(undefined, "max")).toBe("max"); }); }); + +describe("retry backoff schedule (delayFor)", () => { + it("emits the stepped head: 5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m", () => { + expect(delayFor(0)).toBe(5_000); + expect(delayFor(1)).toBe(10_000); + expect(delayFor(2)).toBe(30_000); + expect(delayFor(3)).toBe(60_000); + expect(delayFor(4)).toBe(300_000); + expect(delayFor(5)).toBe(600_000); + expect(delayFor(6)).toBe(900_000); + expect(delayFor(7)).toBe(1_800_000); + }); + + it("repeats 30m after the head", () => { + expect(delayFor(8)).toBe(RETRY_TAIL_MS); + expect(delayFor(9)).toBe(RETRY_TAIL_MS); + expect(delayFor(20)).toBe(RETRY_TAIL_MS); + }); + + it("gives up (returns undefined) once cumulative sleep exceeds 8h", () => { + // Head sums to 3,705,000 ms; +1,800,000 per extra step. 8h = 28,800,000. + // attempt 20 cumulative = 3,705,000 + 13*1,800,000 = 27,105,000 (< 8h) → retry. + expect(delayFor(20)).toBe(RETRY_TAIL_MS); + // attempt 21 cumulative = 27,105,000 + 1,800,000 = 28,905,000 (> 8h) → stop. + expect(delayFor(21)).toBeUndefined(); + }); + + it("cumulativeSleepMs matches the sum of the schedule", () => { + expect(cumulativeSleepMs(0)).toBe(5_000); + expect(cumulativeSleepMs(1)).toBe(15_000); + expect(cumulativeSleepMs(7)).toBe(RETRY_SCHEDULE_MS.reduce((a, b) => a + b, 0)); + // 8h budget is 28,800,000 ms. + expect(RETRY_BUDGET_MS).toBe(8 * 60 * 60 * 1000); + // The last retry (attempt 20) keeps cumulative under budget. + expect(cumulativeSleepMs(20)).toBeLessThanOrEqual(RETRY_BUDGET_MS); + // The next (attempt 21) exceeds it. + expect(cumulativeSleepMs(21)).toBeGreaterThan(RETRY_BUDGET_MS); + }); + + it("the full schedule has 21 retries then stops", () => { + const schedule: number[] = []; + let attempt = 0; + while (true) { + const delay = delayFor(attempt); + if (delay === undefined) break; + schedule.push(delay); + attempt++; + } + expect(schedule).toHaveLength(21); + expect(schedule[0]).toBe(5_000); + expect(schedule.at(-1)).toBe(RETRY_TAIL_MS); + // 8 stepped head + 13 tail repeats. + expect(schedule.slice(0, 8)).toEqual([...RETRY_SCHEDULE_MS]); + expect(schedule.slice(8).every((d) => d === RETRY_TAIL_MS)).toBe(true); + }); +}); diff --git a/packages/session-orchestrator/src/pure.ts b/packages/session-orchestrator/src/pure.ts index 9a31e17..a028cbe 100644 --- a/packages/session-orchestrator/src/pure.ts +++ b/packages/session-orchestrator/src/pure.ts @@ -9,6 +9,53 @@ export function buildUserMessage(text: string): ChatMessage { return { role: "user", chunks: [{ type: "text", text }] }; } +// ── Provider-error retry backoff schedule ─────────────────────────────────── +// +// Pure, deterministic delay decision (no I/O, no clock) for retrying retryable +// provider errors (HTTP 429 / 5xx "overloaded"). The concrete `sleep` (I/O) +// is wired in the orchestrator; this owns only the policy. + +/** + * Stepped backoff schedule (ms): 5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m. + * After the head is exhausted, {@link RETRY_TAIL_MS} (30m) repeats. + */ +export const RETRY_SCHEDULE_MS = [ + 5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000, +] as const; + +/** Tail delay (ms) repeated after the stepped head: 30 minutes. */ +export const RETRY_TAIL_MS = 1_800_000; + +/** Cumulative scheduled-sleep budget (ms) after which retrying gives up: 8h. */ +export const RETRY_BUDGET_MS = 8 * 60 * 60 * 1000; + +/** + * Cumulative scheduled sleep through `attempt` (sum of delay[0..attempt]). + * Pure — no I/O, no clock. + */ +export function cumulativeSleepMs(attempt: number): number { + let sum = 0; + for (let i = 0; i <= attempt; i++) { + sum += i < RETRY_SCHEDULE_MS.length ? (RETRY_SCHEDULE_MS[i] ?? RETRY_TAIL_MS) : RETRY_TAIL_MS; + } + return sum; +} + +/** + * Pure, deterministic delay decision for the retry strategy: given the + * 0-based attempt index, return the delay in ms to sleep before the next + * retry, or `undefined` to stop (cumulative budget exhausted). No I/O, no + * clock — fully testable. Matches the plan's schedule: + * `5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m`, then repeat 30m until 8h of + * cumulative scheduled sleep is reached, then give up. + */ +export function delayFor(attempt: number): number | undefined { + const scheduled = RETRY_SCHEDULE_MS[attempt]; + const delay = scheduled !== undefined ? scheduled : RETRY_TAIL_MS; + if (cumulativeSleepMs(attempt) > RETRY_BUDGET_MS) return undefined; // over budget → stop + return delay; +} + /** * Resolve the reasoning-effort level for a turn: * per-turn override → persisted per-conversation value → default `"high"`. diff --git a/packages/tool-edit-file/src/extension.ts b/packages/tool-edit-file/src/extension.ts index 2eaa0e9..9dbebda 100644 --- a/packages/tool-edit-file/src/extension.ts +++ b/packages/tool-edit-file/src/extension.ts @@ -41,7 +41,10 @@ export const extension: Extension = { filePath: opts.filePath, text: opts.text, cwd: opts.cwd, - timeoutMs: 60_000, + // 10s matches the LSP service's per-server cap (see packages/lsp). + // The service clamps this anyway; stated explicitly so the call + // site is honest about the effective live-diagnostics budget. + timeoutMs: 10_000, minSeverity: 2, // errors + warnings only }); }; diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts index f6a95cf..8dc3a72 100644 --- a/packages/wire/src/index.ts +++ b/packages/wire/src/index.ts @@ -273,6 +273,7 @@ export type AgentEvent = | TurnUsageEvent | TurnStepCompleteEvent | TurnErrorEvent + | TurnProviderRetryEvent | TurnDoneEvent | TurnSealedEvent | TurnSteeringEvent; @@ -429,6 +430,31 @@ export interface TurnErrorEvent { readonly code?: string; } +/** + * A retryable provider error is being retried with backoff. Emitted once per + * scheduled retry, BEFORE the sleep, so the UI can show "⚠ Server overloaded — + * retrying in 5s…" immediately. TRANSIENT: emitted to the frontend but NOT + * persisted into the model's message history (it never pollutes the prompt). + * + * When the retry budget is exhausted, the existing `error` event is emitted and + * the turn seals — so the final failure is still a persisted error. `attempt` is + * 0-based (the Nth retry about to happen); `delayMs` is the scheduled sleep + * before that retry fires. + */ +export interface TurnProviderRetryEvent { + readonly type: "provider-retry"; + readonly conversationId: string; + readonly turnId: string; + /** 0-based: this is the Nth retry about to happen. */ + readonly attempt: number; + /** ms the client should expect to wait before the retry fires. */ + readonly delayMs: number; + /** The endpoint's error verbatim (e.g. "HTTP 429: {…overloaded_error…}"). */ + readonly message: string; + /** The HTTP code when known (e.g. "429"). */ + readonly code?: string; +} + /** The turn has completed (model finished generating). */ export interface TurnDoneEvent { readonly type: "done"; @@ -5,10 +5,36 @@ > Keep this lean and current; do not let it re-accrete a step-by-step changelog. ## Status (current) -`tsc -b` EXIT 0 · biome clean · **1549 vitest** green. (worktree `feature/ssh-support`; -baseline re-verified after `bun install`.) - -## SSH support — transparent remote execution (IN PROGRESS) +`tsc -b` EXIT 0 · biome clean · **1730 vitest** pass (+6 sshd-integration skipped). (worktree `feature/ssh-support`; +merged `dev` — brings retry-with-backoff (`provider-retry` AgentEvent) + the LSP-dead-server fix alongside the +SSH waves below.) + +## Retry with backoff on retryable provider errors (DONE — from dev) +When the upstream LLM API returns a retryable error (HTTP 429 / 5xx "overloaded"), +the kernel now retries `provider.stream()` with a stepped backoff, visibly, until +the 8h cumulative-sleep budget is exhausted — then emits the final error and +seals the turn. Retries fire ONLY when no content was emitted yet this step (the +safety invariant — never duplicate partial output). Plan: +`notes/retry-with-backoff-plan.md`; report: `reports/retry-with-backoff.md`. +- **Architecture (kernel hook + shell policy/I/O):** kernel provides the hook + (`RetryStrategy` contract + the retry loop in `runTurn`); the shell + (session-orchestrator) provides the policy (the schedule) + the I/O (an + abortable `setTimeout` sleep). Kernel imports no timer. `retry?` is optional + → omit = no retry (backward-compatible). +- **New transient `AgentEvent` variant** `provider-retry` (`@dispatch/wire`), + emitted once per scheduled retry BEFORE the sleep so the UI can show + "⚠ retrying in Ns…" immediately; NOT persisted to model history (never + pollutes the prompt). Final failure is still a persisted `error` + seal. +- **Schedule:** `5s,10s,30s,60s,5m,10m,15m,30m`, then repeat 30m until 8h of + cumulative scheduled sleep → ~21 retries then give up. Pure `delayFor(attempt)`. +- **Retry trigger:** emitted `error` with `retryable===true` → retry; + `retryable` false/absent → give up; a THROWN error → retryable-by-default + ONLY when pre-content. All gated on `!hadContent` (text/reasoning/tool-call/usage). +- **Frontend handoff (5d3f, separate repo `../dispatch-web`):** render + `provider-retry` as a yellow warning system-message bubble showing `message` + (+`code`) with the `delayMs` countdown. + +## SSH support — transparent remote execution (DONE — waves 0-5c) Plan: `notes/ssh-support-plan.md` (decisions locked in §0.5/§13). Orchestrated in waves (ORCHESTRATOR.md §2a — pre-author the contract seam, then parallel owner-agents on disjoint packages). @@ -29,8 +55,6 @@ owner-agents on disjoint packages). + remote tool-drop filter: drops `lsp` + `__`-namespaced MCP tools when remote) + `transport-contract` (ChatRequest.computerId + computer endpoint API types). `tsc -b` EXIT 0, biome clean, **1620 vitest** (was 1599). - CR-1 (non-blocking): MCP filter doesn't preserve `computerId` on - ToolAssembly — fix folded into wave 4. - [x] **Wave 4** (parallel): `transport-http` (computer endpoints + `/chat` threading + the `ComputerService` seam the ssh package will provide) + `transport-ws` (computerId through chat.send/queue) + `mcp` (CR-1: preserve @@ -50,6 +74,9 @@ owner-agents on disjoint packages). computerServiceHandle. orchestrator added missing @dispatch/exec-backend dep to host-bin + bun install. **LIVE-VERIFIED**: server boots clean ("Dispatch booted", no disabled extensions). tsc -b EXIT 0, biome clean, 1690 vitest (+6 sshd skipped). +- [x] **Merge dev**: brought retry-with-backoff (`provider-retry` AgentEvent — what + the FE consumes) + LSP-dead-server fix into the SSH branch. All code files + auto-merged cleanly; only `tasks.md` conflicted (orchestrator-resolved). - [ ] **DEFERRED — CR-6 usageCount**: `listComputers()` returns `usageCount: 0` until a conversation-store count-by-alias helper + host-bin wiring is added (non-blocking — discovery/connect/execute all work; only the count badge shows 0). Follow-up. @@ -63,7 +90,6 @@ Key decisions: ssh2 + ssh-config (project-local deps); key-only auth from `~/.ssh/config` (no CRUD entity); computerId persisted per-conversation; LSP/MCP silently dropped on remote turns; edit_file works w/o diagnostics remotely. - ## Per-edit LSP diagnostics auto-append (DONE) After a successful `edit_file`, the extension now calls LSP `getDiagnostics` on the post-edit buffer and appends any errors/warnings (severity ≤ 2) to the tool result — |
