summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--packages/lsp/src/aggregate.test.ts141
-rw-r--r--packages/lsp/src/aggregate.ts80
-rw-r--r--packages/lsp/src/client.test.ts146
-rw-r--r--packages/lsp/src/client.ts153
-rw-r--r--packages/lsp/src/diagnostics.ts2
-rw-r--r--packages/lsp/src/extension.ts54
-rw-r--r--packages/lsp/src/manager.test.ts88
-rw-r--r--packages/lsp/src/manager.ts13
-rw-r--r--packages/lsp/src/rpc.test.ts26
-rw-r--r--packages/lsp/src/rpc.ts29
-rw-r--r--packages/lsp/src/tool.test.ts97
-rw-r--r--packages/lsp/src/tool.ts43
-rw-r--r--packages/tool-edit-file/src/extension.ts5
13 files changed, 808 insertions, 69 deletions
diff --git a/packages/lsp/src/aggregate.test.ts b/packages/lsp/src/aggregate.test.ts
new file mode 100644
index 0000000..4579a0a
--- /dev/null
+++ b/packages/lsp/src/aggregate.test.ts
@@ -0,0 +1,141 @@
+import { describe, expect, it } from "vitest";
+import { type AggregateServer, aggregateDiagnostics } from "./aggregate.js";
+import type { LanguageServerClient } from "./client.js";
+
+/**
+ * A minimal fake client: only `waitForDiagnostics` is exercised by
+ * aggregateDiagnostics, so we stub just that. Cast to the real type (mirrors
+ * tool.test.ts) — no real process, no internal mocks of our own modules.
+ */
+function fakeClient(
+ waitForDiagnostics: LanguageServerClient["waitForDiagnostics"],
+): LanguageServerClient {
+ return { waitForDiagnostics } as unknown as LanguageServerClient;
+}
+
+const SERVER_A: AggregateServer = { id: "a", name: "Ruby-LSP", root: "/p" };
+const SERVER_B: AggregateServer = { id: "b", name: "Steep", root: "/p" };
+
+describe("aggregateDiagnostics", () => {
+ it("returns merged diagnostics from all responding servers, tagged by source", async () => {
+ const clients = new Map<string, LanguageServerClient>([
+ [
+ "a",
+ fakeClient(async () => ({ formatted: "ERROR L1:1: boom", slow: false, timedOut: false })),
+ ],
+ [
+ "b",
+ fakeClient(async () => ({ formatted: "WARNING L2:3: meh", slow: false, timedOut: false })),
+ ],
+ ]);
+
+ const result = await aggregateDiagnostics(
+ (id) => clients.get(id),
+ [SERVER_A, SERVER_B],
+ "/p/x.rb",
+ 10_000,
+ {},
+ );
+
+ expect(result.timedOut).toBe(false);
+ expect(result.formatted).toContain("[Ruby-LSP]");
+ expect(result.formatted).toContain("boom");
+ expect(result.formatted).toContain("[Steep]");
+ expect(result.formatted).toContain("meh");
+ });
+
+ it("skips a server that times out with a raise-to-user notice, and still returns the fast server's result", async () => {
+ // Steep never resolves within the cap → timedOut; ruby-lsp answers fast.
+ const clients = new Map<string, LanguageServerClient>([
+ ["a", fakeClient(async () => ({ formatted: "", slow: false, timedOut: false }))],
+ ["b", fakeClient(async () => ({ formatted: "", slow: false, timedOut: true }))],
+ ]);
+
+ const result = await aggregateDiagnostics(
+ (id) => clients.get(id),
+ [SERVER_A, SERVER_B],
+ "/p/x.rb",
+ 10_000,
+ {},
+ );
+
+ expect(result.timedOut).toBe(true);
+ // The skip notice names the offending server and the cap.
+ expect(result.formatted).toContain("[Steep]");
+ expect(result.formatted).toContain("took too long");
+ expect(result.formatted).toContain(">10s");
+ expect(result.formatted).toContain("raise this to the user");
+ // ruby-lsp answered cleanly (empty diagnostics) → no line for it.
+ expect(result.formatted).not.toContain("[Ruby-LSP]");
+ });
+
+ it("runs servers concurrently: a slow server does not delay a fast one's contribution order", async () => {
+ const callOrder: string[] = [];
+ const clients = new Map<string, LanguageServerClient>([
+ [
+ "a",
+ fakeClient(async () => {
+ callOrder.push("a-start");
+ await new Promise((r) => setTimeout(r, 5));
+ callOrder.push("a-end");
+ return { formatted: "from-a", slow: false, timedOut: false };
+ }),
+ ],
+ [
+ "b",
+ fakeClient(async () => {
+ callOrder.push("b-start");
+ await new Promise((r) => setTimeout(r, 30));
+ callOrder.push("b-end");
+ return { formatted: "from-b", slow: false, timedOut: false };
+ }),
+ ],
+ ]);
+
+ const result = await aggregateDiagnostics(
+ (id) => clients.get(id),
+ [SERVER_A, SERVER_B],
+ "/p/x.rb",
+ 10_000,
+ {},
+ );
+
+ // Both started before either ended → concurrent, not sequential.
+ expect(callOrder.slice(0, 2).sort()).toEqual(["a-start", "b-start"]);
+ expect(result.formatted).toContain("from-a");
+ expect(result.formatted).toContain("from-b");
+ });
+
+ it("a missing client (dead/excluded) contributes nothing and never rejects", async () => {
+ const result = await aggregateDiagnostics(() => undefined, [SERVER_A], "/p/x.rb", 10_000, {});
+ expect(result.formatted).toBe("");
+ expect(result.timedOut).toBe(false);
+ });
+
+ it("forwards text + minSeverity to each client's waitForDiagnostics", async () => {
+ const seen: Array<{ text?: string; minSeverity?: number; timeoutMs: number }> = [];
+ const clients = new Map<string, LanguageServerClient>([
+ [
+ "a",
+ fakeClient(async (_path, opts) => {
+ seen.push({
+ text: opts?.text,
+ minSeverity: opts?.minSeverity,
+ timeoutMs: opts?.timeoutMs ?? -1,
+ });
+ return { formatted: "", slow: false, timedOut: false };
+ }),
+ ],
+ ]);
+
+ await aggregateDiagnostics((id) => clients.get(id), [SERVER_A], "/p/x.rb", 7000, {
+ text: "post-edit buffer",
+ minSeverity: 2,
+ });
+
+ expect(seen).toHaveLength(1);
+ expect(seen[0]?.text).toBe("post-edit buffer");
+ expect(seen[0]?.minSeverity).toBe(2);
+ expect(seen[0]?.timeoutMs).toBe(7000);
+ });
+});
diff --git a/packages/lsp/src/aggregate.ts b/packages/lsp/src/aggregate.ts
new file mode 100644
index 0000000..b044e21
--- /dev/null
+++ b/packages/lsp/src/aggregate.ts
@@ -0,0 +1,80 @@
+/**
+ * Concurrent multi-server diagnostics aggregation.
+ *
+ * Queries every matching language server AT ONCE (not one-at-a-time), each
+ * capped at `timeoutMs`. A server that doesn't push diagnostics within the cap
+ * is SKIPPED with a per-server notice rather than blocking the others — so one
+ * slow/dead server (e.g. a corrupted Steep) can't hold up the fast one's
+ * (ruby-lsp) results for the full timeout on every edit.
+ *
+ * The only I/O here is `client.waitForDiagnostics` (injected via `getClient`),
+ * so this is unit-testable with a fake client and no real process.
+ */
+
+import type { LanguageServerClient } from "./client.js";
+
+export interface AggregateServer {
+ readonly id: string;
+ readonly name: string;
+ readonly root: string;
+}
+
+export interface AggregateOpts {
+ /** Post-edit buffer; when omitted the server reads from disk. */
+ readonly text?: string | undefined;
+ /** Only include diagnostics with severity ≤ this (1=Error, 2=Warning). */
+ readonly minSeverity?: number | undefined;
+}
+
+export interface AggregateResult {
+ /** Merged diagnostics tagged by source + a per-skipped-server notice. */
+ readonly formatted: string;
+ /** True if at least one server was skipped for exceeding the cap. */
+ readonly timedOut: boolean;
+}
+
+/**
+ * Query `servers` concurrently, each capped at `timeoutMs`. Returns merged
+ * diagnostics tagged by source (`[name]\n…`) and, for any server that did not
+ * respond in time, a `⚠️ [name] LSP took too long (>Ns), skipped — please raise
+ * this to the user.` notice. Never rejects: a client error yields an empty
+ * contribution for that server.
+ */
+export async function aggregateDiagnostics(
+ getClient: (id: string, root: string) => LanguageServerClient | undefined,
+ servers: readonly AggregateServer[],
+ absolutePath: string,
+ timeoutMs: number,
+ opts: AggregateOpts,
+): Promise<AggregateResult> {
+ const entries = await Promise.all(
+ servers.map(async (server) => {
+ const client = getClient(server.id, server.root);
+ if (!client) return null;
+ const waitOpts: { text?: string; timeoutMs: number; minSeverity?: number } = { timeoutMs };
+ if (opts.text !== undefined) waitOpts.text = opts.text;
+ if (opts.minSeverity !== undefined) waitOpts.minSeverity = opts.minSeverity;
+ const result = await client.waitForDiagnostics(absolutePath, waitOpts);
+ return { server, result };
+ }),
+ );
+
+ const parts: string[] = [];
+ let timedOut = false;
+ const capSeconds = Math.round(timeoutMs / 1000);
+
+ for (const entry of entries) {
+ if (!entry) continue;
+ const { server, result } = entry;
+ if (result.timedOut) {
+ timedOut = true;
+ parts.push(
+ `⚠️ [${server.name}] LSP took too long (>${capSeconds}s), diagnostics skipped — please raise this to the user.`,
+ );
+ } else if (result.formatted) {
+ parts.push(`[${server.name}]\n${result.formatted}`);
+ }
+ }
+
+ return { formatted: parts.join("\n\n"), timedOut };
+}
diff --git a/packages/lsp/src/client.test.ts b/packages/lsp/src/client.test.ts
index 681860f..338ef0b 100644
--- a/packages/lsp/src/client.test.ts
+++ b/packages/lsp/src/client.test.ts
@@ -3,6 +3,7 @@ import {
type FileWatcher,
type FsAccess,
LanguageServerClient,
+ type ProcessExitHandler,
type SpawnProcess,
} from "./client.js";
import { encode } from "./framing.js";
@@ -288,4 +289,149 @@ describe("client", () => {
client.shutdown();
expect(state.killed).toBe(true);
});
+
+ it("onExit marks the client broken (error) so callers stop querying a corpse", async () => {
+ const state = { killed: false };
+ let exitCb: ProcessExitHandler | null = null;
+ const stdoutHolder: { cb: ((data: Uint8Array) => void) | null } = { cb: null };
+
+ const spawnWithExit: SpawnProcess = () => ({
+ stdin: { write: () => {} },
+ stdout: {
+ on: (_event: string, cb: (data: Uint8Array) => void) => {
+ stdoutHolder.cb = cb;
+ },
+ },
+ pid: 999,
+ kill: () => {
+ state.killed = true;
+ },
+ onExit: (handler) => {
+ exitCb = handler;
+ },
+ });
+
+ const { client } = makeClient({ spawn: spawnWithExit });
+ const startPromise = client.start();
+ await new Promise((r) => setTimeout(r, 50));
+ stdoutHolder.cb?.(
+ encode(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } })),
+ );
+ await startPromise;
+ expect(client.getState()).toBe("connected");
+
+ // Simulate the process dying (user kill / crash).
+ exitCb?.({ code: 1 });
+
+ expect(client.getState()).toBe("error");
+ expect(client.getStateError()).toMatch(/process exited/);
+ // The (still-alive-in-test) process was killed to avoid a zombie.
+ expect(state.killed).toBe(true);
+ });
+
+ it("a dead client is skipped by waitForDiagnostics callers (state !== connected)", async () => {
+ // Build a client, connect, kill via onExit, then assert a diagnostics
+ // query would not block: getState() is "error" so the matching filter
+ // (state === "connected") excludes it. We assert the state guard.
+ const stdoutHolder: { cb: ((data: Uint8Array) => void) | null } = { cb: null };
+ let exitCb: ProcessExitHandler | null = null;
+ const spawnWithExit: SpawnProcess = () => ({
+ stdin: { write: () => {} },
+ stdout: {
+ on: (_e, cb) => {
+ stdoutHolder.cb = cb;
+ },
+ },
+ pid: 1,
+ kill: () => {},
+ onExit: (handler) => {
+ exitCb = handler;
+ },
+ });
+
+ const { client } = makeClient({ spawn: spawnWithExit });
+ const startPromise = client.start();
+ await new Promise((r) => setTimeout(r, 50));
+ stdoutHolder.cb?.(
+ encode(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } })),
+ );
+ await startPromise;
+
+ exitCb?.({ code: null });
+ expect(client.getState()).toBe("error");
+ // The aggregate / getDiagnostics matching filter requires "connected".
+ expect(client.getState() === "connected").toBe(false);
+ });
+
+ it("corruption detector marks the client broken after repeated identical diagnostics despite text changes", async () => {
+ // A healthy server would change diagnostics as the file changes; a
+ // corrupted one re-emits the SAME non-empty set. Drive 5 edits with
+ // different text but identical diagnostics → client flips to error.
+ const { client, serverResponses } = makeClient();
+ const startPromise = client.start();
+ await new Promise((r) => setTimeout(r, 50));
+ serverResponses(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } }));
+ await startPromise;
+
+ const phantom = JSON.stringify({
+ jsonrpc: "2.0",
+ method: "textDocument/publishDiagnostics",
+ params: {
+ uri: "file:///project/game.rb",
+ diagnostics: [
+ {
+ range: { start: { line: 0, character: 27 }, end: { line: 0, character: 28 } },
+ severity: 1,
+ message: "SyntaxError: unexpected token",
+ },
+ ],
+ },
+ });
+
+ const path = "/project/game.rb";
+ // The first call establishes the baseline snapshot (no increment).
+ // Each subsequent call with identical diagnostics + changed text
+ // increments; the 6th call (5th increment) trips the threshold.
+ for (let i = 1; i <= 5; i++) {
+ const p = client.waitForDiagnostics(path, { text: `buf-v${i}`, timeoutMs: 2000 });
+ // Push the identical phantom diagnostics so the poll resolves.
+ await new Promise((r) => setTimeout(r, 30));
+ serverResponses(phantom);
+ await p;
+ expect(client.getState()).toBe("connected");
+ }
+ // 6th identical-across-changed-text repeat trips the threshold.
+ const p6 = client.waitForDiagnostics(path, { text: "buf-v6", timeoutMs: 2000 });
+ await new Promise((r) => setTimeout(r, 30));
+ serverResponses(phantom);
+ await p6;
+
+ expect(client.getState()).toBe("error");
+ expect(client.getStateError()).toMatch(/repeated stale diagnostics/i);
+ });
+
+ it("corruption detector does NOT trip on a clean file (empty diagnostics stay identical)", async () => {
+ const { client, serverResponses } = makeClient();
+ const startPromise = client.start();
+ await new Promise((r) => setTimeout(r, 50));
+ serverResponses(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } }));
+ await startPromise;
+
+ const clean = JSON.stringify({
+ jsonrpc: "2.0",
+ method: "textDocument/publishDiagnostics",
+ params: { uri: "file:///project/game.rb", diagnostics: [] },
+ });
+ const path = "/project/game.rb";
+
+ for (let i = 1; i <= 6; i++) {
+ const p = client.waitForDiagnostics(path, { text: `clean-v${i}`, timeoutMs: 2000 });
+ await new Promise((r) => setTimeout(r, 30));
+ serverResponses(clean);
+ await p;
+ }
+ // Empty diagnostics never count as "stale" — a clean file staying clean
+ // is normal, not corruption.
+ expect(client.getState()).toBe("connected");
+ });
});
diff --git a/packages/lsp/src/client.ts b/packages/lsp/src/client.ts
index 677a22a..ac7d025 100644
--- a/packages/lsp/src/client.ts
+++ b/packages/lsp/src/client.ts
@@ -4,13 +4,22 @@
* FileWatcher, and forwards matching disk changes.
*/
-import { DiagnosticsStore, type PublishDiagnosticsParams } from "./diagnostics.js";
+import { DiagnosticsStore, diagnosticKey, type PublishDiagnosticsParams } from "./diagnostics.js";
import { computeChangeRange } from "./diff.js";
import { FrameDecoder } from "./framing.js";
import { languageId as resolveLanguageId } from "./language.js";
import { JsonRpcConnection, type WriteFn } from "./rpc.js";
import { FileChangeType, WatchedFilesRegistry } from "./watched-files.js";
+/** Info delivered to an `onExit` handler when the child process terminates. */
+export interface ProcessExitInfo {
+ readonly code: number | null;
+ readonly signal?: string;
+}
+
+/** A handler registered to be called when the child process exits. */
+export type ProcessExitHandler = (info: ProcessExitInfo) => void;
+
export interface SpawnedProcess {
readonly stdin: { readonly write: (bytes: Uint8Array) => void };
readonly stdout:
@@ -22,6 +31,13 @@ export interface SpawnedProcess {
| undefined;
readonly pid: number | undefined;
readonly kill: () => void;
+ /**
+ * Register a handler fired when the child process exits (code|signal).
+ * Optional: when absent, death is detected via stdout-end instead. Wires
+ * Bun's `proc.exited` in production; tests invoke it directly to simulate
+ * a crash. Lets the client stop querying a dead server (no per-edit hang).
+ */
+ readonly onExit?: ((handler: ProcessExitHandler) => void) | undefined;
}
export type SpawnProcess = (
@@ -102,6 +118,18 @@ export class LanguageServerClient {
private openDocuments = new Map<string, { version: number; text: string }>();
/** Sync mode captured from the server's initialize capabilities: 1=Full, 2=Incremental. */
private textDocumentChange: 1 | 2 = 1;
+ /**
+ * Corruption detection: the last diagnostic-key set + synced text per URI.
+ * A healthy server's diagnostics change when the file changes; a corrupted
+ * one (e.g. Steep's ~3h phantom-SyntaxError drift) re-emits the identical
+ * non-empty set across edits. `staleRepeat` counts consecutive such repeats
+ * across URIs; at the threshold the client is marked broken (→ respawn).
+ */
+ private lastDiagSnapshot = new Map<string, { keys: Set<string>; text: string }>();
+ private staleRepeat = 0;
+ private static readonly STALE_REPEAT_THRESHOLD = 5;
+ /** Default timeout for outbound requests (hover/definition/references). */
+ private static readonly REQUEST_TIMEOUT_MS = 10_000;
constructor(deps: ClientDeps) {
this.deps = deps;
@@ -128,6 +156,12 @@ export class LanguageServerClient {
}
const proc = this.deps.spawn(this.deps.command as string[], spawnOpts);
this.process = proc;
+ // Detect process death so we stop querying a corpse (fixes the
+ // per-edit hang after a server is killed/crashes). onExit is the
+ // primary signal; stdout-end is the defence-in-depth fallback.
+ if (proc.onExit) {
+ proc.onExit((info) => this.handleExit(info));
+ }
const writeFn: WriteFn = (bytes) => proc.stdin.write(bytes);
const rpc = new JsonRpcConnection(writeFn);
@@ -158,8 +192,11 @@ export class LanguageServerClient {
for await (const chunk of source) {
this.handleBytes(chunk);
}
+ // stdout closed — the process is gone (defence-in-depth alongside onExit,
+ // which some edges never call). Idempotent via handleExit's guard.
+ this.handleExit({ code: null });
} catch {
- // process exited
+ this.handleExit({ code: null });
}
})();
}
@@ -172,6 +209,65 @@ export class LanguageServerClient {
});
}
+ /**
+ * The server process exited (onExit or stdout-end). Transition to a broken
+ * state so callers skip it and the manager re-spawns after backoff — instead
+ * of polling a corpse for the full timeout on every edit. Idempotent.
+ */
+ private handleExit(info: ProcessExitInfo): void {
+ if (this.state === "error" || this.state === "not-started") return;
+ const detail = info.signal !== undefined ? `signal ${info.signal}` : `code ${info.code ?? "?"}`;
+ this.markBroken(`language server process exited (${detail})`);
+ }
+
+ /**
+ * Mark this client permanently broken: kill the process if still alive
+ * (corruption case), dispose the rpc (rejects pending requests), and drop
+ * edge handles. The manager's status() observes state:"error" and re-spawns
+ * after the bounded backoff. Called on process death AND on corruption.
+ */
+ private markBroken(reason: string): void {
+ if (this.state === "error") return;
+ this.state = "error";
+ this.stateError = reason;
+ this.fileWatcherHandle?.close();
+ this.fileWatcherHandle = null;
+ this.process?.kill();
+ this.process = null;
+ this.rpc?.dispose();
+ this.rpc = null;
+ }
+
+ /**
+ * Detect a server stuck re-emitting identical non-empty diagnostics
+ * despite the file content changing between calls — the signature of a
+ * corrupted parse/type-check state (e.g. Steep's ~3h phantom-SyntaxError
+ * drift, where a fresh CLI reports green on the same project). After
+ * STALE_REPEAT_THRESHOLD consecutive such repeats, mark the client broken
+ * so it is skipped + re-spawned. A clean file (empty diagnostics) or a
+ * genuinely changing diagnostic set resets the counter. Note the
+ * tradeoff: a real, unfixed error on an untouched line also "stays the
+ * same across edits", so this can false-positive on a healthy server —
+ * the threshold is set conservatively and the CLI type-check gate remains
+ * authoritative either way.
+ */
+ private detectStaleDiagnostics(uri: string, text: string): void {
+ const merged = this.diagnostics.getMerged(uri);
+ const keys = new Set(merged.map((d) => diagnosticKey(d)));
+ const prev = this.lastDiagSnapshot.get(uri);
+ if (prev && keys.size > 0 && setsEqual(keys, prev.keys) && text !== prev.text) {
+ this.staleRepeat++;
+ } else {
+ this.staleRepeat = 0;
+ }
+ this.lastDiagSnapshot.set(uri, { keys, text });
+ if (this.staleRepeat >= LanguageServerClient.STALE_REPEAT_THRESHOLD) {
+ this.markBroken(
+ "language server emitting repeated stale diagnostics despite file changes — likely corrupted; restarting",
+ );
+ }
+ }
+
private handleBytes(chunk: Uint8Array): void {
const messages = this.decoder.decode(chunk);
for (const msg of messages) {
@@ -403,26 +499,37 @@ export class LanguageServerClient {
await this.open(filePath);
}
- const slowThreshold = 10_000;
const start = Date.now();
- // Poll until the server pushes diagnostics (even empty = done) or timeout.
- return new Promise((resolve) => {
+ // Poll until the server pushes diagnostics (even empty = done) or the
+ // per-server cap elapses (then we skip it — see aggregateDiagnostics).
+ const received = await new Promise<boolean>((resolve) => {
const check = () => {
const elapsed = Date.now() - start;
- const received = this.diagnostics.hasReceivedPush(uri);
- if (received || elapsed >= timeoutMs) {
- resolve({
- formatted: this.diagnostics.formatFiltered(uri, opts?.minSeverity),
- slow: elapsed > slowThreshold,
- timedOut: !received,
- });
+ const got = this.diagnostics.hasReceivedPush(uri);
+ if (got || elapsed >= timeoutMs) {
+ resolve(got);
return;
}
setTimeout(check, 100);
};
check();
});
+
+ // Only a server that actually pushed can be corruption-checked.
+ if (received) {
+ this.detectStaleDiagnostics(uri, opts?.text ?? "");
+ }
+
+ // `slow` is structurally false now: the per-server cap is 10s, so
+ // elapsed can never exceed the old "unusually long" threshold. That
+ // warning is superseded by the timeout→skip notice produced in
+ // aggregateDiagnostics. The field is kept for contract compatibility.
+ return {
+ formatted: this.diagnostics.formatFiltered(uri, opts?.minSeverity),
+ slow: false,
+ timedOut: !received,
+ };
}
getWatchedFilesRegistry(): WatchedFilesRegistry {
@@ -433,11 +540,21 @@ export class LanguageServerClient {
return this.diagnostics;
}
- async request(method: string, params?: unknown): Promise<unknown> {
+ /**
+ * Send a request (hover/definition/references/documentSymbol). Capped at
+ * REQUEST_TIMEOUT_MS so a dead/slow server can't hang the turn — the
+ * initialize handshake bypasses this (it calls rpc.sendRequest directly
+ * with its own 45s race).
+ */
+ async request(
+ method: string,
+ params?: unknown,
+ timeoutMs: number = LanguageServerClient.REQUEST_TIMEOUT_MS,
+ ): Promise<unknown> {
if (!this.rpc || this.state !== "connected") {
throw new Error("Client not connected");
}
- return this.rpc.sendRequest(method, params);
+ return this.rpc.sendRequest(method, params, timeoutMs);
}
shutdown(): void {
@@ -450,3 +567,11 @@ export class LanguageServerClient {
this.state = "not-started";
}
}
+
+function setsEqual<T>(a: Set<T>, b: Set<T>): boolean {
+ if (a.size !== b.size) return false;
+ for (const v of a) {
+ if (!b.has(v)) return false;
+ }
+ return true;
+}
diff --git a/packages/lsp/src/diagnostics.ts b/packages/lsp/src/diagnostics.ts
index bc7ac0a..50beca9 100644
--- a/packages/lsp/src/diagnostics.ts
+++ b/packages/lsp/src/diagnostics.ts
@@ -91,7 +91,7 @@ export class DiagnosticsStore {
}
}
-function diagnosticKey(d: Diagnostic): string {
+export function diagnosticKey(d: Diagnostic): string {
const r = d.range;
return `${r.start.line}:${r.start.character}-${r.end.line}:${r.end.character}:${d.severity ?? 0}:${d.message}`;
}
diff --git a/packages/lsp/src/extension.ts b/packages/lsp/src/extension.ts
index 8e3178a..c0fee44 100644
--- a/packages/lsp/src/extension.ts
+++ b/packages/lsp/src/extension.ts
@@ -8,6 +8,7 @@
import { extname, join } from "node:path";
import type { Extension, HostAPI, ServiceHandle } from "@dispatch/kernel";
import { defineService } from "@dispatch/kernel";
+import { aggregateDiagnostics } from "./aggregate.js";
import type { SpawnedProcess } from "./client.js";
import { LspManager } from "./manager.js";
import { createLspTool } from "./tool.js";
@@ -43,6 +44,14 @@ function realSpawn(
stderr: proc.stderr,
pid: proc.pid,
kill: () => proc.kill(),
+ // Surface process exit so the client can stop querying a dead server
+ // and self-heal (respawn). Bun's Subprocess.exited resolves with the
+ // exit code (or rejects if killed by signal — treat as code:null).
+ onExit: (handler) => {
+ (proc as { exited: Promise<number | null> }).exited
+ .then((code) => handler({ code }))
+ .catch(() => handler({ code: null }));
+ },
};
}
@@ -108,14 +117,20 @@ export const extension: Extension = {
return manager.status(cwd);
},
async getDiagnostics(opts: GetDiagnosticsOpts): Promise<DiagnosticsResult> {
- const timeoutMs = opts.timeoutMs ?? 60_000;
- const slowThreshold = 10_000;
+ // 10s hard ceiling per server, regardless of what the caller
+ // passes (the edit hook still passes 60_000 — clamped here, so
+ // no other-unit edit is needed). A server that doesn't respond
+ // in 10s is skipped with a notice instead of waited out.
+ const PER_SERVER_CAP_MS = 10_000;
+ const timeoutMs = Math.min(opts.timeoutMs ?? PER_SERVER_CAP_MS, PER_SERVER_CAP_MS);
const fileExt = extname(opts.filePath).toLowerCase();
const absolutePath = opts.filePath.startsWith("/")
? opts.filePath
: join(opts.cwd, opts.filePath);
// Get all connected servers matching this file's extension.
+ // A dead/corrupted server has state:"error" and is excluded —
+ // no per-edit hang on a corpse.
const statuses = await manager.status(opts.cwd);
const matching = statuses.filter(
(s) => s.state === "connected" && s.extensions.some((ext) => ext === fileExt),
@@ -125,34 +140,15 @@ export const extension: Extension = {
return { formatted: "", slow: false, timedOut: false };
}
- const parts: string[] = [];
- let anySlow = false;
- let anyTimedOut = false;
- const start = Date.now();
-
- for (const s of matching) {
- const client = manager.getClient(s.id, s.root);
- if (!client) continue;
- const waitOpts: { text?: string; timeoutMs?: number; minSeverity?: number } = {
- timeoutMs,
- };
- if (opts.text !== undefined) waitOpts.text = opts.text;
- if (opts.minSeverity !== undefined) waitOpts.minSeverity = opts.minSeverity;
- const result = await client.waitForDiagnostics(absolutePath, waitOpts);
- if (result.slow) anySlow = true;
- if (result.timedOut) anyTimedOut = true;
- if (result.formatted) {
- parts.push(`[${s.name}]\n${result.formatted}`);
- }
- }
-
- const elapsed = Date.now() - start;
+ const agg = await aggregateDiagnostics(
+ (id, root) => manager.getClient(id, root),
+ matching,
+ absolutePath,
+ timeoutMs,
+ { text: opts.text, minSeverity: opts.minSeverity },
+ );
- return {
- formatted: parts.join("\n\n"),
- slow: anySlow || elapsed > slowThreshold,
- timedOut: anyTimedOut,
- };
+ return { formatted: agg.formatted, slow: false, timedOut: agg.timedOut };
},
};
host.provideService(lspServiceHandle, service);
diff --git a/packages/lsp/src/manager.test.ts b/packages/lsp/src/manager.test.ts
index 1649111..d8cba4f 100644
--- a/packages/lsp/src/manager.test.ts
+++ b/packages/lsp/src/manager.test.ts
@@ -361,4 +361,92 @@ describe("manager", () => {
expect(s[0]?.error).toContain("[from .dispatch/lsp.json]");
expect(s[0]?.error).toContain("spawn failed");
});
+
+ it("a client that dies after connecting is skipped + re-spawned after backoff (no storm, no eternal hang)", async () => {
+ // A spawn that completes the initialize handshake AND lets the test
+ // simulate process death via the captured onExit handler.
+ const exitHandlers: Array<(info: { code: number | null; signal?: string }) => void> = [];
+ let spawnCount = 0;
+ const spawn: SpawnProcess = () => {
+ spawnCount++;
+ let messageHandler: ((data: Uint8Array) => void) | null = null;
+ const proc: SpawnedProcess = {
+ stdin: {
+ write: (bytes: Uint8Array) => {
+ const decoded = new TextDecoder().decode(bytes);
+ const headerEnd = decoded.indexOf("\r\n\r\n");
+ if (headerEnd === -1) return;
+ const json = decoded.slice(headerEnd + 4);
+ try {
+ const msg = JSON.parse(json);
+ if (msg.method === "initialize") {
+ setTimeout(() => {
+ const response = JSON.stringify({
+ jsonrpc: "2.0",
+ id: msg.id,
+ result: { capabilities: {} },
+ });
+ messageHandler?.(encode(response));
+ }, 1);
+ }
+ } catch {
+ // ignore
+ }
+ },
+ },
+ stdout: {
+ on: (_event: string, cb: (data: Uint8Array) => void) => {
+ messageHandler = cb;
+ },
+ },
+ pid: 1000 + spawnCount,
+ kill: () => {},
+ onExit: (handler) => {
+ exitHandlers.push(handler);
+ },
+ };
+ return proc;
+ };
+
+ const clock = { now: 0 };
+ const manager = new LspManager({
+ spawn,
+ fileWatcher: noopFileWatcher(),
+ fs: fakeFs({
+ "/project/.dispatch/lsp.json": JSON.stringify({
+ servers: {
+ steep: {
+ command: ["steep", "--stdio"],
+ extensions: [".rb"],
+ rootMarkers: [],
+ },
+ },
+ }),
+ }),
+ now: () => clock.now,
+ });
+
+ // 1) Connects.
+ const s1 = await manager.status("/project");
+ expect(s1[0]?.state).toBe("connected");
+ expect(spawnCount).toBe(1);
+
+ // 2) Simulate the process dying (user kill / crash) via onExit.
+ exitHandlers[0]?.({ code: 1 });
+ const clientAfterDeath = manager.getClient("steep", "/project");
+ expect(clientAfterDeath?.getState()).toBe("error");
+
+ // 3) status() now reports error (and seeds a broken entry for backoff).
+ // Backoff not elapsed yet (clock frozen at 0) → NOT re-spawned.
+ const s2 = await manager.status("/project");
+ expect(s2[0]?.state).toBe("error");
+ expect(s2[0]?.error).toMatch(/process exited/);
+ expect(spawnCount).toBe(1); // no retry storm before backoff
+
+ // 4) After the backoff elapses, status() re-spawns a fresh server.
+ clock.now = 31_000;
+ const s3 = await manager.status("/project");
+ expect(s3[0]?.state).toBe("connected");
+ expect(spawnCount).toBe(2); // re-spawned exactly once
+ });
});
diff --git a/packages/lsp/src/manager.ts b/packages/lsp/src/manager.ts
index 7153956..bc84479 100644
--- a/packages/lsp/src/manager.ts
+++ b/packages/lsp/src/manager.ts
@@ -134,6 +134,19 @@ export class LspManager {
if (existing) {
const state = existing.client.getState();
const stateError = existing.client.getStateError();
+ // A client that died or corrupted AFTER connecting flipped its
+ // own state to "error" (client.ts handleExit/markBroken). Spawn
+ // succeeded so there's no broken entry yet — seed one so the
+ // bounded-backoff path above re-spawns it, instead of reporting
+ // error forever (and so getDiagnostics' "connected" filter skips
+ // it, avoiding a per-edit hang on the corpse).
+ if (state === "error" && !this.broken.has(key)) {
+ this.broken.set(key, {
+ configFingerprint: configFingerprint(server),
+ brokenAt: this.now(),
+ error: enrichError(server, stateError ?? "server unavailable"),
+ });
+ }
const status: LspServerStatus = {
id: server.id,
name: server.name,
diff --git a/packages/lsp/src/rpc.test.ts b/packages/lsp/src/rpc.test.ts
index 05ce924..7b22ec5 100644
--- a/packages/lsp/src/rpc.test.ts
+++ b/packages/lsp/src/rpc.test.ts
@@ -84,3 +84,29 @@ it("handleMessage does not throw on malformed JSON", async () => {
await expect(conn.handleMessage("")).resolves.toBeUndefined();
await expect(conn.handleMessage("not json at all")).resolves.toBeUndefined();
});
+
+describe("sendRequest timeout", () => {
+ it("rejects with a timeout error when no response arrives within timeoutMs", async () => {
+ const { conn } = makeConnection();
+ const promise = conn.sendRequest("textDocument/hover", {}, 50);
+ await expect(promise).rejects.toThrow(/LSP request timed out after 50ms: textDocument\/hover/);
+ });
+
+ it("clears the timer on a normal response (no unhandled rejection)", async () => {
+ const { conn } = makeConnection();
+ const promise = conn.sendRequest("textDocument/hover", {}, 5000);
+ conn.handleMessage(frameResponse(1, { ok: true }));
+ await expect(promise).resolves.toEqual({ ok: true });
+ // Give the (now-cleared) timer window ample time to prove it never fires.
+ await new Promise((r) => setTimeout(r, 80));
+ });
+
+ it("does not time out when no timeoutMs is given (initialize handshake path)", async () => {
+ const { conn } = makeConnection();
+ const promise = conn.sendRequest("initialize", {});
+ // A late response well past any plausible default still resolves.
+ await new Promise((r) => setTimeout(r, 60));
+ conn.handleMessage(frameResponse(1, { capabilities: {} }));
+ await expect(promise).resolves.toEqual({ capabilities: {} });
+ });
+});
diff --git a/packages/lsp/src/rpc.ts b/packages/lsp/src/rpc.ts
index 6b82624..95157de 100644
--- a/packages/lsp/src/rpc.ts
+++ b/packages/lsp/src/rpc.ts
@@ -39,11 +39,36 @@ export class JsonRpcConnection {
this.write = write;
}
- sendRequest(method: string, params?: unknown): Promise<unknown> {
+ /**
+ * Send a request and await the correlated response. If `timeoutMs` is given,
+ * the promise rejects with a timeout error after that long — so a dead/slow
+ * server can't hang the caller forever (hover/definition/references).
+ * No `timeoutMs` = wait indefinitely (used by the initialize handshake, which
+ * has its own 45s race).
+ */
+ sendRequest(method: string, params?: unknown, timeoutMs?: number): Promise<unknown> {
const id = this.nextId++;
const msg: JsonRpcMessage = { jsonrpc: "2.0", id, method, params };
return new Promise((resolve, reject) => {
- this.pending.set(id, { resolve, reject });
+ let timer: ReturnType<typeof setTimeout> | undefined;
+ // Wrap resolve/reject so the timer is cleared on a normal response
+ // (or on dispose) — no dangling timer after completion.
+ const finish = (fn: () => void): void => {
+ if (timer) clearTimeout(timer);
+ fn();
+ };
+ const entry: PendingRequest = {
+ resolve: (value: unknown) => finish(() => resolve(value)),
+ reject: (reason: unknown) => finish(() => reject(reason)),
+ };
+ if (timeoutMs !== undefined) {
+ timer = setTimeout(() => {
+ if (this.pending.delete(id)) {
+ reject(new Error(`LSP request timed out after ${timeoutMs}ms: ${method}`));
+ }
+ }, timeoutMs);
+ }
+ this.pending.set(id, entry);
this.sendMessage(msg);
});
}
diff --git a/packages/lsp/src/tool.test.ts b/packages/lsp/src/tool.test.ts
index 03787ae..efd1514 100644
--- a/packages/lsp/src/tool.test.ts
+++ b/packages/lsp/src/tool.test.ts
@@ -176,4 +176,101 @@ describe("tool", () => {
expect(result.isError).toBe(true);
expect(result.content).toContain("requires both");
});
+
+ it("diagnostics op: a server that times out is skipped with a raise-to-user notice", async () => {
+ const mockClient = {
+ getState: () => "connected" as const,
+ getStateError: () => undefined,
+ request: async () => null,
+ waitForDiagnostics: async () => ({ formatted: "", slow: false, timedOut: true }),
+ };
+
+ const tool = createLspTool(
+ stubManager({
+ status: async () => [
+ {
+ id: "steep",
+ name: "Steep",
+ root: "/project",
+ extensions: [".rb"],
+ state: "connected",
+ },
+ ],
+ getClient: () => mockClient as never,
+ }),
+ );
+
+ const result = await tool.execute(
+ { operation: "diagnostics", path: "game.rb" },
+ {
+ toolCallId: "test",
+ onOutput: () => {},
+ signal: AbortSignal.timeout(5000),
+ log: {
+ debug: () => {},
+ info: () => {},
+ warn: () => {},
+ error: () => {},
+ child: () => ({}) as never,
+ span: () => ({}) as never,
+ },
+ cwd: "/project",
+ },
+ );
+
+ expect(result.isError).not.toBe(true);
+ expect(result.content).toContain("[Steep]");
+ expect(result.content).toContain("took too long");
+ expect(result.content).toContain(">10s");
+ expect(result.content).toContain("raise this to the user");
+ });
+
+ it("diagnostics op: responding servers' diagnostics are merged, tagged by source", async () => {
+ const mockClient = {
+ getState: () => "connected" as const,
+ getStateError: () => undefined,
+ request: async () => null,
+ waitForDiagnostics: async () => ({
+ formatted: "ERROR L1:1: boom",
+ slow: false,
+ timedOut: false,
+ }),
+ };
+
+ const tool = createLspTool(
+ stubManager({
+ status: async () => [
+ {
+ id: "steep",
+ name: "Steep",
+ root: "/project",
+ extensions: [".rb"],
+ state: "connected",
+ },
+ ],
+ getClient: () => mockClient as never,
+ }),
+ );
+
+ const result = await tool.execute(
+ { operation: "diagnostics", path: "game.rb" },
+ {
+ toolCallId: "test",
+ onOutput: () => {},
+ signal: AbortSignal.timeout(5000),
+ log: {
+ debug: () => {},
+ info: () => {},
+ warn: () => {},
+ error: () => {},
+ child: () => ({}) as never,
+ span: () => ({}) as never,
+ },
+ cwd: "/project",
+ },
+ );
+
+ expect(result.content).toContain("[Steep]");
+ expect(result.content).toContain("boom");
+ });
});
diff --git a/packages/lsp/src/tool.ts b/packages/lsp/src/tool.ts
index 8d282ec..be0d269 100644
--- a/packages/lsp/src/tool.ts
+++ b/packages/lsp/src/tool.ts
@@ -6,6 +6,7 @@
import { extname, resolve } from "node:path";
import type { ToolContract, ToolExecuteContext, ToolResult } from "@dispatch/kernel";
+import { aggregateDiagnostics } from "./aggregate.js";
import type { LspManager } from "./manager.js";
type Operation = "diagnostics" | "hover" | "definition" | "references" | "documentSymbol";
@@ -157,6 +158,8 @@ export function createLspTool(manager: LspManager): ToolContract {
switch (operation) {
case "diagnostics": {
+ // 10s hard ceiling per server (same policy as the edit path).
+ const DIAGNOSTICS_TIMEOUT_MS = 10_000;
// Query ALL connected servers whose extensions match this file.
const matching = statuses.filter(
(s) => s.state === "connected" && s.extensions.some((ext) => ext === fileExt),
@@ -179,31 +182,27 @@ export function createLspTool(manager: LspManager): ToolContract {
if (!client) {
return { content: "Language server client not available.", isError: true };
}
- const result = await client.waitForDiagnostics(absolutePath);
+ const result = await client.waitForDiagnostics(absolutePath, {
+ timeoutMs: DIAGNOSTICS_TIMEOUT_MS,
+ });
+ if (result.timedOut) {
+ return {
+ content: `⚠️ [${connected.name}] LSP took too long (>10s), diagnostics skipped — please raise this to the user.`,
+ };
+ }
return { content: result.formatted || "No diagnostics found." };
}
- // Query each matching server and merge results, tagged by source.
- const parts: string[] = [];
- let anyTimedOut = false;
- for (const s of matching) {
- const client = manager.getClient(s.id, s.root);
- if (!client) continue;
- const result = await client.waitForDiagnostics(absolutePath, { timeoutMs: 60_000 });
- if (result.timedOut) anyTimedOut = true;
- if (result.slow) {
- parts.push(
- `⚠️ LSP is taking unusually long. If this happens more than once, raise it to the user.`,
- );
- }
- if (result.formatted) {
- parts.push(`[${s.name}]\n${result.formatted}`);
- }
- }
- if (anyTimedOut && parts.length === 0) {
- parts.push("Diagnostics timed out (server may still be indexing).");
- }
- return { content: parts.length > 0 ? parts.join("\n\n") : "No diagnostics found." };
+ // Query matching servers concurrently, each capped at 10s;
+ // a non-responding server is skipped with a notice.
+ const agg = await aggregateDiagnostics(
+ (id, root) => manager.getClient(id, root),
+ matching,
+ absolutePath,
+ DIAGNOSTICS_TIMEOUT_MS,
+ {},
+ );
+ return { content: agg.formatted || "No diagnostics found." };
}
case "hover": {
const client = await getFirstMatchingClient(manager, statuses, fileExt);
diff --git a/packages/tool-edit-file/src/extension.ts b/packages/tool-edit-file/src/extension.ts
index 3929634..2a58fac 100644
--- a/packages/tool-edit-file/src/extension.ts
+++ b/packages/tool-edit-file/src/extension.ts
@@ -33,7 +33,10 @@ export const extension: Extension = {
filePath: opts.filePath,
text: opts.text,
cwd: opts.cwd,
- timeoutMs: 60_000,
+ // 10s matches the LSP service's per-server cap (see packages/lsp).
+ // The service clamps this anyway; stated explicitly so the call
+ // site is honest about the effective live-diagnostics budget.
+ timeoutMs: 10_000,
minSeverity: 2, // errors + warnings only
});
};