summaryrefslogtreecommitdiffhomepage
path: root/packages/mcp/src/transport.ts
blob: b1e3ec56d05936233076585f192ea90d47cf50ed (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/**
 * StdioTransport — spawns a child process and pipes stdin/stdout through
 * Content-Length framing + JSON-RPC.
 */

import { FrameDecoder } from "./framing.js";
import { JsonRpcClient, type WriteFn } from "./rpc.js";

export interface SpawnedProcess {
	readonly stdin: { readonly write: (bytes: Uint8Array) => void };
	readonly stdout:
		| AsyncIterable<Uint8Array>
		| { readonly on: (event: string, cb: (data: Uint8Array) => void) => void };
	readonly stderr?:
		| AsyncIterable<Uint8Array>
		| { readonly on: (event: string, cb: (data: Uint8Array) => void) => void }
		| undefined;
	readonly pid: number | undefined;
	readonly kill: () => void;
}

export type SpawnProcess = (
	command: readonly string[],
	opts: { readonly cwd: string; readonly env?: Readonly<Record<string, string>> | undefined },
) => SpawnedProcess;

export interface Connection {
	readonly send: (method: string, params?: unknown) => Promise<unknown>;
	readonly notify: (method: string, params?: unknown) => void;
	readonly onNotification: (method: string, handler: (params: unknown) => void) => void;
	readonly close: () => void;
	readonly pid: number | undefined;
}

export interface StdioTransportDeps {
	readonly spawn: SpawnProcess;
	readonly command: readonly string[];
	readonly env?: Readonly<Record<string, string>>;
}

export function createStdioTransport(
	deps: StdioTransportDeps,
	cwd: string,
): { connection: Connection; promise: Promise<void> } {
	const spawnOpts: { readonly cwd: string; readonly env?: Readonly<Record<string, string>> } = {
		cwd,
	};
	if (deps.env) {
		(spawnOpts as { env?: Readonly<Record<string, string>> }).env = deps.env;
	}

	const proc = deps.spawn(deps.command, spawnOpts);
	const decoder = new FrameDecoder();

	const writeFn: WriteFn = (bytes) => proc.stdin.write(bytes);
	const rpc = new JsonRpcClient(writeFn);

	const stdoutSource = proc.stdout;
	const promise = new Promise<void>((resolve, reject) => {
		if (Symbol.asyncIterator in stdoutSource) {
			(async () => {
				try {
					for await (const chunk of stdoutSource as AsyncIterable<Uint8Array>) {
						const messages = decoder.decode(chunk);
						for (const msg of messages) {
							rpc.handleMessage(msg);
						}
					}
					resolve();
				} catch (err: unknown) {
					reject(err);
				}
			})();
		} else {
			const source = stdoutSource as {
				readonly on: (event: string, cb: (data: Uint8Array) => void) => void;
			};
			source.on("data", (data: Uint8Array) => {
				const messages = decoder.decode(data);
				for (const msg of messages) {
					rpc.handleMessage(msg);
				}
			});
			source.on("end", () => resolve());
			source.on("error", (err: unknown) => reject(err));
		}
	});

	const connection: Connection = {
		send: (method, params) => rpc.request(method, params),
		notify: (method, params) => rpc.notify(method, params),
		onNotification: (method, handler) => rpc.onNotification(method, handler),
		close: () => {
			rpc.close();
			proc.kill();
		},
		pid: proc.pid,
	};

	return { connection, promise };
}