diff options
| author | Adam Malczewski <[email protected]> | 2026-06-25 18:36:08 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-25 18:36:08 +0900 |
| commit | de022cee7ac66c95d7ed6a35d4e00f8e2d92cbbc (patch) | |
| tree | 041dcb1017e544a405526443cb578baa974bec0e /packages/lsp | |
| parent | fc1c3a54c3075990ec0dd0f97901bd46fe142923 (diff) | |
| parent | 649fc4f66f40f7743683546f81d3320e7394e597 (diff) | |
| download | dispatch-de022cee7ac66c95d7ed6a35d4e00f8e2d92cbbc.tar.gz dispatch-de022cee7ac66c95d7ed6a35d4e00f8e2d92cbbc.zip | |
Merge branch 'dev' into feature/ssh-support
Brings dev's retry-with-backoff (the transient `provider-retry` AgentEvent the
web frontend consumes) + the LSP-dead-server per-edit-hang fix into the SSH
feature branch, alongside the SSH waves 0-5c.
All code files auto-merged cleanly (run-turn.ts, orchestrator.ts, runtime.ts,
wire/index.ts, tool-edit-file/extension.ts, run-turn.test.ts — both computerId
threading and retry-with-backoff coexist). Only tasks.md conflicted (status
section — orchestrator-resolved; both feature sections kept).
Verified post-merge: tsc -b exits 0, biome is clean (391 files), and 1730 vitest tests pass
with 6 sshd-integration tests skipped (previously 1690 passing; the +40 come from dev's retry/LSP tests).
Wire dist rebuilt so the FE can re-sync the pinned @dispatch/wire dep and pick
up BOTH provider-retry AND the SSH Computer/defaultComputerId types.
Nothing was merged back (into dev or any other branch), and nothing was pushed.
Diffstat (limited to 'packages/lsp')
| -rw-r--r-- | packages/lsp/src/aggregate.test.ts | 141 | ||||
| -rw-r--r-- | packages/lsp/src/aggregate.ts | 80 | ||||
| -rw-r--r-- | packages/lsp/src/client.test.ts | 146 | ||||
| -rw-r--r-- | packages/lsp/src/client.ts | 153 | ||||
| -rw-r--r-- | packages/lsp/src/diagnostics.ts | 2 | ||||
| -rw-r--r-- | packages/lsp/src/extension.ts | 54 | ||||
| -rw-r--r-- | packages/lsp/src/manager.test.ts | 88 | ||||
| -rw-r--r-- | packages/lsp/src/manager.ts | 13 | ||||
| -rw-r--r-- | packages/lsp/src/rpc.test.ts | 26 | ||||
| -rw-r--r-- | packages/lsp/src/rpc.ts | 29 | ||||
| -rw-r--r-- | packages/lsp/src/tool.test.ts | 97 | ||||
| -rw-r--r-- | packages/lsp/src/tool.ts | 43 |
12 files changed, 804 insertions, 68 deletions
diff --git a/packages/lsp/src/aggregate.test.ts b/packages/lsp/src/aggregate.test.ts new file mode 100644 index 0000000..4579a0a --- /dev/null +++ b/packages/lsp/src/aggregate.test.ts @@ -0,0 +1,141 @@ +import { describe, expect, it } from "vitest"; +import { type AggregateServer, aggregateDiagnostics } from "./aggregate.js"; +import type { LanguageServerClient } from "./client.js"; + +/** + * A minimal fake client: only `waitForDiagnostics` is exercised by + * aggregateDiagnostics, so we stub just that. Cast to the real type (mirrors + * tool.test.ts) — no real process, no internal mocks of our own modules. + */ +function fakeClient( + waitForDiagnostics: LanguageServerClient["waitForDiagnostics"], +): LanguageServerClient { + return { waitForDiagnostics } as unknown as LanguageServerClient; +} + +const SERVER_A: AggregateServer = { id: "a", name: "Ruby-LSP", root: "/p" }; +const SERVER_B: AggregateServer = { id: "b", name: "Steep", root: "/p" }; + +describe("aggregateDiagnostics", () => { + it("returns merged diagnostics from all responding servers, tagged by source", async () => { + const clients = new Map<string, LanguageServerClient>([ + [ + "a", + fakeClient(async () => ({ formatted: "ERROR L1:1: boom", slow: false, timedOut: false })), + ], + [ + "b", + fakeClient(async () => ({ formatted: "WARNING L2:3: meh", slow: false, timedOut: false })), + ], + ]); + + const result = await aggregateDiagnostics( + (id) => clients.get(id), + [SERVER_A, SERVER_B], + "/p/x.rb", + 10_000, + {}, + ); + + expect(result.timedOut).toBe(false); + expect(result.formatted).toContain("[Ruby-LSP]"); + expect(result.formatted).toContain("boom"); + expect(result.formatted).toContain("[Steep]"); + expect(result.formatted).toContain("meh"); + }); + + it("skips a server that times out with a raise-to-user notice, and still returns the fast server's result", async () => { + // Steep never resolves within the cap → timedOut; ruby-lsp answers fast. 
+ const clients = new Map<string, LanguageServerClient>([ + ["a", fakeClient(async () => ({ formatted: "", slow: false, timedOut: false }))], + ["b", fakeClient(async () => ({ formatted: "", slow: false, timedOut: true }))], + ]); + + const result = await aggregateDiagnostics( + (id) => clients.get(id), + [SERVER_A, SERVER_B], + "/p/x.rb", + 10_000, + {}, + ); + + expect(result.timedOut).toBe(true); + // The skip notice names the offending server and the cap. + expect(result.formatted).toContain("[Steep]"); + expect(result.formatted).toContain("took too long"); + expect(result.formatted).toContain(">10s"); + expect(result.formatted).toContain("raise this to the user"); + // ruby-lsp answered cleanly (empty diagnostics) → no line for it. + expect(result.formatted).not.toContain("[Ruby-LSP]"); + }); + + it("runs servers concurrently: a slow server does not delay a fast one's contribution order", async () => { + const callOrder: string[] = []; + const clients = new Map<string, LanguageServerClient>([ + [ + "a", + fakeClient(async () => { + callOrder.push("a-start"); + await new Promise((r) => setTimeout(r, 5)); + callOrder.push("a-end"); + return { formatted: "from-a", slow: false, timedOut: false }; + }), + ], + [ + "b", + fakeClient(async () => { + callOrder.push("b-start"); + await new Promise((r) => setTimeout(r, 30)); + callOrder.push("b-end"); + return { formatted: "from-b", slow: false, timedOut: false }; + }), + ], + ]); + + const result = await aggregateDiagnostics( + (id) => clients.get(id), + [SERVER_A, SERVER_B], + "/p/x.rb", + 10_000, + {}, + ); + + // Both started before either ended → concurrent, not sequential. 
+ expect(callOrder.slice(0, 2).sort()).toEqual(["a-start", "b-start"]); + expect(result.formatted).toContain("from-a"); + expect(result.formatted).toContain("from-b"); + }); + + it("a missing client (dead/excluded) contributes nothing and never rejects", async () => { + const result = await aggregateDiagnostics(() => undefined, [SERVER_A], "/p/x.rb", 10_000, {}); + expect(result.formatted).toBe(""); + expect(result.timedOut).toBe(false); + }); + + it("forwards text + minSeverity to each client's waitForDiagnostics", async () => { + const seen: Array<{ text?: string; minSeverity?: number; timeoutMs: number }> = []; + const clients = new Map<string, LanguageServerClient>([ + [ + "a", + fakeClient(async (_path, opts) => { + seen.push({ + text: opts?.text, + minSeverity: opts?.minSeverity, + timeoutMs: opts?.timeoutMs ?? -1, + }); + return { formatted: "", slow: false, timedOut: false }; + }), + ], + ]); + + await aggregateDiagnostics((id) => clients.get(id), [SERVER_A], "/p/x.rb", 7000, { + text: "post-edit buffer", + minSeverity: 2, + }); + + expect(seen).toHaveLength(1); + expect(seen[0]?.text).toBe("post-edit buffer"); + expect(seen[0]?.minSeverity).toBe(2); + expect(seen[0]?.timeoutMs).toBe(7000); + }); +}); diff --git a/packages/lsp/src/aggregate.ts b/packages/lsp/src/aggregate.ts new file mode 100644 index 0000000..b044e21 --- /dev/null +++ b/packages/lsp/src/aggregate.ts @@ -0,0 +1,80 @@ +/** + * Concurrent multi-server diagnostics aggregation. + * + * Queries every matching language server AT ONCE (not one-at-a-time), each + * capped at `timeoutMs`. A server that doesn't push diagnostics within the cap + * is SKIPPED with a per-server notice rather than blocking the others — so one + * slow/dead server (e.g. a corrupted Steep) can't hold up the fast one's + * (ruby-lsp) results for the full timeout on every edit. 
+ * + * The only I/O here is `client.waitForDiagnostics` (injected via `getClient`), + * so this is unit-testable with a fake client and no real process. + */ + +import type { LanguageServerClient } from "./client.js"; + +export interface AggregateServer { + readonly id: string; + readonly name: string; + readonly root: string; +} + +export interface AggregateOpts { + /** Post-edit buffer; when omitted the server reads from disk. */ + readonly text?: string | undefined; + /** Only include diagnostics with severity ≤ this (1=Error, 2=Warning). */ + readonly minSeverity?: number | undefined; +} + +export interface AggregateResult { + /** Merged diagnostics tagged by source + a per-skipped-server notice. */ + readonly formatted: string; + /** True if at least one server was skipped for exceeding the cap. */ + readonly timedOut: boolean; +} + +/** + * Query `servers` concurrently, each capped at `timeoutMs`. Returns merged + * diagnostics tagged by source (`[name]\n…`) and, for any server that did not + * respond in time, a `⚠️ [name] LSP took too long (>Ns), skipped — please raise + * this to the user.` notice. Never rejects: a client error yields an empty + * contribution for that server. 
/**
 * Query every server in `servers` concurrently and merge their diagnostics.
 *
 * Each server's client is looked up through `getClient` and asked for the
 * diagnostics of `absolutePath` via `waitForDiagnostics`, with `timeoutMs`
 * forwarded as that call's timeout. `opts.text` and `opts.minSeverity` are
 * forwarded only when they are defined.
 *
 * Per-server outcome:
 * - no client (`getClient` returned `undefined`): contributes nothing;
 * - the lookup or the wait throws/rejects: contributes nothing, so a single
 *   failing server can no longer reject the whole aggregate and discard the
 *   results of the servers that answered;
 * - `timedOut` result: contributes a
 *   `⚠️ [name] LSP took too long (>Ns), diagnostics skipped — please raise this to the user.`
 *   notice, where N is `timeoutMs` rounded to whole seconds;
 * - non-empty `formatted`: contributes `[name]\n<formatted>`;
 * - empty `formatted`: contributes nothing.
 *
 * Contributions keep the order of `servers` and are joined by a blank line.
 *
 * @param getClient Resolves the client for a server id + root, or `undefined`.
 * @param servers Servers to query; all are queried at once.
 * @param absolutePath Path of the file whose diagnostics are requested.
 * @param timeoutMs Timeout handed to every `waitForDiagnostics` call.
 * @param opts Optional post-edit buffer text and severity filter.
 * @returns The merged text plus `timedOut`, which is true when at least one
 *   server reported a timeout. The returned promise never rejects.
 */
export async function aggregateDiagnostics(
  getClient: (id: string, root: string) => LanguageServerClient | undefined,
  servers: readonly AggregateServer[],
  absolutePath: string,
  timeoutMs: number,
  opts: AggregateOpts,
): Promise<AggregateResult> {
  // Built once; every client receives its own shallow copy so one callee
  // mutating its options cannot affect another server's call.
  const waitOpts: { text?: string; timeoutMs: number; minSeverity?: number } = { timeoutMs };
  if (opts.text !== undefined) waitOpts.text = opts.text;
  if (opts.minSeverity !== undefined) waitOpts.minSeverity = opts.minSeverity;

  const entries = await Promise.all(
    servers.map(async (server) => {
      try {
        const client = getClient(server.id, server.root);
        if (!client) return null;
        const result = await client.waitForDiagnostics(absolutePath, { ...waitOpts });
        return { server, result };
      } catch {
        // Honour the "never rejects" contract: Promise.all rejects as soon as
        // any member rejects, so a failure must be absorbed per server.
        return null;
      }
    }),
  );

  const parts: string[] = [];
  let timedOut = false;
  const capSeconds = Math.round(timeoutMs / 1000);

  for (const entry of entries) {
    if (!entry) continue;
    const { server, result } = entry;
    if (result.timedOut) {
      timedOut = true;
      parts.push(
        `⚠️ [${server.name}] LSP took too long (>${capSeconds}s), diagnostics skipped — please raise this to the user.`,
      );
    } else if (result.formatted) {
      parts.push(`[${server.name}]\n${result.formatted}`);
    }
  }

  return { formatted: parts.join("\n\n"), timedOut };
}
Uint8Array) => void) | null } = { cb: null }; + + const spawnWithExit: SpawnProcess = () => ({ + stdin: { write: () => {} }, + stdout: { + on: (_event: string, cb: (data: Uint8Array) => void) => { + stdoutHolder.cb = cb; + }, + }, + pid: 999, + kill: () => { + state.killed = true; + }, + onExit: (handler) => { + exitCb = handler; + }, + }); + + const { client } = makeClient({ spawn: spawnWithExit }); + const startPromise = client.start(); + await new Promise((r) => setTimeout(r, 50)); + stdoutHolder.cb?.( + encode(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } })), + ); + await startPromise; + expect(client.getState()).toBe("connected"); + + // Simulate the process dying (user kill / crash). + exitCb?.({ code: 1 }); + + expect(client.getState()).toBe("error"); + expect(client.getStateError()).toMatch(/process exited/); + // The (still-alive-in-test) process was killed to avoid a zombie. + expect(state.killed).toBe(true); + }); + + it("a dead client is skipped by waitForDiagnostics callers (state !== connected)", async () => { + // Build a client, connect, kill via onExit, then assert a diagnostics + // query would not block: getState() is "error" so the matching filter + // (state === "connected") excludes it. We assert the state guard. 
+ const stdoutHolder: { cb: ((data: Uint8Array) => void) | null } = { cb: null }; + let exitCb: ProcessExitHandler | null = null; + const spawnWithExit: SpawnProcess = () => ({ + stdin: { write: () => {} }, + stdout: { + on: (_e, cb) => { + stdoutHolder.cb = cb; + }, + }, + pid: 1, + kill: () => {}, + onExit: (handler) => { + exitCb = handler; + }, + }); + + const { client } = makeClient({ spawn: spawnWithExit }); + const startPromise = client.start(); + await new Promise((r) => setTimeout(r, 50)); + stdoutHolder.cb?.( + encode(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } })), + ); + await startPromise; + + exitCb?.({ code: null }); + expect(client.getState()).toBe("error"); + // The aggregate / getDiagnostics matching filter requires "connected". + expect(client.getState() === "connected").toBe(false); + }); + + it("corruption detector marks the client broken after repeated identical diagnostics despite text changes", async () => { + // A healthy server would change diagnostics as the file changes; a + // corrupted one re-emits the SAME non-empty set. Drive 5 edits with + // different text but identical diagnostics → client flips to error. + const { client, serverResponses } = makeClient(); + const startPromise = client.start(); + await new Promise((r) => setTimeout(r, 50)); + serverResponses(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } })); + await startPromise; + + const phantom = JSON.stringify({ + jsonrpc: "2.0", + method: "textDocument/publishDiagnostics", + params: { + uri: "file:///project/game.rb", + diagnostics: [ + { + range: { start: { line: 0, character: 27 }, end: { line: 0, character: 28 } }, + severity: 1, + message: "SyntaxError: unexpected token", + }, + ], + }, + }); + + const path = "/project/game.rb"; + // The first call establishes the baseline snapshot (no increment). 
+ // Each subsequent call with identical diagnostics + changed text + // increments; the 6th call (5th increment) trips the threshold. + for (let i = 1; i <= 5; i++) { + const p = client.waitForDiagnostics(path, { text: `buf-v${i}`, timeoutMs: 2000 }); + // Push the identical phantom diagnostics so the poll resolves. + await new Promise((r) => setTimeout(r, 30)); + serverResponses(phantom); + await p; + expect(client.getState()).toBe("connected"); + } + // 6th identical-across-changed-text repeat trips the threshold. + const p6 = client.waitForDiagnostics(path, { text: "buf-v6", timeoutMs: 2000 }); + await new Promise((r) => setTimeout(r, 30)); + serverResponses(phantom); + await p6; + + expect(client.getState()).toBe("error"); + expect(client.getStateError()).toMatch(/repeated stale diagnostics/i); + }); + + it("corruption detector does NOT trip on a clean file (empty diagnostics stay identical)", async () => { + const { client, serverResponses } = makeClient(); + const startPromise = client.start(); + await new Promise((r) => setTimeout(r, 50)); + serverResponses(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } })); + await startPromise; + + const clean = JSON.stringify({ + jsonrpc: "2.0", + method: "textDocument/publishDiagnostics", + params: { uri: "file:///project/game.rb", diagnostics: [] }, + }); + const path = "/project/game.rb"; + + for (let i = 1; i <= 6; i++) { + const p = client.waitForDiagnostics(path, { text: `clean-v${i}`, timeoutMs: 2000 }); + await new Promise((r) => setTimeout(r, 30)); + serverResponses(clean); + await p; + } + // Empty diagnostics never count as "stale" — a clean file staying clean + // is normal, not corruption. + expect(client.getState()).toBe("connected"); + }); }); diff --git a/packages/lsp/src/client.ts b/packages/lsp/src/client.ts index 677a22a..ac7d025 100644 --- a/packages/lsp/src/client.ts +++ b/packages/lsp/src/client.ts @@ -4,13 +4,22 @@ * FileWatcher, and forwards matching disk changes. 
*/ -import { DiagnosticsStore, type PublishDiagnosticsParams } from "./diagnostics.js"; +import { DiagnosticsStore, diagnosticKey, type PublishDiagnosticsParams } from "./diagnostics.js"; import { computeChangeRange } from "./diff.js"; import { FrameDecoder } from "./framing.js"; import { languageId as resolveLanguageId } from "./language.js"; import { JsonRpcConnection, type WriteFn } from "./rpc.js"; import { FileChangeType, WatchedFilesRegistry } from "./watched-files.js"; +/** Info delivered to an `onExit` handler when the child process terminates. */ +export interface ProcessExitInfo { + readonly code: number | null; + readonly signal?: string; +} + +/** A handler registered to be called when the child process exits. */ +export type ProcessExitHandler = (info: ProcessExitInfo) => void; + export interface SpawnedProcess { readonly stdin: { readonly write: (bytes: Uint8Array) => void }; readonly stdout: @@ -22,6 +31,13 @@ export interface SpawnedProcess { | undefined; readonly pid: number | undefined; readonly kill: () => void; + /** + * Register a handler fired when the child process exits (code|signal). + * Optional: when absent, death is detected via stdout-end instead. Wires + * Bun's `proc.exited` in production; tests invoke it directly to simulate + * a crash. Lets the client stop querying a dead server (no per-edit hang). + */ + readonly onExit?: ((handler: ProcessExitHandler) => void) | undefined; } export type SpawnProcess = ( @@ -102,6 +118,18 @@ export class LanguageServerClient { private openDocuments = new Map<string, { version: number; text: string }>(); /** Sync mode captured from the server's initialize capabilities: 1=Full, 2=Incremental. */ private textDocumentChange: 1 | 2 = 1; + /** + * Corruption detection: the last diagnostic-key set + synced text per URI. + * A healthy server's diagnostics change when the file changes; a corrupted + * one (e.g. Steep's ~3h phantom-SyntaxError drift) re-emits the identical + * non-empty set across edits. 
`staleRepeat` counts consecutive such repeats + * across URIs; at the threshold the client is marked broken (→ respawn). + */ + private lastDiagSnapshot = new Map<string, { keys: Set<string>; text: string }>(); + private staleRepeat = 0; + private static readonly STALE_REPEAT_THRESHOLD = 5; + /** Default timeout for outbound requests (hover/definition/references). */ + private static readonly REQUEST_TIMEOUT_MS = 10_000; constructor(deps: ClientDeps) { this.deps = deps; @@ -128,6 +156,12 @@ export class LanguageServerClient { } const proc = this.deps.spawn(this.deps.command as string[], spawnOpts); this.process = proc; + // Detect process death so we stop querying a corpse (fixes the + // per-edit hang after a server is killed/crashes). onExit is the + // primary signal; stdout-end is the defence-in-depth fallback. + if (proc.onExit) { + proc.onExit((info) => this.handleExit(info)); + } const writeFn: WriteFn = (bytes) => proc.stdin.write(bytes); const rpc = new JsonRpcConnection(writeFn); @@ -158,8 +192,11 @@ export class LanguageServerClient { for await (const chunk of source) { this.handleBytes(chunk); } + // stdout closed — the process is gone (defence-in-depth alongside onExit, + // which some edges never call). Idempotent via handleExit's guard. + this.handleExit({ code: null }); } catch { - // process exited + this.handleExit({ code: null }); } })(); } @@ -172,6 +209,65 @@ export class LanguageServerClient { }); } + /** + * The server process exited (onExit or stdout-end). Transition to a broken + * state so callers skip it and the manager re-spawns after backoff — instead + * of polling a corpse for the full timeout on every edit. Idempotent. + */ + private handleExit(info: ProcessExitInfo): void { + if (this.state === "error" || this.state === "not-started") return; + const detail = info.signal !== undefined ? `signal ${info.signal}` : `code ${info.code ?? 
"?"}`; + this.markBroken(`language server process exited (${detail})`); + } + + /** + * Mark this client permanently broken: kill the process if still alive + * (corruption case), dispose the rpc (rejects pending requests), and drop + * edge handles. The manager's status() observes state:"error" and re-spawns + * after the bounded backoff. Called on process death AND on corruption. + */ + private markBroken(reason: string): void { + if (this.state === "error") return; + this.state = "error"; + this.stateError = reason; + this.fileWatcherHandle?.close(); + this.fileWatcherHandle = null; + this.process?.kill(); + this.process = null; + this.rpc?.dispose(); + this.rpc = null; + } + + /** + * Detect a server stuck re-emitting identical non-empty diagnostics + * despite the file content changing between calls — the signature of a + * corrupted parse/type-check state (e.g. Steep's ~3h phantom-SyntaxError + * drift, where a fresh CLI reports green on the same project). After + * STALE_REPEAT_THRESHOLD consecutive such repeats, mark the client broken + * so it is skipped + re-spawned. A clean file (empty diagnostics) or a + * genuinely changing diagnostic set resets the counter. Note the + * tradeoff: a real, unfixed error on an untouched line also "stays the + * same across edits", so this can false-positive on a healthy server — + * the threshold is set conservatively and the CLI type-check gate remains + * authoritative either way. 
/**
 * Corruption heuristic, run after a diagnostics push for `uri`.
 *
 * Builds the set of diagnostic keys currently merged for `uri` and compares
 * it with the snapshot recorded by the previous call for the same URI. The
 * call counts as a "stale repeat" only when all of the following hold:
 * a previous snapshot exists, the current set is non-empty, the set equals
 * the previous one, and `text` differs from the previously recorded text.
 * A stale repeat increments `staleRepeat`; anything else resets it to 0.
 * The snapshot for `uri` is then overwritten with the current keys + text.
 * Once `staleRepeat` reaches `STALE_REPEAT_THRESHOLD` the client is marked
 * broken.
 *
 * Known trade-off: a genuine, still-unfixed error whose key does not change
 * across edits looks the same as a stuck server, so a healthy server can be
 * flagged by this check.
 *
 * @param uri  Document URI whose merged diagnostics are inspected.
 * @param text Buffer text associated with this diagnostics round.
 */
private detectStaleDiagnostics(uri: string, text: string): void {
  const currentKeys = new Set<string>();
  for (const diagnostic of this.diagnostics.getMerged(uri)) {
    currentKeys.add(diagnosticKey(diagnostic));
  }

  const previous = this.lastDiagSnapshot.get(uri);
  const isStaleRepeat =
    previous !== undefined &&
    currentKeys.size > 0 &&
    setsEqual(currentKeys, previous.keys) &&
    text !== previous.text;

  this.staleRepeat = isStaleRepeat ? this.staleRepeat + 1 : 0;
  this.lastDiagSnapshot.set(uri, { keys: currentKeys, text });

  if (this.staleRepeat < LanguageServerClient.STALE_REPEAT_THRESHOLD) return;
  this.markBroken(
    "language server emitting repeated stale diagnostics despite file changes — likely corrupted; restarting",
  );
}
/**
 * Structural equality for two sets: true when both have the same size and
 * every member of `a` is also a member of `b` (membership per `Set.has`).
 */
function setsEqual<T>(a: Set<T>, b: Set<T>): boolean {
  return a.size === b.size && Array.from(a).every((member) => b.has(member));
}
0}:${d.message}`; } diff --git a/packages/lsp/src/extension.ts b/packages/lsp/src/extension.ts index 8e3178a..c0fee44 100644 --- a/packages/lsp/src/extension.ts +++ b/packages/lsp/src/extension.ts @@ -8,6 +8,7 @@ import { extname, join } from "node:path"; import type { Extension, HostAPI, ServiceHandle } from "@dispatch/kernel"; import { defineService } from "@dispatch/kernel"; +import { aggregateDiagnostics } from "./aggregate.js"; import type { SpawnedProcess } from "./client.js"; import { LspManager } from "./manager.js"; import { createLspTool } from "./tool.js"; @@ -43,6 +44,14 @@ function realSpawn( stderr: proc.stderr, pid: proc.pid, kill: () => proc.kill(), + // Surface process exit so the client can stop querying a dead server + // and self-heal (respawn). Bun's Subprocess.exited resolves with the + // exit code (or rejects if killed by signal — treat as code:null). + onExit: (handler) => { + (proc as { exited: Promise<number | null> }).exited + .then((code) => handler({ code })) + .catch(() => handler({ code: null })); + }, }; } @@ -108,14 +117,20 @@ export const extension: Extension = { return manager.status(cwd); }, async getDiagnostics(opts: GetDiagnosticsOpts): Promise<DiagnosticsResult> { - const timeoutMs = opts.timeoutMs ?? 60_000; - const slowThreshold = 10_000; + // 10s hard ceiling per server, regardless of what the caller + // passes (the edit hook still passes 60_000 — clamped here, so + // no other-unit edit is needed). A server that doesn't respond + // in 10s is skipped with a notice instead of waited out. + const PER_SERVER_CAP_MS = 10_000; + const timeoutMs = Math.min(opts.timeoutMs ?? PER_SERVER_CAP_MS, PER_SERVER_CAP_MS); const fileExt = extname(opts.filePath).toLowerCase(); const absolutePath = opts.filePath.startsWith("/") ? opts.filePath : join(opts.cwd, opts.filePath); // Get all connected servers matching this file's extension. + // A dead/corrupted server has state:"error" and is excluded — + // no per-edit hang on a corpse. 
const statuses = await manager.status(opts.cwd); const matching = statuses.filter( (s) => s.state === "connected" && s.extensions.some((ext) => ext === fileExt), @@ -125,34 +140,15 @@ export const extension: Extension = { return { formatted: "", slow: false, timedOut: false }; } - const parts: string[] = []; - let anySlow = false; - let anyTimedOut = false; - const start = Date.now(); - - for (const s of matching) { - const client = manager.getClient(s.id, s.root); - if (!client) continue; - const waitOpts: { text?: string; timeoutMs?: number; minSeverity?: number } = { - timeoutMs, - }; - if (opts.text !== undefined) waitOpts.text = opts.text; - if (opts.minSeverity !== undefined) waitOpts.minSeverity = opts.minSeverity; - const result = await client.waitForDiagnostics(absolutePath, waitOpts); - if (result.slow) anySlow = true; - if (result.timedOut) anyTimedOut = true; - if (result.formatted) { - parts.push(`[${s.name}]\n${result.formatted}`); - } - } - - const elapsed = Date.now() - start; + const agg = await aggregateDiagnostics( + (id, root) => manager.getClient(id, root), + matching, + absolutePath, + timeoutMs, + { text: opts.text, minSeverity: opts.minSeverity }, + ); - return { - formatted: parts.join("\n\n"), - slow: anySlow || elapsed > slowThreshold, - timedOut: anyTimedOut, - }; + return { formatted: agg.formatted, slow: false, timedOut: agg.timedOut }; }, }; host.provideService(lspServiceHandle, service); diff --git a/packages/lsp/src/manager.test.ts b/packages/lsp/src/manager.test.ts index 1649111..d8cba4f 100644 --- a/packages/lsp/src/manager.test.ts +++ b/packages/lsp/src/manager.test.ts @@ -361,4 +361,92 @@ describe("manager", () => { expect(s[0]?.error).toContain("[from .dispatch/lsp.json]"); expect(s[0]?.error).toContain("spawn failed"); }); + + it("a client that dies after connecting is skipped + re-spawned after backoff (no storm, no eternal hang)", async () => { + // A spawn that completes the initialize handshake AND lets the test + // 
simulate process death via the captured onExit handler. + const exitHandlers: Array<(info: { code: number | null; signal?: string }) => void> = []; + let spawnCount = 0; + const spawn: SpawnProcess = () => { + spawnCount++; + let messageHandler: ((data: Uint8Array) => void) | null = null; + const proc: SpawnedProcess = { + stdin: { + write: (bytes: Uint8Array) => { + const decoded = new TextDecoder().decode(bytes); + const headerEnd = decoded.indexOf("\r\n\r\n"); + if (headerEnd === -1) return; + const json = decoded.slice(headerEnd + 4); + try { + const msg = JSON.parse(json); + if (msg.method === "initialize") { + setTimeout(() => { + const response = JSON.stringify({ + jsonrpc: "2.0", + id: msg.id, + result: { capabilities: {} }, + }); + messageHandler?.(encode(response)); + }, 1); + } + } catch { + // ignore + } + }, + }, + stdout: { + on: (_event: string, cb: (data: Uint8Array) => void) => { + messageHandler = cb; + }, + }, + pid: 1000 + spawnCount, + kill: () => {}, + onExit: (handler) => { + exitHandlers.push(handler); + }, + }; + return proc; + }; + + const clock = { now: 0 }; + const manager = new LspManager({ + spawn, + fileWatcher: noopFileWatcher(), + fs: fakeFs({ + "/project/.dispatch/lsp.json": JSON.stringify({ + servers: { + steep: { + command: ["steep", "--stdio"], + extensions: [".rb"], + rootMarkers: [], + }, + }, + }), + }), + now: () => clock.now, + }); + + // 1) Connects. + const s1 = await manager.status("/project"); + expect(s1[0]?.state).toBe("connected"); + expect(spawnCount).toBe(1); + + // 2) Simulate the process dying (user kill / crash) via onExit. + exitHandlers[0]?.({ code: 1 }); + const clientAfterDeath = manager.getClient("steep", "/project"); + expect(clientAfterDeath?.getState()).toBe("error"); + + // 3) status() now reports error (and seeds a broken entry for backoff). + // Backoff not elapsed yet (clock frozen at 0) → NOT re-spawned. 
+ const s2 = await manager.status("/project"); + expect(s2[0]?.state).toBe("error"); + expect(s2[0]?.error).toMatch(/process exited/); + expect(spawnCount).toBe(1); // no retry storm before backoff + + // 4) After the backoff elapses, status() re-spawns a fresh server. + clock.now = 31_000; + const s3 = await manager.status("/project"); + expect(s3[0]?.state).toBe("connected"); + expect(spawnCount).toBe(2); // re-spawned exactly once + }); }); diff --git a/packages/lsp/src/manager.ts b/packages/lsp/src/manager.ts index 7153956..bc84479 100644 --- a/packages/lsp/src/manager.ts +++ b/packages/lsp/src/manager.ts @@ -134,6 +134,19 @@ export class LspManager { if (existing) { const state = existing.client.getState(); const stateError = existing.client.getStateError(); + // A client that died or corrupted AFTER connecting flipped its + // own state to "error" (client.ts handleExit/markBroken). Spawn + // succeeded so there's no broken entry yet — seed one so the + // bounded-backoff path above re-spawns it, instead of reporting + // error forever (and so getDiagnostics' "connected" filter skips + // it, avoiding a per-edit hang on the corpse). + if (state === "error" && !this.broken.has(key)) { + this.broken.set(key, { + configFingerprint: configFingerprint(server), + brokenAt: this.now(), + error: enrichError(server, stateError ?? 
"server unavailable"), + }); + } const status: LspServerStatus = { id: server.id, name: server.name, diff --git a/packages/lsp/src/rpc.test.ts b/packages/lsp/src/rpc.test.ts index 05ce924..7b22ec5 100644 --- a/packages/lsp/src/rpc.test.ts +++ b/packages/lsp/src/rpc.test.ts @@ -84,3 +84,29 @@ it("handleMessage does not throw on malformed JSON", async () => { await expect(conn.handleMessage("")).resolves.toBeUndefined(); await expect(conn.handleMessage("not json at all")).resolves.toBeUndefined(); }); + +describe("sendRequest timeout", () => { + it("rejects with a timeout error when no response arrives within timeoutMs", async () => { + const { conn } = makeConnection(); + const promise = conn.sendRequest("textDocument/hover", {}, 50); + await expect(promise).rejects.toThrow(/LSP request timed out after 50ms: textDocument\/hover/); + }); + + it("clears the timer on a normal response (no unhandled rejection)", async () => { + const { conn } = makeConnection(); + const promise = conn.sendRequest("textDocument/hover", {}, 5000); + conn.handleMessage(frameResponse(1, { ok: true })); + await expect(promise).resolves.toEqual({ ok: true }); + // Give the (now-cleared) timer window ample time to prove it never fires. + await new Promise((r) => setTimeout(r, 80)); + }); + + it("does not time out when no timeoutMs is given (initialize handshake path)", async () => { + const { conn } = makeConnection(); + const promise = conn.sendRequest("initialize", {}); + // A late response well past any plausible default still resolves. 
+ await new Promise((r) => setTimeout(r, 60)); + conn.handleMessage(frameResponse(1, { capabilities: {} })); + await expect(promise).resolves.toEqual({ capabilities: {} }); + }); +}); diff --git a/packages/lsp/src/rpc.ts b/packages/lsp/src/rpc.ts index 6b82624..95157de 100644 --- a/packages/lsp/src/rpc.ts +++ b/packages/lsp/src/rpc.ts @@ -39,11 +39,36 @@ export class JsonRpcConnection { this.write = write; } - sendRequest(method: string, params?: unknown): Promise<unknown> { + /** + * Send a request and await the correlated response. If `timeoutMs` is given, + * the promise rejects with a timeout error after that long — so a dead/slow + * server can't hang the caller forever (hover/definition/references). + * No `timeoutMs` = wait indefinitely (used by the initialize handshake, which + * has its own 45s race). + */ + sendRequest(method: string, params?: unknown, timeoutMs?: number): Promise<unknown> { const id = this.nextId++; const msg: JsonRpcMessage = { jsonrpc: "2.0", id, method, params }; return new Promise((resolve, reject) => { - this.pending.set(id, { resolve, reject }); + let timer: ReturnType<typeof setTimeout> | undefined; + // Wrap resolve/reject so the timer is cleared on a normal response + // (or on dispose) — no dangling timer after completion. 
+ const finish = (fn: () => void): void => { + if (timer) clearTimeout(timer); + fn(); + }; + const entry: PendingRequest = { + resolve: (value: unknown) => finish(() => resolve(value)), + reject: (reason: unknown) => finish(() => reject(reason)), + }; + if (timeoutMs !== undefined) { + timer = setTimeout(() => { + if (this.pending.delete(id)) { + reject(new Error(`LSP request timed out after ${timeoutMs}ms: ${method}`)); + } + }, timeoutMs); + } + this.pending.set(id, entry); this.sendMessage(msg); }); } diff --git a/packages/lsp/src/tool.test.ts b/packages/lsp/src/tool.test.ts index 03787ae..efd1514 100644 --- a/packages/lsp/src/tool.test.ts +++ b/packages/lsp/src/tool.test.ts @@ -176,4 +176,101 @@ describe("tool", () => { expect(result.isError).toBe(true); expect(result.content).toContain("requires both"); }); + + it("diagnostics op: a server that times out is skipped with a raise-to-user notice", async () => { + const mockClient = { + getState: () => "connected" as const, + getStateError: () => undefined, + request: async () => null, + waitForDiagnostics: async () => ({ formatted: "", slow: false, timedOut: true }), + }; + + const tool = createLspTool( + stubManager({ + status: async () => [ + { + id: "steep", + name: "Steep", + root: "/project", + extensions: [".rb"], + state: "connected", + }, + ], + getClient: () => mockClient as never, + }), + ); + + const result = await tool.execute( + { operation: "diagnostics", path: "game.rb" }, + { + toolCallId: "test", + onOutput: () => {}, + signal: AbortSignal.timeout(5000), + log: { + debug: () => {}, + info: () => {}, + warn: () => {}, + error: () => {}, + child: () => ({}) as never, + span: () => ({}) as never, + }, + cwd: "/project", + }, + ); + + expect(result.isError).not.toBe(true); + expect(result.content).toContain("[Steep]"); + expect(result.content).toContain("took too long"); + expect(result.content).toContain(">10s"); + expect(result.content).toContain("raise this to the user"); + }); + + 
it("diagnostics op: responding servers' diagnostics are merged, tagged by source", async () => { + const mockClient = { + getState: () => "connected" as const, + getStateError: () => undefined, + request: async () => null, + waitForDiagnostics: async () => ({ + formatted: "ERROR L1:1: boom", + slow: false, + timedOut: false, + }), + }; + + const tool = createLspTool( + stubManager({ + status: async () => [ + { + id: "steep", + name: "Steep", + root: "/project", + extensions: [".rb"], + state: "connected", + }, + ], + getClient: () => mockClient as never, + }), + ); + + const result = await tool.execute( + { operation: "diagnostics", path: "game.rb" }, + { + toolCallId: "test", + onOutput: () => {}, + signal: AbortSignal.timeout(5000), + log: { + debug: () => {}, + info: () => {}, + warn: () => {}, + error: () => {}, + child: () => ({}) as never, + span: () => ({}) as never, + }, + cwd: "/project", + }, + ); + + expect(result.content).toContain("[Steep]"); + expect(result.content).toContain("boom"); + }); }); diff --git a/packages/lsp/src/tool.ts b/packages/lsp/src/tool.ts index 8d282ec..be0d269 100644 --- a/packages/lsp/src/tool.ts +++ b/packages/lsp/src/tool.ts @@ -6,6 +6,7 @@ import { extname, resolve } from "node:path"; import type { ToolContract, ToolExecuteContext, ToolResult } from "@dispatch/kernel"; +import { aggregateDiagnostics } from "./aggregate.js"; import type { LspManager } from "./manager.js"; type Operation = "diagnostics" | "hover" | "definition" | "references" | "documentSymbol"; @@ -157,6 +158,8 @@ export function createLspTool(manager: LspManager): ToolContract { switch (operation) { case "diagnostics": { + // 10s hard ceiling per server (same policy as the edit path). + const DIAGNOSTICS_TIMEOUT_MS = 10_000; // Query ALL connected servers whose extensions match this file. 
const matching = statuses.filter( (s) => s.state === "connected" && s.extensions.some((ext) => ext === fileExt), @@ -179,31 +182,27 @@ export function createLspTool(manager: LspManager): ToolContract { if (!client) { return { content: "Language server client not available.", isError: true }; } - const result = await client.waitForDiagnostics(absolutePath); + const result = await client.waitForDiagnostics(absolutePath, { + timeoutMs: DIAGNOSTICS_TIMEOUT_MS, + }); + if (result.timedOut) { + return { + content: `⚠️ [${connected.name}] LSP took too long (>10s), diagnostics skipped — please raise this to the user.`, + }; + } return { content: result.formatted || "No diagnostics found." }; } - // Query each matching server and merge results, tagged by source. - const parts: string[] = []; - let anyTimedOut = false; - for (const s of matching) { - const client = manager.getClient(s.id, s.root); - if (!client) continue; - const result = await client.waitForDiagnostics(absolutePath, { timeoutMs: 60_000 }); - if (result.timedOut) anyTimedOut = true; - if (result.slow) { - parts.push( - `⚠️ LSP is taking unusually long. If this happens more than once, raise it to the user.`, - ); - } - if (result.formatted) { - parts.push(`[${s.name}]\n${result.formatted}`); - } - } - if (anyTimedOut && parts.length === 0) { - parts.push("Diagnostics timed out (server may still be indexing)."); - } - return { content: parts.length > 0 ? parts.join("\n\n") : "No diagnostics found." }; + // Query matching servers concurrently, each capped at 10s; + // a non-responding server is skipped with a notice. + const agg = await aggregateDiagnostics( + (id, root) => manager.getClient(id, root), + matching, + absolutePath, + DIAGNOSTICS_TIMEOUT_MS, + {}, + ); + return { content: agg.formatted || "No diagnostics found." }; } case "hover": { const client = await getFirstMatchingClient(manager, statuses, fileExt); |
