summaryrefslogtreecommitdiffhomepage
path: root/packages/lsp/src/client.ts
blob: ac7d025d1c6fe87aee20e8207d778bf7644b9e8b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
/**
 * Language-server client — wires codec + rpc + edges; runs the
 * initialize handshake, honors server→client requests, runs the
 * FileWatcher, and forwards matching disk changes.
 */

import { DiagnosticsStore, diagnosticKey, type PublishDiagnosticsParams } from "./diagnostics.js";
import { computeChangeRange } from "./diff.js";
import { FrameDecoder } from "./framing.js";
import { languageId as resolveLanguageId } from "./language.js";
import { JsonRpcConnection, type WriteFn } from "./rpc.js";
import { FileChangeType, WatchedFilesRegistry } from "./watched-files.js";

/**
 * Details handed to a {@link ProcessExitHandler} once the child process has
 * terminated.
 */
export interface ProcessExitInfo {
	/** Exit code reported for the process; `null` when no code is known. */
	readonly code: number | null;
	/** Name of the terminating signal, when the process was ended by one. */
	readonly signal?: string;
}

/**
 * Callback invoked with the exit details when the child process terminates.
 * Its return value is ignored.
 */
export type ProcessExitHandler = {
	(info: ProcessExitInfo): void;
};

/**
 * A byte stream coming out of the child process. Either an async-iterable of
 * chunks, or an emitter-style object that delivers chunks through
 * `on(event, cb)`.
 */
type ProcessByteStream =
	| AsyncIterable<Uint8Array>
	| { readonly on: (event: string, cb: (data: Uint8Array) => void) => void };

/**
 * Minimal view of a spawned language-server process: the pipes the client
 * talks over, plus the lifecycle hooks it needs.
 */
export interface SpawnedProcess {
	/** Sink for framed bytes sent to the server. */
	readonly stdin: { readonly write: (bytes: Uint8Array) => void };
	/** Stream of bytes produced by the server. */
	readonly stdout: ProcessByteStream;
	/** Optional error stream of the server. */
	readonly stderr?: ProcessByteStream | undefined;
	/** OS process id, when one is available. */
	readonly pid: number | undefined;
	/** Terminates the process. */
	readonly kill: () => void;
	/**
	 * Registers a handler that fires when the child process exits
	 * (code|signal). Optional: when it is absent, death is detected via
	 * stdout-end instead. Production wires Bun's `proc.exited` here; tests
	 * call it directly to simulate a crash. This lets the client stop
	 * querying a dead server (no per-edit hang).
	 */
	readonly onExit?: ((handler: ProcessExitHandler) => void) | undefined;
}

/**
 * Factory that launches a language-server process.
 *
 * @param command - Executable followed by its arguments.
 * @param opts - Working directory and, optionally, environment variables for
 *   the child.
 * @returns A handle to the running process.
 */
export type SpawnProcess = (
	command: string[],
	opts: {
		readonly cwd: string;
		readonly env?: Readonly<Record<string, string>> | undefined;
	},
) => SpawnedProcess;

/** Handle returned by a {@link FileWatcher}; used to stop watching. */
export interface FileWatcherHandle {
	/** Stops the watcher. */
	readonly close: () => void;
}

/**
 * Factory that starts watching `root` and reports every create/change/delete
 * it observes through `onEvent`.
 *
 * @param root - Directory to watch.
 * @param onEvent - Receives the kind of change and the affected path.
 * @returns A handle whose `close` stops the watcher.
 */
export type FileWatcher = (
	root: string,
	onEvent: (e: {
		readonly type: "create" | "change" | "delete";
		readonly path: string;
	}) => void,
) => FileWatcherHandle;

/** File-system operations the client depends on, injected for testability. */
export interface FsAccess {
	/** Resolves with the text content of the file at `path`. */
	readonly readText: (path: string) => Promise<string>;
	/** Resolves with whether something exists at `path`. */
	readonly exists: (path: string) => Promise<boolean>;
}

/**
 * Shape of the capability object this client sends as `capabilities` in its
 * `initialize` request.
 */
export interface ClientCapabilities {
	/** Window-related capabilities. */
	readonly window: Readonly<{ workDoneProgress: boolean }>;
	/** Workspace-related capabilities. */
	readonly workspace: Readonly<{
		configuration: boolean;
		didChangeWatchedFiles: Readonly<{ dynamicRegistration: boolean }>;
		diagnostics: Readonly<{ refreshSupport: boolean }>;
	}>;
	/** Text-document-related capabilities. */
	readonly textDocument: Readonly<{
		synchronization: Readonly<{ didOpen: boolean; didChange: boolean }>;
		diagnostic: Readonly<{
			dynamicRegistration: boolean;
			relatedDocumentSupport: boolean;
		}>;
		publishDiagnostics: Readonly<{ versionSupport: boolean }>;
	}>;
}

/**
 * The fixed capability set sent as `capabilities` in the `initialize`
 * request.
 */
export const CLIENT_CAPABILITIES: ClientCapabilities = {
	window: {
		workDoneProgress: true,
	},
	workspace: {
		configuration: true,
		didChangeWatchedFiles: {
			dynamicRegistration: true,
		},
		diagnostics: {
			refreshSupport: false,
		},
	},
	textDocument: {
		synchronization: {
			didOpen: true,
			didChange: true,
		},
		diagnostic: {
			dynamicRegistration: true,
			relatedDocumentSupport: true,
		},
		publishDiagnostics: {
			versionSupport: false,
		},
	},
};

/**
 * Everything a {@link LanguageServerClient} needs from the outside world:
 * the injected edges (process spawning, file watching, file system) and the
 * per-server configuration.
 */
export interface ClientDeps {
	/** Identifier of the server this client talks to. */
	readonly serverId: string;
	/** Workspace root; used as the child's cwd and as the workspace folder. */
	readonly root: string;
	/** Executable and arguments used to launch the server. */
	readonly command: readonly string[];
	/** Environment variables passed to the spawned process, if any. */
	readonly env?: Readonly<Record<string, string>> | undefined;
	/**
	 * Server settings: answered to `workspace/configuration` requests and
	 * pushed via `workspace/didChangeConfiguration` after initialization.
	 */
	readonly initialization?: Readonly<Record<string, unknown>> | undefined;
	/** Launches the server process. */
	readonly spawn: SpawnProcess;
	/** Watches the workspace root for file changes. */
	readonly fileWatcher: FileWatcher;
	/** File-system access used to read documents from disk. */
	readonly fs: FsAccess;
}

/**
 * Lifecycle state of a client: `"not-started"` before `start()` and after
 * `shutdown()`, `"starting"` during the handshake, `"connected"` once it
 * succeeded, and `"error"` when startup failed or the client was marked broken.
 */
export type ClientState = "not-started" | "starting" | "connected" | "error";

/**
 * Client for a single language-server process.
 *
 * Spawns the server, frames/decodes its stdout into JSON-RPC messages, runs
 * the `initialize` handshake, answers server→client requests, forwards
 * matching file-watcher events, keeps open documents in sync, and collects
 * pushed diagnostics. When the process dies or looks corrupted the client
 * moves to the `"error"` state and releases everything it holds.
 */
export class LanguageServerClient {
	readonly serverId: string;
	readonly root: string;
	private process: SpawnedProcess | null = null;
	private rpc: JsonRpcConnection | null = null;
	private decoder = new FrameDecoder();
	private readonly diagnostics = new DiagnosticsStore();
	private readonly watchedFiles = new WatchedFilesRegistry();
	private fileWatcherHandle: FileWatcherHandle | null = null;
	private state: ClientState = "not-started";
	private stateError: string | undefined;
	private readonly deps: ClientDeps;
	/** Documents currently open on the server, keyed by file path. */
	private readonly openDocuments = new Map<string, { version: number; text: string }>();
	/** Sync mode captured from the server's initialize capabilities: 1=Full, 2=Incremental. */
	private textDocumentChange: 1 | 2 = 1;
	/**
	 * Corruption detection: the last diagnostic-key set + synced text per URI.
	 * A healthy server's diagnostics change when the file changes; a corrupted
	 * one (e.g. Steep's ~3h phantom-SyntaxError drift) re-emits the identical
	 * non-empty set across edits. `staleRepeat` counts consecutive such repeats
	 * across URIs; at the threshold the client is marked broken (→ respawn).
	 */
	private readonly lastDiagSnapshot = new Map<string, { keys: Set<string>; text: string }>();
	private staleRepeat = 0;
	private static readonly STALE_REPEAT_THRESHOLD = 5;
	/** Default timeout for outbound requests (hover/definition/references). */
	private static readonly REQUEST_TIMEOUT_MS = 10_000;
	/** Upper bound for the `initialize` handshake. */
	private static readonly INITIALIZE_TIMEOUT_MS = 45_000;
	/** Default cap and poll interval used by `waitForDiagnostics`. */
	private static readonly DIAGNOSTICS_TIMEOUT_MS = 10_000;
	private static readonly DIAGNOSTICS_POLL_MS = 100;

	constructor(deps: ClientDeps) {
		this.deps = deps;
		this.serverId = deps.serverId;
		this.root = deps.root;
	}

	/** Current lifecycle state. */
	getState(): ClientState {
		return this.state;
	}

	/** Reason for the last transition to `"error"`, if any. */
	getStateError(): string | undefined {
		return this.stateError;
	}

	/**
	 * Spawn the server and run the initialize handshake.
	 *
	 * Never rejects: on failure the state becomes `"error"`, the message is
	 * available through `getStateError()`, and every resource acquired so far
	 * (process, rpc, file watcher) is released so a half-started server is not
	 * left running.
	 */
	async start(): Promise<void> {
		this.state = "starting";
		this.stateError = undefined;
		// Per-connection state must not leak from an earlier run of this instance:
		// a new server knows none of the old documents or partial frames.
		this.decoder = new FrameDecoder();
		this.openDocuments.clear();
		this.lastDiagSnapshot.clear();
		this.staleRepeat = 0;

		try {
			const { env } = this.deps;
			const spawnOpts = env ? { cwd: this.root, env } : { cwd: this.root };
			// Hand the spawner its own copy so it cannot mutate the shared command.
			const proc = this.deps.spawn([...this.deps.command], spawnOpts);
			this.process = proc;
			// Detect process death so we stop querying a corpse (fixes the
			// per-edit hang after a server is killed/crashes). onExit is the
			// primary signal; stdout-end is the defence-in-depth fallback.
			proc.onExit?.((info) => this.handleExit(info));

			const writeFn: WriteFn = (bytes) => proc.stdin.write(bytes);
			const rpc = new JsonRpcConnection(writeFn);
			this.rpc = rpc;
			this.setupServerHandlers(rpc);

			const stdout = proc.stdout;
			if (Symbol.asyncIterator in stdout) {
				this.readFromAsyncIterable(stdout as AsyncIterable<Uint8Array>);
			} else {
				this.readFromEventSource(
					stdout as { readonly on: (event: string, cb: (data: Uint8Array) => void) => void },
				);
			}

			await this.initialize(rpc);
			// Only promote when nothing marked the client broken (or shut it
			// down) while the handshake was in flight.
			if (this.state === "starting") {
				this.state = "connected";
			}
		} catch (err: unknown) {
			this.state = "error";
			this.stateError = err instanceof Error ? err.message : String(err);
			// A failed or timed-out handshake must not orphan the child process.
			this.releaseResources();
		}
	}

	/** Pump an async-iterable stdout into the decoder until it ends or fails. */
	private readFromAsyncIterable(source: AsyncIterable<Uint8Array>): void {
		void (async () => {
			try {
				for await (const chunk of source) {
					this.handleBytes(chunk);
				}
			} catch {
				// A read failure is treated exactly like end-of-stream below.
			}
			// stdout closed — the process is gone (defence-in-depth alongside onExit,
			// which some edges never call). Idempotent via handleExit's guard.
			this.handleExit({ code: null });
		})();
	}

	/**
	 * Pump an emitter-style stdout into the decoder. Also listens for
	 * `end`/`close` so that stdout-end acts as the death signal when the edge
	 * provides no `onExit`, mirroring the async-iterable path.
	 */
	private readFromEventSource(source: {
		readonly on: (event: string, cb: (data: Uint8Array) => void) => void;
	}): void {
		source.on("data", (data: Uint8Array) => {
			this.handleBytes(data);
		});
		const onStreamEnd = (): void => this.handleExit({ code: null });
		source.on("end", onStreamEnd);
		source.on("close", onStreamEnd);
	}

	/**
	 * The server process exited (onExit or stdout-end). Transition to a broken
	 * state so callers skip it and the manager re-spawns after backoff — instead
	 * of polling a corpse for the full timeout on every edit. Idempotent, and a
	 * no-op once the client is already in `"error"` or `"not-started"`.
	 */
	private handleExit(info: ProcessExitInfo): void {
		if (this.state === "error" || this.state === "not-started") return;
		const detail = info.signal !== undefined ? `signal ${info.signal}` : `code ${info.code ?? "?"}`;
		this.markBroken(`language server process exited (${detail})`);
	}

	/**
	 * Mark this client permanently broken: record the reason, kill the process
	 * if still alive (corruption case), dispose the rpc (rejects pending
	 * requests), and drop edge handles. The manager's status() observes
	 * state:"error" and re-spawns after the bounded backoff. Called on process
	 * death AND on corruption. Idempotent.
	 */
	private markBroken(reason: string): void {
		if (this.state === "error") return;
		this.state = "error";
		this.stateError = reason;
		this.releaseResources();
	}

	/**
	 * Close the file watcher, kill the process and dispose the rpc connection.
	 *
	 * The fields are cleared BEFORE the handles are torn down so that a
	 * callback fired synchronously from `kill()` (e.g. an exit handler) sees a
	 * client that no longer owns the process and cannot kill it a second time.
	 */
	private releaseResources(): void {
		const watcher = this.fileWatcherHandle;
		const proc = this.process;
		const rpc = this.rpc;
		this.fileWatcherHandle = null;
		this.process = null;
		this.rpc = null;
		watcher?.close();
		proc?.kill();
		rpc?.dispose();
	}

	/**
	 * Build the `file://` URI used for `path` in every message.
	 *
	 * NOTE(review): the path is not percent-encoded, so paths containing
	 * spaces or other reserved characters may not match the URIs a server
	 * echoes back. Left as-is because the diagnostics store is keyed on this
	 * exact form — confirm before changing.
	 */
	private static toUri(path: string): string {
		return `file://${path}`;
	}

	/**
	 * Detect a server stuck re-emitting identical non-empty diagnostics
	 * despite the file content changing between calls — the signature of a
	 * corrupted parse/type-check state (e.g. Steep's ~3h phantom-SyntaxError
	 * drift, where a fresh CLI reports green on the same project). After
	 * STALE_REPEAT_THRESHOLD consecutive such repeats, mark the client broken
	 * so it is skipped + re-spawned. A clean file (empty diagnostics) or a
	 * genuinely changing diagnostic set resets the counter. Note the
	 * tradeoff: a real, unfixed error on an untouched line also "stays the
	 * same across edits", so this can false-positive on a healthy server —
	 * the threshold is set conservatively and the CLI type-check gate remains
	 * authoritative either way.
	 *
	 * @param uri - Document URI whose merged diagnostics are inspected.
	 * @param text - The text that was synced to the server for this round.
	 */
	private detectStaleDiagnostics(uri: string, text: string): void {
		const keys = new Set(this.diagnostics.getMerged(uri).map((d) => diagnosticKey(d)));
		const prev = this.lastDiagSnapshot.get(uri);
		const repeatedDespiteEdit =
			prev !== undefined && keys.size > 0 && text !== prev.text && setsEqual(keys, prev.keys);
		this.staleRepeat = repeatedDespiteEdit ? this.staleRepeat + 1 : 0;
		this.lastDiagSnapshot.set(uri, { keys, text });
		if (this.staleRepeat >= LanguageServerClient.STALE_REPEAT_THRESHOLD) {
			this.markBroken(
				"language server emitting repeated stale diagnostics despite file changes — likely corrupted; restarting",
			);
		}
	}

	/** Feed raw stdout bytes to the frame decoder and dispatch every complete message. */
	private handleBytes(chunk: Uint8Array): void {
		for (const msg of this.decoder.decode(chunk)) {
			// handleMessage is async — catch rejections so a malformed
			// message never becomes an unhandled rejection that crashes
			// the server. (handleMessage also has its own try/catch around
			// JSON.parse, but this is the defence-in-depth boundary.)
			void this.rpc?.handleMessage(msg).catch(() => {});
		}
	}

	/**
	 * Resolve a dotted configuration section (e.g. `"a.b.c"`) against the
	 * configured initialization options. Only own properties are followed, so
	 * a section such as `"constructor.name"` cannot reach prototype members.
	 *
	 * @returns The value found, or `undefined` when any segment is missing
	 *   (serialized as `null` inside the JSON response array).
	 */
	private lookupConfigSection(section: string): unknown {
		let value: unknown = this.deps.initialization ?? {};
		for (const key of section.split(".")) {
			if (
				value === null ||
				typeof value !== "object" ||
				!Object.prototype.hasOwnProperty.call(value, key)
			) {
				return undefined;
			}
			value = (value as Record<string, unknown>)[key];
		}
		return value;
	}

	/** Register handlers for the notifications and requests the server may send. */
	private setupServerHandlers(rpc: JsonRpcConnection): void {
		rpc.onNotification("textDocument/publishDiagnostics", (params) => {
			this.diagnostics.setPushDiagnostics(params as PublishDiagnosticsParams);
		});

		// One answer per requested item: the addressed section, or the whole
		// settings object when the item names no section.
		rpc.onRequest("workspace/configuration", (params) => {
			const { items } = params as { readonly items: readonly { readonly section?: string }[] };
			return items.map((item) =>
				item.section ? this.lookupConfigSection(item.section) : (this.deps.initialization ?? {}),
			);
		});

		rpc.onRequest("workspace/workspaceFolders", () => {
			return [{ uri: LanguageServerClient.toUri(this.root), name: this.root }];
		});

		// Acknowledged without further action.
		rpc.onRequest("window/workDoneProgress/create", () => null);
		rpc.onRequest("workspace/diagnostic/refresh", () => null);

		rpc.onRequest("client/registerCapability", (params) => {
			const { registrations } = params as {
				readonly registrations: readonly {
					readonly id: string;
					readonly method: string;
					readonly registerOptions?: unknown;
				}[];
			};
			for (const reg of registrations) {
				// Only watched-file registrations are tracked; others (including
				// textDocument/diagnostic) are accepted but not stored yet.
				if (reg.method !== "workspace/didChangeWatchedFiles") continue;
				const opts = reg.registerOptions as
					| import("./watched-files.js").DidChangeWatchedFilesRegistrationOptions
					| undefined;
				if (!opts) continue;
				this.watchedFiles.applyRegister({
					id: reg.id,
					method: reg.method,
					registerOptions: opts,
				});
			}
			return null;
		});

		rpc.onRequest("client/unregisterCapability", (params) => {
			const { unregistrations } = params as {
				readonly unregistrations: readonly {
					readonly id: string;
					readonly method: string;
				}[];
			};
			for (const unreg of unregistrations) {
				this.watchedFiles.applyUnregister(unreg);
			}
			return null;
		});
	}

	/**
	 * Run the LSP handshake: send `initialize` (bounded by
	 * INITIALIZE_TIMEOUT_MS), record the server's text-sync mode, send
	 * `initialized`, push the configured settings, and start the file watcher.
	 *
	 * @throws Error("Initialize timeout") when the server does not answer in
	 *   time; any rejection of the `initialize` request propagates as well.
	 */
	private async initialize(rpc: JsonRpcConnection): Promise<void> {
		const rootUri = LanguageServerClient.toUri(this.root);
		const initPromise = rpc.sendRequest("initialize", {
			processId: this.process?.pid ?? null,
			rootUri,
			workspaceFolders: [{ uri: rootUri, name: this.root }],
			capabilities: CLIENT_CAPABILITIES,
		});

		let timer: ReturnType<typeof setTimeout> | undefined;
		const timeoutPromise = new Promise<never>((_, reject) => {
			timer = setTimeout(
				() => reject(new Error("Initialize timeout")),
				LanguageServerClient.INITIALIZE_TIMEOUT_MS,
			);
		});

		let result: {
			readonly capabilities?: {
				readonly textDocumentSync?:
					| number
					| { readonly openClose?: boolean; readonly change?: number }
					| undefined;
			};
		};
		try {
			result = (await Promise.race([initPromise, timeoutPromise])) as typeof result;
		} finally {
			// Without this the timer stays armed for the full 45s after a
			// successful handshake and keeps the event loop alive.
			clearTimeout(timer);
		}

		// Capture the server's text document sync mode for didChange. Anything
		// other than Incremental (2) is sent as full content.
		const sync = result.capabilities?.textDocumentSync;
		const changeKind = typeof sync === "number" ? sync : sync?.change;
		if (changeKind !== undefined) {
			this.textDocumentChange = changeKind === 2 ? 2 : 1;
		}

		rpc.sendNotification("initialized", {});

		if (this.deps.initialization) {
			rpc.sendNotification("workspace/didChangeConfiguration", {
				settings: this.deps.initialization,
			});
		}

		this.startFileWatcher();
	}

	/**
	 * Start watching the workspace root and forward every event whose
	 * root-relative path matches a server-registered watcher as
	 * `workspace/didChangeWatchedFiles`.
	 */
	private startFileWatcher(): void {
		const rootPrefix = this.root.endsWith("/") ? this.root : `${this.root}/`;
		this.fileWatcherHandle = this.deps.fileWatcher(this.root, (event) => {
			const changeType =
				event.type === "create"
					? FileChangeType.Created
					: event.type === "delete"
						? FileChangeType.Deleted
						: FileChangeType.Changed;

			// Paths outside the root are matched with their leading slashes stripped.
			const relativePath = event.path.startsWith(rootPrefix)
				? event.path.slice(rootPrefix.length)
				: event.path.replace(/^\/+/, "");

			if (this.watchedFiles.matches(relativePath)) {
				this.rpc?.sendNotification("workspace/didChangeWatchedFiles", {
					changes: [{ uri: LanguageServerClient.toUri(event.path), type: changeType }],
				});
			}
		});
	}

	/**
	 * Read `filePath` from disk and sync it to the server. Best-effort: does
	 * nothing when the client is not connected, and read failures are ignored
	 * (the file may not exist).
	 */
	async open(filePath: string): Promise<void> {
		if (!this.rpc || this.state !== "connected") return;

		try {
			const text = await this.deps.fs.readText(filePath);
			await this.openWithText(filePath, text);
		} catch {
			// file may not exist
		}
	}

	/**
	 * Sync `text` as the content of `filePath`: `didOpen` on first sight,
	 * `didChange` when the document is already open. No-op when not connected.
	 *
	 * @param langId - Language id for `didOpen`; derived from the file path
	 *   when omitted. Ignored when the document is already open.
	 */
	async openWithText(filePath: string, text: string, langId?: string): Promise<void> {
		const rpc = this.rpc;
		if (!rpc || this.state !== "connected") return;

		// If already open, use didChange instead of re-opening.
		if (this.openDocuments.has(filePath)) {
			await this.change(filePath, text);
			return;
		}

		const version = 1;
		this.openDocuments.set(filePath, { version, text });

		rpc.sendNotification("textDocument/didOpen", {
			textDocument: {
				uri: LanguageServerClient.toUri(filePath),
				languageId: langId ?? resolveLanguageId(filePath),
				version,
				text,
			},
		});
	}

	/**
	 * Send the new content of an open document, bumping its version. Uses a
	 * computed change range when the server asked for incremental sync and
	 * the full text otherwise. Falls back to `didOpen` when the document is
	 * not open yet. No-op when not connected.
	 */
	async change(filePath: string, newText: string): Promise<void> {
		const rpc = this.rpc;
		if (!rpc || this.state !== "connected") return;

		const existing = this.openDocuments.get(filePath);
		if (!existing) {
			// Not open yet — didOpen instead.
			await this.openWithText(filePath, newText);
			return;
		}

		const version = existing.version + 1;
		this.openDocuments.set(filePath, { version, text: newText });

		const contentChange =
			this.textDocumentChange === 2
				? // Incremental sync — compute the minimal change range.
					computeChangeRange(existing.text, newText)
				: // Full sync — send the entire content.
					{ text: newText };
		rpc.sendNotification("textDocument/didChange", {
			textDocument: { uri: LanguageServerClient.toUri(filePath), version },
			contentChanges: [contentChange],
		});
	}

	/**
	 * Sync a document and wait for the server to push diagnostics for it.
	 *
	 * @param filePath - Document to check.
	 * @param opts - `text`: post-edit buffer to sync (otherwise the file is
	 *   read from disk); `timeoutMs`: cap for the wait (default 10s);
	 *   `minSeverity`: passed through to the diagnostics formatter.
	 * @returns The formatted diagnostics, `timedOut` (true when no push
	 *   arrived — either the cap elapsed or the client stopped being
	 *   connected), and `slow`, which is always false and kept only for
	 *   contract compatibility.
	 */
	async waitForDiagnostics(
		filePath: string,
		opts?: { readonly text?: string; readonly timeoutMs?: number; readonly minSeverity?: number },
	): Promise<{ readonly formatted: string; readonly slow: boolean; readonly timedOut: boolean }> {
		const timeoutMs = opts?.timeoutMs ?? LanguageServerClient.DIAGNOSTICS_TIMEOUT_MS;
		const uri = LanguageServerClient.toUri(filePath);

		// Clear the "received" flag so we detect fresh publishDiagnostics after our sync.
		this.diagnostics.clearReceived(uri);

		// Sync the document: use didChange with the provided text (post-edit buffer)
		// or fall back to didOpen reading from disk.
		if (opts?.text !== undefined) {
			await this.change(filePath, opts.text);
		} else {
			await this.open(filePath);
		}

		const start = Date.now();

		// Poll until the server pushes diagnostics (even empty = done), the
		// per-server cap elapses (then we skip it — see aggregateDiagnostics),
		// or the client is no longer connected: a dead server will never push,
		// so waiting out the full cap would only stall the caller.
		const received = await new Promise<boolean>((resolve) => {
			const check = (): void => {
				const got = this.diagnostics.hasReceivedPush(uri);
				const expired = Date.now() - start >= timeoutMs;
				if (got || expired || this.state !== "connected") {
					resolve(got);
					return;
				}
				setTimeout(check, LanguageServerClient.DIAGNOSTICS_POLL_MS);
			};
			check();
		});

		// Only a server that actually pushed can be corruption-checked. Compare
		// against the text that was really synced: when no buffer was supplied
		// that is the content read from disk, not an empty string — otherwise
		// mixing both call styles makes an unchanged file look edited.
		if (received) {
			const syncedText = opts?.text ?? this.openDocuments.get(filePath)?.text ?? "";
			this.detectStaleDiagnostics(uri, syncedText);
		}

		// `slow` is structurally false now: the per-server cap is 10s, so
		// elapsed can never exceed the old "unusually long" threshold. That
		// warning is superseded by the timeout→skip notice produced in
		// aggregateDiagnostics. The field is kept for contract compatibility.
		return {
			formatted: this.diagnostics.formatFiltered(uri, opts?.minSeverity),
			slow: false,
			timedOut: !received,
		};
	}

	/** Registry of the file watchers the server has registered. */
	getWatchedFilesRegistry(): WatchedFilesRegistry {
		return this.watchedFiles;
	}

	/** Store holding the diagnostics received from the server. */
	getDiagnosticsStore(): DiagnosticsStore {
		return this.diagnostics;
	}

	/**
	 * Send a request (hover/definition/references/documentSymbol). Capped at
	 * REQUEST_TIMEOUT_MS so a dead/slow server can't hang the turn — the
	 * initialize handshake bypasses this (it calls rpc.sendRequest directly
	 * with its own 45s race).
	 *
	 * @throws Error("Client not connected") when the client is not connected.
	 */
	async request(
		method: string,
		params?: unknown,
		timeoutMs: number = LanguageServerClient.REQUEST_TIMEOUT_MS,
	): Promise<unknown> {
		const rpc = this.rpc;
		if (!rpc || this.state !== "connected") {
			throw new Error("Client not connected");
		}
		return rpc.sendRequest(method, params, timeoutMs);
	}

	/**
	 * Stop the client: release the watcher, process and rpc, and return to
	 * `"not-started"`. The state is switched first so an exit callback fired
	 * while the process is being killed is ignored instead of being recorded
	 * as a crash.
	 */
	shutdown(): void {
		this.state = "not-started";
		this.releaseResources();
	}
}

/**
 * Whether two sets contain exactly the same members.
 *
 * @param a - First set.
 * @param b - Second set.
 * @returns `true` when both sets have the same size and every member of `a`
 *   is also in `b`.
 */
function setsEqual<T>(a: Set<T>, b: Set<T>): boolean {
	// Equal size plus "a ⊆ b" implies equality; the size check also
	// short-circuits the common unequal case.
	return a.size === b.size && Array.from(a).every((member) => b.has(member));
}