diff options
| author | Adam Malczewski <[email protected]> | 2026-06-24 18:50:24 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-24 18:50:24 +0900 |
| commit | 94363d4e5b4fe7f026e06b65fb847342224428d8 (patch) | |
| tree | 92b2016bfc3edb3a958d3e06b67bf2230db7576d | |
| parent | 74a779331041b5aaf0350806a066cdb3444ca0a7 (diff) | |
| download | dispatch-94363d4e5b4fe7f026e06b65fb847342224428d8.tar.gz dispatch-94363d4e5b4fe7f026e06b65fb847342224428d8.zip | |
feat(mcp): Model Context Protocol host extension
New `mcp` standard extension (`packages/mcp/`) that makes Dispatch an MCP
host: spawns configured MCP servers (stdio child processes), performs the MCP
handshake (initialize → notifications/initialized), discovers tools via
tools/list, and registers each as a first-class Dispatch ToolContract via
host.defineTool. When the model calls an MCP tool, the extension proxies the
call to tools/call on the MCP server and returns the flattened result.
Architecture (sibling of `lsp` extension):
- Config: .dispatch/mcp.json (servers key) → opencode.json mcp key fallback,
resolved per-cwd (mirrors LSP config resolution)
- Transport: StdioTransport (spawn child, Content-Length framing + JSON-RPC 2.0)
- Client: initialize → tools/list → tools/call; handles list_changed
notifications for dynamic tool updates
- Registry: tool name namespacing (<serverId>__<toolName>), ToolContract
adapter that proxies execute → callTool, content flattening (text/image/
resource → string)
- Manager: one client per server, lazy-spawn, status(), shutdownAll()
- Extension: manifest (dependsOn session-orchestrator, capabilities spawn),
registers tools + a toolsFilter (drops disconnected server's tools),
mcpServiceHandle, deactivate kills all child processes
Phase 1 scope: stdio only, Tools only (no Resources/Prompts/HTTP/sampling).
Hand-rolled JSON-RPC + framing (zero external deps, adapts LSP patterns).
Wave 1 (agent): 12 source + 8 test files, 69 new tests.
Wave 2 (orchestrator): root tsconfig ref, host-bin CORE_EXTENSIONS
registration + package.json dep, bun install.
Verified: tsc -b EXIT 0, biome clean, 1537 vitest pass (was 1468, +69).
| -rw-r--r-- | bun.lock | 11 | ||||
| -rw-r--r-- | packages/host-bin/package.json | 1 | ||||
| -rw-r--r-- | packages/host-bin/src/main.ts | 2 | ||||
| -rw-r--r-- | packages/mcp/package.json | 12 | ||||
| -rw-r--r-- | packages/mcp/src/client.test.ts | 206 | ||||
| -rw-r--r-- | packages/mcp/src/client.ts | 123 | ||||
| -rw-r--r-- | packages/mcp/src/config.test.ts | 114 | ||||
| -rw-r--r-- | packages/mcp/src/config.ts | 89 | ||||
| -rw-r--r-- | packages/mcp/src/extension.test.ts | 341 | ||||
| -rw-r--r-- | packages/mcp/src/extension.ts | 227 | ||||
| -rw-r--r-- | packages/mcp/src/framing.test.ts | 92 | ||||
| -rw-r--r-- | packages/mcp/src/framing.ts | 100 | ||||
| -rw-r--r-- | packages/mcp/src/index.ts | 31 | ||||
| -rw-r--r-- | packages/mcp/src/manager.test.ts | 216 | ||||
| -rw-r--r-- | packages/mcp/src/manager.ts | 205 | ||||
| -rw-r--r-- | packages/mcp/src/registry.test.ts | 237 | ||||
| -rw-r--r-- | packages/mcp/src/registry.ts | 79 | ||||
| -rw-r--r-- | packages/mcp/src/rpc.test.ts | 117 | ||||
| -rw-r--r-- | packages/mcp/src/rpc.ts | 103 | ||||
| -rw-r--r-- | packages/mcp/src/transport.test.ts | 154 | ||||
| -rw-r--r-- | packages/mcp/src/transport.ts | 101 | ||||
| -rw-r--r-- | packages/mcp/src/types.ts | 87 | ||||
| -rw-r--r-- | packages/mcp/tsconfig.json | 6 | ||||
| -rw-r--r-- | tsconfig.json | 143 |
24 files changed, 2762 insertions, 35 deletions
@@ -62,6 +62,7 @@ "@dispatch/journal-sink": "workspace:*", "@dispatch/kernel": "workspace:*", "@dispatch/lsp": "workspace:*", + "@dispatch/mcp": "workspace:*", "@dispatch/message-queue": "workspace:*", "@dispatch/provider-openai-compat": "workspace:*", "@dispatch/provider-umans": "workspace:*", @@ -104,6 +105,14 @@ "@dispatch/kernel": "workspace:*", }, }, + "packages/mcp": { + "name": "@dispatch/mcp", + "version": "0.0.0", + "dependencies": { + "@dispatch/kernel": "workspace:*", + "@dispatch/session-orchestrator": "workspace:*", + }, + }, "packages/message-queue": { "name": "@dispatch/message-queue", "version": "0.0.0", @@ -349,6 +358,8 @@ "@dispatch/lsp": ["@dispatch/lsp@workspace:packages/lsp"], + "@dispatch/mcp": ["@dispatch/mcp@workspace:packages/mcp"], + "@dispatch/message-queue": ["@dispatch/message-queue@workspace:packages/message-queue"], "@dispatch/observability-collector": ["@dispatch/observability-collector@workspace:packages/observability-collector"], diff --git a/packages/host-bin/package.json b/packages/host-bin/package.json index abc4a3e..568fd6c 100644 --- a/packages/host-bin/package.json +++ b/packages/host-bin/package.json @@ -13,6 +13,7 @@ "@dispatch/provider-openai-compat": "workspace:*", "@dispatch/provider-umans": "workspace:*", "@dispatch/message-queue": "workspace:*", + "@dispatch/mcp": "workspace:*", "@dispatch/session-orchestrator": "workspace:*", "@dispatch/skills": "workspace:*", "@dispatch/throughput-store": "workspace:*", diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts index c045e6b..bd79497 100644 --- a/packages/host-bin/src/main.ts +++ b/packages/host-bin/src/main.ts @@ -20,6 +20,7 @@ import { type StorageNamespace, } from "@dispatch/kernel"; import { extension as lspExt } from "@dispatch/lsp"; +import { extension as mcpExt } from "@dispatch/mcp"; import { extension as messageQueueExt } from "@dispatch/message-queue"; import { extension as providerOpenaiCompatExt } from "@dispatch/provider-openai-compat"; import { extension as providerUmansExt } from "@dispatch/provider-umans"; @@ -84,6 +85,7 @@ const CORE_EXTENSIONS: readonly Extension[] = [ throughputStoreExt, todoExt, messageQueueExt, + mcpExt, sessionOrchestratorExt, skillsExt, systemPromptExt, diff --git a/packages/mcp/package.json b/packages/mcp/package.json new file mode 100644 index 0000000..9f862fa --- /dev/null +++ b/packages/mcp/package.json @@ -0,0 +1,12 @@ +{ + "name": "@dispatch/mcp", + "version": "0.0.0", + "type": "module", + "private": true, + "main": "dist/index.js", + "types": "dist/index.d.ts", + "dependencies": { + "@dispatch/kernel": "workspace:*", + "@dispatch/session-orchestrator": "workspace:*" + } +} diff --git a/packages/mcp/src/client.test.ts b/packages/mcp/src/client.test.ts new file mode 100644 index 0000000..695bdcc --- /dev/null +++ b/packages/mcp/src/client.test.ts @@ -0,0 +1,206 @@ +import { describe, expect, it } from "vitest"; +import { McpClient } from "./client.js"; +import type { Connection } from "./transport.js"; + +function makeMockConnection(): Connection & { + responses: Map<string, unknown>; + feedResponse: (method: string, result: unknown) => void; + notifications: Array<{ method: string; params: unknown }>; +} { + const responses = new Map<string, unknown>(); + const pendingRequests = new Map<number, { method: string; resolve: (v: unknown) => void }>(); + let nextId = 1; + const notifications: Array<{ method: string; params: unknown }> = []; + const notificationHandlers = new Map<string, (params: unknown) => void>(); + + return { + responses, + notifications, + feedResponse: (_method: string, _result: unknown) => {}, + send: (method: string, _params?: unknown) => { + const id = nextId++; + return new Promise((resolve) => { + pendingRequests.set(id, { method, resolve }); + // Auto-respond for initialize + if (method === "initialize") { + resolve({ + protocolVersion: "2025-11-25", + capabilities: { tools: { listChanged: true } }, + serverInfo: { name: "test-server", version: "1.0.0" }, + }); + } else if (method === "tools/list") { + resolve({ + tools: [ + { + name: "test_tool", + description: "A test tool", + inputSchema: { type: "object", properties: { input: { type: "string" } } }, + }, + ], + }); + } else if (method === "tools/call") { + resolve({ + content: [{ type: "text", text: "result from tool" }], + isError: false, + }); + } + }); + }, + notify: (method: string, params?: unknown) => { + notifications.push({ method, params }); + }, + onNotification: (method: string, handler: (params: unknown) => void) => { + notificationHandlers.set(method, handler); + }, + close: () => {}, + pid: 999, + }; +} + +describe("McpClient", () => { + it("initialize sends correct protocolVersion + capabilities", async () => { + const conn = makeMockConnection(); + const client = new McpClient({ connection: conn }); + + const result = await client.initialize(); + + expect(result.protocolVersion).toBe("2025-11-25"); + expect(result.capabilities.tools?.listChanged).toBe(true); + expect(result.serverInfo.name).toBe("test-server"); + expect(client.getState()).toBe("connected"); + + // Should have sent notifications/initialized + expect(conn.notifications.length).toBe(1); + expect(conn.notifications[0].method).toBe("notifications/initialized"); + }); + + it("listTools returns parsed tools", async () => { + const conn = makeMockConnection(); + const client = new McpClient({ connection: conn }); + + await client.initialize(); + const tools = await client.listTools(); + + expect(tools.length).toBe(1); + expect(tools[0].name).toBe("test_tool"); + expect(tools[0].description).toBe("A test tool"); + }); + + it("callTool sends name + arguments", async () => { + const conn = makeMockConnection(); + let callParams: unknown = null; + const origSend = conn.send.bind(conn); + conn.send = (method: string, params?: unknown) => { + if (method === "tools/call") callParams = params; + return origSend(method, params); + }; + + const client = new McpClient({ connection: conn }); + + await client.initialize(); + const result = await client.callTool("test_tool", { input: "hello" }); + + expect(callParams).toEqual({ name: "test_tool", arguments: { input: "hello" } }); + expect(result.content).toEqual([{ type: "text", text: "result from tool" }]); + expect(result.isError).toBe(false); + }); + + it("list_changed triggers re-list", async () => { + const conn = makeMockConnection(); + const notificationHandlers = new Map<string, (params: unknown) => void>(); + conn.onNotification = (method: string, handler: (params: unknown) => void) => { + notificationHandlers.set(method, handler); + }; + + const client = new McpClient({ connection: conn }); + + let toolsChangedFired = false; + client.onToolsChanged(() => { + toolsChangedFired = true; + }); + + await client.initialize(); + + // Simulate list_changed notification + const handler = notificationHandlers.get("notifications/tools/list_changed"); + expect(handler).toBeDefined(); + handler?.(undefined); + + expect(toolsChangedFired).toBe(true); + }); + + it("handles server error on initialize", async () => { + const conn = makeMockConnection(); + conn.send = (method: string) => { + if (method === "initialize") { + return Promise.reject(new Error("Server startup failed")); + } + return Promise.resolve({}); + }; + + const client = new McpClient({ connection: conn }); + + await expect(client.initialize()).rejects.toThrow("Server startup failed"); + expect(client.getState()).toBe("error"); + }); + + it("callTool rejects when not connected", async () => { + const conn = makeMockConnection(); + const client = new McpClient({ connection: conn }); + + await expect(client.callTool("test", {})).rejects.toThrow("Client not connected"); + }); + + it("listTools rejects when not connected", async () => { + const conn = makeMockConnection(); + const client = new McpClient({ connection: conn }); + + await expect(client.listTools()).rejects.toThrow("Client not connected"); + }); + + it("close sets state to disconnected", async () => { + const conn = makeMockConnection(); + const client = new McpClient({ connection: conn }); + + await client.initialize(); + expect(client.getState()).toBe("connected"); + + client.close(); + expect(client.getState()).toBe("disconnected"); + }); + + it("callTool with abort signal", async () => { + const conn = makeMockConnection(); + let resolveRequest: ((v: unknown) => void) | null = null; + conn.send = (method: string) => { + if (method === "tools/call") { + return new Promise((resolve) => { + resolveRequest = resolve; + }); + } + if (method === "initialize") { + return Promise.resolve({ + protocolVersion: "2025-11-25", + capabilities: {}, + serverInfo: { name: "test", version: "1.0.0" }, + }); + } + return Promise.resolve({}); + }; + + const client = new McpClient({ connection: conn }); + await client.initialize(); + + const controller = new AbortController(); + const callPromise = client.callTool("test", {}, controller.signal); + + controller.abort(); + + await expect(callPromise).rejects.toThrow("Aborted"); + + // Clean up + resolveRequest?.({ + content: [{ type: "text", text: "too late" }], + }); + }); +}); diff --git a/packages/mcp/src/client.ts b/packages/mcp/src/client.ts new file mode 100644 index 0000000..17463d9 --- /dev/null +++ b/packages/mcp/src/client.ts @@ -0,0 +1,123 @@ +/** + * McpClient — MCP protocol client. + * + * Manages a single MCP server connection: initialize handshake, + * tool discovery, tool invocation, and list_changed notifications. + */ + +import type { Connection } from "./transport.js"; +import type { + McpCallResult, + McpInitializeResult, + McpListToolsResult, + McpServerCapabilities, + McpToolInfo, +} from "./types.js"; + +export type McpClientState = "disconnected" | "connecting" | "connected" | "error"; + +export interface McpClientDeps { + readonly connection: Connection; +} + +export class McpClient { + private state: McpClientState = "disconnected"; + private capabilities: McpServerCapabilities = {}; + private tools: readonly McpToolInfo[] = []; + private connection: Connection; + private toolsChangedHandler: (() => void) | null = null; + + constructor(deps: McpClientDeps) { + this.connection = deps.connection; + } + + getState(): McpClientState { + return this.state; + } + + getCapabilities(): McpServerCapabilities { + return this.capabilities; + } + + getTools(): readonly McpToolInfo[] { + return this.tools; + } + + onToolsChanged(handler: () => void): void { + this.toolsChangedHandler = handler; + } + + async initialize(): Promise<McpInitializeResult> { + this.state = "connecting"; + try { + const result = (await this.connection.send("initialize", { + protocolVersion: "2025-11-25", + capabilities: {}, + clientInfo: { name: "dispatch", version: "0.0.0" }, + })) as McpInitializeResult; + + this.capabilities = result.capabilities; + this.connection.notify("notifications/initialized", {}); + + this.connection.onNotification("notifications/tools/list_changed", () => { + if (this.toolsChangedHandler) { + this.toolsChangedHandler(); + } + }); + + this.state = "connected"; + return result; + } catch (err: unknown) { + this.state = "error"; + throw err; + } + } + + async listTools(): Promise<readonly McpToolInfo[]> { + if (this.state !== "connected") { + throw new Error("Client not connected"); + } + const result = (await this.connection.send("tools/list")) as McpListToolsResult; + this.tools = result.tools; + return this.tools; + } + + async callTool(name: string, args: unknown, signal?: AbortSignal): Promise<McpCallResult> { + if (this.state !== "connected") { + throw new Error("Client not connected"); + } + + if (signal?.aborted) { + throw new Error("Aborted"); + } + + const resultPromise = this.connection.send("tools/call", { + name, + arguments: args, + }) as Promise<McpCallResult>; + + if (!signal) { + return resultPromise; + } + + return new Promise<McpCallResult>((resolve, reject) => { + const onAbort = () => reject(new Error("Aborted")); + signal.addEventListener("abort", onAbort, { once: true }); + resultPromise.then( + (result) => { + signal.removeEventListener("abort", onAbort); + resolve(result); + }, + (err) => { + signal.removeEventListener("abort", onAbort); + reject(err); + }, + ); + }); + } + + close(): void { + this.state = "disconnected"; + this.connection.close(); + } +} diff --git a/packages/mcp/src/config.test.ts b/packages/mcp/src/config.test.ts new file mode 100644 index 0000000..38c9b50 --- /dev/null +++ b/packages/mcp/src/config.test.ts @@ -0,0 +1,114 @@ +import { describe, expect, it } from "vitest"; +import { resolveServers } from "./config.js"; + +describe("resolveServers", () => { + it("resolves from .dispatch/mcp.json", () => { + const dispatchConfig = JSON.stringify({ + servers: { + freecad: { command: "uvx", args: ["freecad-mcp"], env: { KEY: "val" } }, + }, + }); + + const result = resolveServers({ dispatchMcpJson: dispatchConfig, opencodeJson: null }); + + expect(result.servers.length).toBe(1); + expect(result.servers[0].id).toBe("freecad"); + expect(result.servers[0].command).toEqual(["uvx", "freecad-mcp"]); + expect(result.servers[0].env).toEqual({ KEY: "val" }); + expect(result.servers[0].configSource).toBe(".dispatch/mcp.json"); + expect(result.shadowed).toBe(false); + }); + + it("falls back to opencode.json mcp key", () => { + const opencodeConfig = JSON.stringify({ + mcp: { + chrome: { command: "npx", args: ["chrome-devtools-mcp@latest"] }, + }, + }); + + const result = resolveServers({ dispatchMcpJson: null, opencodeJson: opencodeConfig }); + + expect(result.servers.length).toBe(1); + expect(result.servers[0].id).toBe("chrome"); + expect(result.servers[0].command).toEqual(["npx", "chrome-devtools-mcp@latest"]); + expect(result.servers[0].configSource).toBe("opencode.json"); + expect(result.shadowed).toBe(false); + }); + + it("shadow warning when both present", () => { + const dispatchConfig = JSON.stringify({ + servers: { + freecad: { command: "uvx", args: ["freecad-mcp"] }, + }, + }); + const opencodeConfig = JSON.stringify({ + mcp: { + chrome: { command: "npx", args: ["chrome-devtools-mcp@latest"] }, + }, + }); + + const result = resolveServers({ + dispatchMcpJson: dispatchConfig, + opencodeJson: opencodeConfig, + }); + + expect(result.servers.length).toBe(1); + expect(result.servers[0].id).toBe("freecad"); + expect(result.shadowed).toBe(true); + }); + + it("empty when neither present", () => { + const result = resolveServers({ dispatchMcpJson: null, opencodeJson: null }); + + expect(result.servers.length).toBe(0); + expect(result.shadowed).toBe(false); + }); + + it("empty when dispatch has no servers key", () => { + const result = resolveServers({ + dispatchMcpJson: JSON.stringify({}), + opencodeJson: null, + }); + + expect(result.servers.length).toBe(0); + expect(result.shadowed).toBe(false); + }); + + it("handles malformed JSON gracefully", () => { + const result = resolveServers({ + dispatchMcpJson: "not valid json", + opencodeJson: "{ also bad", + }); + + expect(result.servers.length).toBe(0); + expect(result.shadowed).toBe(false); + }); + + it("server without args", () => { + const dispatchConfig = JSON.stringify({ + servers: { + simple: { command: "my-server" }, + }, + }); + + const result = resolveServers({ dispatchMcpJson: dispatchConfig, opencodeJson: null }); + + expect(result.servers.length).toBe(1); + expect(result.servers[0].command).toEqual(["my-server"]); + }); + + it("multiple servers from dispatch", () => { + const dispatchConfig = JSON.stringify({ + servers: { + a: { command: "server-a" }, + b: { command: "server-b", args: ["--port", "3000"] }, + }, + }); + + const result = resolveServers({ dispatchMcpJson: dispatchConfig, opencodeJson: null }); + + expect(result.servers.length).toBe(2); + const ids = result.servers.map((s) => s.id).sort(); + expect(ids).toEqual(["a", "b"]); + }); +}); diff --git a/packages/mcp/src/config.ts b/packages/mcp/src/config.ts new file mode 100644 index 0000000..20ed749 --- /dev/null +++ b/packages/mcp/src/config.ts @@ -0,0 +1,89 @@ +/** + * PURE config resolution — resolve MCP server configurations. + * + * Sources, in precedence order: + * 1. cwd/.dispatch/mcp.json servers + * 2. fallback cwd/opencode.json mcp key + * + * No I/O — callers pass the file contents as strings. + */ + +import type { McpServerConfig, ResolvedMcpServer, ResolveResult } from "./types.js"; + +export interface ResolveServersDeps { + readonly dispatchMcpJson: string | null; + readonly opencodeJson: string | null; +} + +export interface DispatchMcpConfig { + readonly servers?: Readonly<Record<string, McpServerConfig>>; +} + +export interface OpencodeJsonConfig { + readonly mcp?: Readonly<Record<string, McpServerConfig>>; +} + +export function resolveServers(deps: ResolveServersDeps): ResolveResult { + const result = new Map<string, ResolvedMcpServer>(); + + // Parse opencode.json once — used both as the fallback source and to detect + // whether a present `.dispatch/mcp.json` silently shadows its `mcp` key. + let opencodeConfig: OpencodeJsonConfig | null = null; + if (deps.opencodeJson) { + try { + opencodeConfig = JSON.parse(deps.opencodeJson) as OpencodeJsonConfig; + } catch { + // ignore parse errors + } + } + const opencodeHasMcp = !!opencodeConfig?.mcp && Object.keys(opencodeConfig.mcp).length > 0; + + // 1. cwd/.dispatch/mcp.json (highest precedence) + let dispatchHadServers = false; + if (deps.dispatchMcpJson) { + try { + const config = JSON.parse(deps.dispatchMcpJson) as DispatchMcpConfig; + if (config.servers) { + for (const [key, server] of Object.entries(config.servers)) { + const resolved = resolveServer(key, server, ".dispatch/mcp.json"); + result.set(resolved.id, resolved); + } + dispatchHadServers = result.size > 0; + } + } catch { + // ignore parse errors + } + } + + // 2. fallback cwd/opencode.json mcp key (only when dispatch yielded nothing) + if (result.size === 0 && opencodeConfig?.mcp) { + for (const [key, server] of Object.entries(opencodeConfig.mcp)) { + const resolved = resolveServer(key, server, "opencode.json"); + result.set(resolved.id, resolved); + } + } + + // No built-in servers — MCP has no built-in registry. + + // `.dispatch/mcp.json` silently shadows `opencode.json`'s mcp key when both + // declare servers — the opencode entry is skipped with no warning otherwise. + const shadowed = dispatchHadServers && opencodeHasMcp; + return { servers: [...result.values()], shadowed }; +} + +function resolveServer( + key: string, + config: McpServerConfig, + configSource: ".dispatch/mcp.json" | "opencode.json", +): ResolvedMcpServer { + const command = [config.command, ...(config.args ?? [])]; + const result: ResolvedMcpServer = { + id: key, + command, + configSource, + }; + if (config.env) { + (result as { env?: Readonly<Record<string, string>> }).env = config.env; + } + return result; +} diff --git a/packages/mcp/src/extension.test.ts b/packages/mcp/src/extension.test.ts new file mode 100644 index 0000000..e2d2eab --- /dev/null +++ b/packages/mcp/src/extension.test.ts @@ -0,0 +1,341 @@ +import type { Extension, HostAPI, Logger, ToolContract } from "@dispatch/kernel"; +import type { ToolAssembly, toolsFilter } from "@dispatch/session-orchestrator"; +import { describe, expect, it } from "vitest"; +import { filterMcpTools, makeMcpExtension } from "./extension.js"; +import { encode, FrameDecoder } from "./framing.js"; +import type { SpawnedProcess, SpawnProcess } from "./transport.js"; +import type { McpToolInfo } from "./types.js"; + +// --------------------------------------------------------------------------- +// Pure filterMcpTools +// --------------------------------------------------------------------------- + +const stubTool = (name: string): ToolContract => ({ + name, + description: "", + parameters: { type: "object" }, + execute: async () => ({ content: "" }), +}); + +describe("filterMcpTools (pure)", () => { + it("keeps non-MCP tools and connected-server tools, removes disconnected-server tools", () => { + const toolToServer = new Map<string, string>([ + ["a__x", "a"], + ["b__y", "b"], + ]); + const connected = new Set<string>(["a"]); + + const result = filterMcpTools( + { + tools: [stubTool("a__x"), stubTool("b__y"), stubTool("other")], + cwd: "/p", + conversationId: "c", + }, + toolToServer, + connected, + ); + + expect(result.tools.map((t) => t.name).sort()).toEqual(["a__x", "other"]); + expect(result.cwd).toBe("/p"); + expect(result.conversationId).toBe("c"); + }); + + it("removes all MCP tools when no server is connected", () => { + const result = filterMcpTools( + { tools: [stubTool("a__x")], conversationId: "c" }, + new Map<string, string>([["a__x", "a"]]), + new Set<string>(), + ); + expect(result.tools).toHaveLength(0); + expect(result.conversationId).toBe("c"); + expect(result.cwd).toBeUndefined(); + }); +}); + +// --------------------------------------------------------------------------- +// In-memory MCP server + fake spawn (integration) +// --------------------------------------------------------------------------- + +interface FakeServer { + tools: McpToolInfo[]; + failInitialize: boolean; + emitListChanged: () => void; +} + +/** + * Build a fake spawn backed by a shared `FakeServer`. Incoming framed + * JSON-RPC on stdin is answered with framed responses on stdout, exercising + * transport → framing → rpc → client → manager end to end. + */ +function makeFakeSpawn(server: FakeServer): SpawnProcess { + const decoder = new FrameDecoder(); + let dataListeners: Array<(data: Uint8Array) => void> = []; + + const emit = (frame: Uint8Array) => { + for (const cb of dataListeners) cb(frame); + }; + + const spawn: SpawnProcess = (_command, _opts) => { + // Each spawn is a fresh process; reset listeners so a reconnect (after + // shutdown) doesn't feed closed rpc instances. + dataListeners = []; + const process: SpawnedProcess = { + stdin: { + write: (bytes: Uint8Array) => { + for (const msg of decoder.decode(bytes)) { + const parsed = JSON.parse(msg) as { + id?: number; + method?: string; + params?: unknown; + }; + const id = parsed.id ?? 0; + const method = parsed.method; + if (method === "initialize") { + if (server.failInitialize) { + emit( + encode( + JSON.stringify({ + jsonrpc: "2.0", + id, + error: { code: -32603, message: "initialize failed" }, + }), + ), + ); + } else { + emit( + encode( + JSON.stringify({ + jsonrpc: "2.0", + id, + result: { + protocolVersion: "2025-11-25", + capabilities: { tools: { listChanged: true } }, + serverInfo: { name: "fake", version: "0.0.0" }, + }, + }), + ), + ); + } + } else if (method === "tools/list") { + emit(encode(JSON.stringify({ jsonrpc: "2.0", id, result: { tools: server.tools } }))); + } else if (method === "tools/call") { + emit( + encode( + JSON.stringify({ + jsonrpc: "2.0", + id, + result: { content: [{ type: "text", text: "ok" }], isError: false }, + }), + ), + ); + } + // notifications (notifications/initialized): no response. + } + }, + }, + stdout: { + on: (event: string, cb: (data: Uint8Array) => void) => { + if (event === "data") dataListeners.push(cb); + }, + }, + pid: 7000, + kill: () => {}, + }; + return process; + }; + + server.emitListChanged = () => { + emit(encode(JSON.stringify({ jsonrpc: "2.0", method: "notifications/tools/list_changed" }))); + }; + + return spawn; +} + +// --------------------------------------------------------------------------- +// Minimal fake HostAPI +// --------------------------------------------------------------------------- + +function makeFakeHost(): { + host: HostAPI; + tools: Map<string, ToolContract>; + getFilter: () => ((a: ToolAssembly) => Promise<ToolAssembly>) | null; + getService: () => unknown; +} { + const tools = new Map<string, ToolContract>(); + let filterFn: ((a: ToolAssembly) => Promise<ToolAssembly>) | null = null; + let service: unknown = null; + + const noopSpan = { + id: "s", + log: {} as Logger, + setAttributes: () => {}, + addLink: () => {}, + child: () => noopSpan, + end: () => {}, + }; + const noopLogger: Logger = { + info: () => {}, + warn: () => {}, + error: () => {}, + debug: () => {}, + child: () => noopLogger, + span: () => noopSpan, + }; + + const host = { + defineTool: (t: ToolContract) => { + tools.set(t.name, t); + }, + addFilter: (_hook: typeof toolsFilter, fn: (a: ToolAssembly) => Promise<ToolAssembly>) => { + filterFn = fn; + return () => { + filterFn = null; + }; + }, + provideService: (_handle: unknown, impl: unknown) => { + service = impl; + }, + getService: () => service, + getTools: () => tools, + logger: noopLogger, + } as unknown as HostAPI; + + return { host, tools, getFilter: () => filterFn, getService: () => service }; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +const dispatchConfig = (servers: Record<string, unknown>): string => JSON.stringify({ servers }); + +const tool = (name: string, description = name): McpToolInfo => ({ + name, + description, + inputSchema: { type: "object" }, +}); + +const assembly = (tools: ToolContract[], cwd = "/proj"): ToolAssembly => ({ + tools, + cwd, + conversationId: "conv-1", +}); + +const flush = () => new Promise((r) => setTimeout(r, 0)); + +function makeServer(initialTools: McpToolInfo[]): FakeServer { + return { tools: [...initialTools], failInitialize: false, emitListChanged: () => {} }; +} + +function makeExt(server: FakeServer, configJson: string): Extension { + return makeMcpExtension({ + spawn: makeFakeSpawn(server), + readFile: async (path) => (path.endsWith(".dispatch/mcp.json") ? configJson : null), + getCwd: () => "/proj", + }); +} + +// --------------------------------------------------------------------------- +// Lifecycle tests +// --------------------------------------------------------------------------- +describe("mcp extension lifecycle", () => { + /** Get the registered filter, throwing if activation did not register one. */ + function requireFilter(getFilter: () => ((a: ToolAssembly) => Promise<ToolAssembly>) | null) { + const filter = getFilter(); + if (!filter) throw new Error("toolsFilter was not registered"); + return filter; + } + + /** Look up a registered tool, throwing if absent. */ + function requireTool(tools: Map<string, ToolContract>, name: string) { + const t = tools.get(name); + if (!t) throw new Error(`tool ${name} not registered`); + return t; + } + + it("registers tools on connect", async () => { + const server = makeServer([tool("create_object", "Create an object")]); + const ext = makeExt(server, dispatchConfig({ freecad: { command: "fake" } })); + const { host, tools, getFilter } = makeFakeHost(); + + ext.activate(host); + const filter = requireFilter(getFilter); + + // Running the filter triggers lazy connect + register. + await filter(assembly([])); + + expect(tools.has("freecad__create_object")).toBe(true); + const t = requireTool(tools, "freecad__create_object"); + expect(t.description).toBe("[freecad] Create an object"); + expect(t.concurrencySafe).toBe(false); + ext.deactivate?.(); + }); + + it("toolsFilter keeps connected-server tools and removes disconnected-server tools", async () => { + const server = makeServer([tool("create_object")]); + const ext = makeExt(server, dispatchConfig({ freecad: { command: "fake" } })); + const { host, tools, getFilter } = makeFakeHost(); + ext.activate(host); + const filter = requireFilter(getFilter); + + // Connect + register the tool. + await filter(assembly([])); + const registered = requireTool(tools, "freecad__create_object"); + + // Connected server → tool passes through the filter. + const kept = await filter(assembly([registered])); + expect(kept.tools.map((t) => t.name)).toContain("freecad__create_object"); + + // Disconnect: deactivate shuts down the client (clearing it), then make + // the server fail to reconnect. toolToServer still maps the tool, so the + // filter drops it because the server is no longer connected. + ext.deactivate?.(); + server.failInitialize = true; + + const removed = await filter(assembly([registered])); + expect(removed.tools.map((t) => t.name)).not.toContain("freecad__create_object"); + }); + + it("re-registers tools on list_changed", async () => { + const server = makeServer([tool("first_tool")]); + const ext = makeExt(server, dispatchConfig({ freecad: { command: "fake" } })); + const { host, tools, getFilter } = makeFakeHost(); + ext.activate(host); + const filter = requireFilter(getFilter); + + await filter(assembly([])); + expect(tools.has("freecad__first_tool")).toBe(true); + + // Server changes its tool set, then announces list_changed. + server.tools = [tool("first_tool"), tool("second_tool", "The second")]; + server.emitListChanged(); + + // Let the async onToolsChanged handler (re-list + re-register) flush. + await flush(); + + expect(tools.has("freecad__second_tool")).toBe(true); + expect(requireTool(tools, "freecad__second_tool").description).toBe("[freecad] The second"); + ext.deactivate?.(); + }); + + it("deactivate shuts down all clients", async () => { + const server = makeServer([tool("create_object")]); + const ext = makeExt(server, dispatchConfig({ freecad: { command: "fake" } })); + const { host, getFilter, getService } = makeFakeHost(); + ext.activate(host); + const filter = requireFilter(getFilter); + + await filter(assembly([])); + + const service = getService() as { + status: (cwd: string) => Promise<readonly { state: string }[]>; + }; + const before = await service.status("/proj"); + expect(before[0].state).toBe("connected"); + + ext.deactivate?.(); + + const after = await service.status("/proj"); + expect(after[0].state).toBe("disconnected"); + }); +}); diff --git a/packages/mcp/src/extension.ts b/packages/mcp/src/extension.ts new file mode 100644 index 0000000..9adb879 --- /dev/null +++ b/packages/mcp/src/extension.ts @@ -0,0 +1,227 @@ +/** + * MCP extension — manifest + activate(host). + * + * Builds the manager with real adapters, registers MCP tools via host.defineTool + * (on connect + on list_changed re-list), registers a toolsFilter that drops + * tools from disconnected/errored servers, and provides the mcpServiceHandle. + * + * The production `extension` wires real Bun adapters. `makeMcpExtension(deps)` + * accepts injectable spawn/readFile/getCwd so the lifecycle is testable against + * in-memory backends (mirrors the LSP extension's injected adapters). + */ + +import type { Extension, HostAPI, ServiceHandle } from "@dispatch/kernel"; +import { defineService } from "@dispatch/kernel"; +import { type ToolAssembly, toolsFilter } from "@dispatch/session-orchestrator"; +import type { McpClient } from "./client.js"; +import { resolveServers } from "./config.js"; +import type { Logger } from "./manager.js"; +import { McpManager } from "./manager.js"; +import { adaptTool, namespace } from "./registry.js"; +import type { SpawnedProcess, SpawnProcess } from "./transport.js"; +import { createStdioTransport } from "./transport.js"; +import type { McpServerStatus, McpService, ResolvedMcpServer } from "./types.js"; + +export const mcpServiceHandle: ServiceHandle<McpService> = defineService<McpService>("mcp"); + +/** Filesystem + process adapters injected into the extension for testability. */ +export interface McpExtensionDeps { + readonly spawn: SpawnProcess; + readonly readFile: (path: string) => Promise<string | null>; + readonly getCwd: () => string; +} + +/** + * Pure tool-filter logic: remove MCP tools whose owning server is not + * currently connected. Non-MCP tools (no entry in `toolToServer`) are kept. + * Extracted from the filter handler so it is unit-testable without I/O. + */ +export function filterMcpTools( + assembly: ToolAssembly, + toolToServer: ReadonlyMap<string, string>, + connectedServerIds: ReadonlySet<string>, +): ToolAssembly { + const filtered = assembly.tools.filter((tool) => { + const serverId = toolToServer.get(tool.name); + if (serverId === undefined) return true; + return connectedServerIds.has(serverId); + }); + return { + tools: filtered, + ...(assembly.cwd !== undefined && { cwd: assembly.cwd }), + conversationId: assembly.conversationId, + }; +} + +/** Map a host Logger to the manager's narrower Logger surface. */ +function wrapLogger(logger: HostAPI["logger"]): Logger { + return { + info: (msg, attrs) => logger.info(msg, attrs), + warn: (msg, attrs) => logger.warn(msg, attrs), + error: (msg, attrs) => logger.error(msg, attrs), + }; +} + +export function makeMcpExtension(deps: McpExtensionDeps): Extension { + // Module-scoped store so deactivate can reach the manager. Lives in the + // factory closure so each built extension has its own. + const store: { manager: McpManager | null } = { manager: null }; + + return { + manifest: { + id: "mcp", + name: "Model Context Protocol", + version: "0.0.0", + apiVersion: "^0.1.0", + trust: "bundled", + activation: "eager", + dependsOn: ["session-orchestrator"], + capabilities: { spawn: true }, + contributes: { tools: [], services: ["mcp"] }, + }, + activate(host: HostAPI) { + const logger = host.logger; + + const connectionFactory = (server: ResolvedMcpServer, cwd: string) => { + return createStdioTransport( + { + spawn: deps.spawn, + command: server.command, + ...(server.env !== undefined && { env: server.env }), + }, + cwd, + ); + }; + + const manager = new McpManager( + { spawn: deps.spawn, logger: wrapLogger(logger) }, + connectionFactory, + ); + + // Track which tool names belong to which server for the filter. + const toolToServer = new Map<string, string>(); + + function registerToolsFromClient(serverId: string, client: McpClient): void { + const tools = client.getTools(); + for (const mcpTool of tools) { + const name = namespace(serverId, mcpTool.name); + toolToServer.set(name, serverId); + host.defineTool(adaptTool(serverId, mcpTool, client)); + } + } + + async function connectAndRegister(server: ResolvedMcpServer, cwd: string): Promise<void> { + const client = await manager.ensureConnected(server, cwd); + registerToolsFromClient(server.id, client); + + // Wire list_changed → re-list → re-register. onToolsChanged replaces + // the handler; ensureConnected returns the same cached client so this + // is idempotent across turns. + client.onToolsChanged(async () => { + try { + await client.listTools(); + registerToolsFromClient(server.id, client); + } catch (err: unknown) { + logger.error("MCP tools re-list failed", { + serverId: server.id, + error: err instanceof Error ? err.message : String(err), + }); + } + }); + } + + // Resolve config + ensure servers connected, then drop tools whose + // server is not connected. Lazy-spawn happens here (first turn). + host.addFilter(toolsFilter, async (assembly: ToolAssembly): Promise<ToolAssembly> => { + const cwd = assembly.cwd ?? deps.getCwd(); + const dispatchMcpJson = await deps.readFile(joinPath(cwd, ".dispatch", "mcp.json")); + const opencodeJson = await deps.readFile(joinPath(cwd, "opencode.json")); + const { servers } = resolveServers({ dispatchMcpJson, opencodeJson }); + + for (const server of servers) { + try { + await connectAndRegister(server, cwd); + } catch { + // Connection failure — the manager tracks broken state. + } + } + + const statuses = manager.status(servers); + const connectedIds = new Set( + statuses.filter((s) => s.state === "connected").map((s) => s.id), + ); + + return filterMcpTools(assembly, toolToServer, connectedIds); + }); + + // Provide the MCP service (status introspection). + const service: McpService = { + async status(cwd: string): Promise<readonly McpServerStatus[]> { + const dispatchMcpJson = await deps.readFile(joinPath(cwd, ".dispatch", "mcp.json")); + const opencodeJson = await deps.readFile(joinPath(cwd, "opencode.json")); + const { servers } = resolveServers({ dispatchMcpJson, opencodeJson }); + return manager.status(servers); + }, + }; + host.provideService(mcpServiceHandle, service); + + store.manager = manager; + logger.info("MCP extension activated"); + }, + deactivate() { + store.manager?.shutdownAll(); + store.manager = null; + }, + }; +} + +// --- real Bun-backed adapters (production wiring) --- + +function realSpawn( + command: readonly string[], + opts: { readonly cwd: string; readonly env?: Readonly<Record<string, string>> | undefined }, +): SpawnedProcess { + const env: Record<string, string | undefined> = { ...process.env }; + if (opts.env) { + for (const [key, value] of Object.entries(opts.env)) { + env[key] = value; + } + } + const proc = Bun.spawn(command as string[], { + cwd: opts.cwd, + env: env as Record<string, string>, + stdin: "pipe", + stdout: "pipe", + stderr: "pipe", + }); + return { + stdin: proc.stdin, + stdout: proc.stdout, + stderr: proc.stderr, + pid: proc.pid, + kill: () => proc.kill(), + }; +} + +async function realReadFile(path: string): Promise<string | null> { + try { + const file = Bun.file(path); + if (await file.exists()) { + return file.text(); + } + return null; + } catch { + return null; + } +} + +function joinPath(...parts: readonly string[]): string { + return parts.join("/"); +} + +/** Production extension: real Bun spawn + filesystem reads. */ +export const extension: Extension = makeMcpExtension({ + spawn: realSpawn, + readFile: realReadFile, + getCwd: () => process.cwd(), +}); diff --git a/packages/mcp/src/framing.test.ts b/packages/mcp/src/framing.test.ts new file mode 100644 index 0000000..be8cb8e --- /dev/null +++ b/packages/mcp/src/framing.test.ts @@ -0,0 +1,92 @@ +import { describe, expect, it } from "vitest"; +import { encode, FrameDecoder } from "./framing.js"; + +describe("encode", () => { + it("produces correct Content-Length header", () => { + const msg = '{"jsonrpc":"2.0","id":1,"method":"initialize"}'; + const encoded = encode(msg); + const text = new TextDecoder().decode(encoded); + expect(text).toBe(`Content-Length: ${new TextEncoder().encode(msg).length}\r\n\r\n${msg}`); + }); + + it("handles empty message", () => { + const encoded = encode(""); + const text = new TextDecoder().decode(encoded); + expect(text).toBe("Content-Length: 0\r\n\r\n"); + }); +}); + +describe("FrameDecoder", () => { + it("reassembles a complete message from one chunk", () => { + const msg = '{"jsonrpc":"2.0","id":1}'; + const encoded = encode(msg); + const decoder = new FrameDecoder(); + const messages = decoder.decode(encoded); + expect(messages).toEqual([msg]); + }); + + it("handles split across chunks", () => { + const msg = '{"jsonrpc":"2.0","id":1,"method":"initialize"}'; + const encoded = encode(msg); + const mid = Math.floor(encoded.length / 2); + const chunk1 = encoded.slice(0, mid); + const chunk2 = encoded.slice(mid); + + const decoder = new FrameDecoder(); + const result1 = decoder.decode(chunk1); + expect(result1).toEqual([]); + + const result2 = decoder.decode(chunk2); + expect(result2).toEqual([msg]); + }); + + it("handles two messages in one chunk", () => { + const msg1 = '{"jsonrpc":"2.0","id":1}'; + const msg2 = '{"jsonrpc":"2.0","id":2}'; + const encoded1 = encode(msg1); + const encoded2 = encode(msg2); + const combined = new Uint8Array(encoded1.length + encoded2.length); + combined.set(encoded1); + combined.set(encoded2, encoded1.length); + + const decoder = new FrameDecoder(); + const messages = decoder.decode(combined); + expect(messages).toEqual([msg1, msg2]); + }); + + it("rejects negative Content-Length by skipping header", () => { + const header = "Content-Length: -5\r\n\r\n"; + const encoded = new TextEncoder().encode(`${header}extra`); + const decoder = new FrameDecoder(); + const messages = decoder.decode(encoded); + // Negative length does not match the digit capture, so the header is skipped. + expect(messages).toEqual([]); + }); + + it("rejects zero Content-Length", () => { + const encoded = encode(""); + const decoder = new FrameDecoder(); + const messages = decoder.decode(encoded); + expect(messages).toEqual([""]); + }); + + it("reassembles multi-byte UTF-8 content (byte-length, not char-length)", () => { + // "héllo" — é is two UTF-8 bytes; Content-Length counts bytes. + const msg = '{"text":"héllo 🚀"}'; + const encoded = encode(msg); + expect(new TextEncoder().encode(msg).length).toBeGreaterThan(msg.length); + + const decoder = new FrameDecoder(); + const messages = decoder.decode(encoded); + expect(messages).toEqual([msg]); + }); + + it("reassembles multi-byte content split across a chunk boundary", () => { + const msg = '{"text":"日本語のテスト"}'; + const encoded = encode(msg); + const mid = Math.floor(encoded.length / 2); + const decoder = new FrameDecoder(); + expect(decoder.decode(encoded.slice(0, mid))).toEqual([]); + expect(decoder.decode(encoded.slice(mid))).toEqual([msg]); + }); +}); diff --git a/packages/mcp/src/framing.ts b/packages/mcp/src/framing.ts new file mode 100644 index 0000000..a6e8b99 --- /dev/null +++ b/packages/mcp/src/framing.ts @@ -0,0 +1,100 @@ +/** + * Content-Length framing for MCP stdio transport. + * + * Each JSON-RPC message is framed as: + * Content-Length: <byte-length>\r\n\r\n<JSON bytes> + * + * Same framing as LSP — the MCP spec inherited this from the LSP base protocol. + * + * PURE: no I/O. Operates on bytes (Uint8Array) so multi-byte UTF-8 content is + * handled correctly — `Content-Length` is a *byte* count, not a character count. + */ + +const HEADER_SEP = "\r\n\r\n"; +const CONTENT_LENGTH_RE = /Content-Length:\s*(\d+)/i; + +const SEP_BYTES = new TextEncoder().encode(HEADER_SEP); + +/** + * Encode a JSON string into a single Content-Length-framed message. + * Returns the full frame (header + blank line + body) as bytes. + */ +export function encode(msg: string): Uint8Array { + const body = new TextEncoder().encode(msg); + const header = `Content-Length: ${body.length}\r\n\r\n`; + const frame = new TextEncoder().encode(header); + const result = new Uint8Array(frame.length + body.length); + result.set(frame); + result.set(body, frame.length); + return result; +} + +/** Find the first occurrence of `needle` in `haystack` at or after `from`. -1 if absent. */ +function indexOfBytes(haystack: Uint8Array, needle: Uint8Array, from: number): number { + if (needle.length === 0) return from; + const max = haystack.length - needle.length; + for (let i = from; i <= max; i++) { + let match = true; + for (let j = 0; j < needle.length; j++) { + if (haystack[i + j] !== needle[j]) { + match = false; + break; + } + } + if (match) return i; + } + return -1; +} + +/** + * Feed raw bytes into the decoder. Returns all complete JSON messages that can + * be extracted from the accumulated buffer. Buffers partial frames across calls. + */ +export class FrameDecoder { + private buf: Uint8Array = new Uint8Array(0); + private readonly decoder = new TextDecoder(); + + decode(chunk: Uint8Array): string[] { + // Append the incoming chunk to the internal byte buffer. + const next = new Uint8Array(this.buf.length + chunk.length); + next.set(this.buf); + next.set(chunk, this.buf.length); + this.buf = next; + + const messages: string[] = []; + + while (true) { + const sepIdx = indexOfBytes(this.buf, SEP_BYTES, 0); + if (sepIdx === -1) break; + + // The header block is everything before the separator; parse + // Content-Length from it (ASCII, so decoding the slice is safe). + const headerText = this.decoder.decode(this.buf.subarray(0, sepIdx)); + const match = CONTENT_LENGTH_RE.exec(headerText); + const bodyStart = sepIdx + SEP_BYTES.length; + + if (!match?.[1]) { + // No usable Content-Length — drop this header and continue scanning. + this.buf = this.buf.subarray(bodyStart); + continue; + } + + const length = Number.parseInt(match[1], 10); + if (length < 0) { + this.buf = this.buf.subarray(bodyStart); + continue; + } + + if (this.buf.length - bodyStart < length) { + // Body not fully received yet; wait for more bytes. + break; + } + + // Decode exactly `length` body bytes (preserves multi-byte UTF-8). + messages.push(this.decoder.decode(this.buf.subarray(bodyStart, bodyStart + length))); + this.buf = this.buf.subarray(bodyStart + length); + } + + return messages; + } +} diff --git a/packages/mcp/src/index.ts b/packages/mcp/src/index.ts new file mode 100644 index 0000000..1f2667d --- /dev/null +++ b/packages/mcp/src/index.ts @@ -0,0 +1,31 @@ +export { McpClient, type McpClientState } from "./client.js"; +export { type ResolveServersDeps, resolveServers } from "./config.js"; +export { + extension, + filterMcpTools, + type McpExtensionDeps, + makeMcpExtension, + mcpServiceHandle, +} from "./extension.js"; +export { encode, FrameDecoder } from "./framing.js"; +export { type Logger, McpManager, type McpManagerDeps } from "./manager.js"; +export { adaptTool, flattenContent, namespace } from "./registry.js"; +export { + type Connection, + createStdioTransport, + type SpawnedProcess, + type SpawnProcess, +} from "./transport.js"; +export type { + McpCallResult, + McpContentItem, + McpServerCapabilities, + McpServerConfig, + McpServerState, + McpServerStatus, + McpService, + McpToolCaller, + McpToolInfo, + ResolvedMcpServer, + ResolveResult, +} from "./types.js"; diff --git a/packages/mcp/src/manager.test.ts b/packages/mcp/src/manager.test.ts new file mode 100644 index 0000000..275b078 --- /dev/null +++ b/packages/mcp/src/manager.test.ts @@ -0,0 +1,216 @@ +import { describe, expect, it } from "vitest"; +import { McpManager, type McpManagerDeps } from "./manager.js"; +import type { Connection } from "./transport.js"; +import type { ResolvedMcpServer } from "./types.js"; + +function makeMockConnection(): Connection { + return { + send: async (method: string) => { + if (method === "initialize") { + return { + protocolVersion: "2025-11-25", + capabilities: { tools: { listChanged: false } }, + serverInfo: { name: "test", version: "1.0.0" }, + }; + } + if (method === "tools/list") { + return { + tools: [ + { + name: "tool_a", + description: "Tool A", + inputSchema: { type: "object" }, + }, + ], + }; + } + return {}; + }, + notify: () => {}, + onNotification: () => {}, + close: () => {}, + pid: 100, + }; +} + +function makeBrokenConnection(): Connection { + return { + send: async () => { + throw new Error("Connection refused"); + }, + notify: () => {}, + onNotification: () => {}, + close: () => {}, + pid: 101, + }; +} + +function makeManager( + connectionFactory: (_server: ResolvedMcpServer) => { + connection: Connection; + promise: Promise<void>; + }, +): McpManager { + const currentTime = 1000; + const deps: McpManagerDeps = { + spawn: () => ({ + stdin: { write: () => {} }, + stdout: { on: () => {} }, + pid: 1, + kill: () => {}, + }), + now: () => currentTime, + }; + + const factory = (_server: ResolvedMcpServer, _cwd: string) => connectionFactory(_server); + + const manager = new McpManager(deps, factory); + return manager; +} + +const testServer: ResolvedMcpServer = { + id: "test-server", + command: ["test-cmd"], + configSource: ".dispatch/mcp.json", +}; + +describe("McpManager", () => { + it("lazy-spawn on first access", async () => { + const conn = makeMockConnection(); + let spawnCount = 0; + const manager = makeManager((_server) => { + spawnCount++; + return { connection: conn, promise: Promise.resolve() }; + }); + + const client = await manager.ensureConnected(testServer, "/tmp"); + expect(client).toBeDefined(); + expect(spawnCount).toBe(1); + }); + + it("reuses existing client on second access", async () => { + const conn = makeMockConnection(); + let spawnCount = 0; + const manager = makeManager(() => { + spawnCount++; + return { connection: conn, promise: Promise.resolve() }; + }); + + const client1 = await manager.ensureConnected(testServer, "/tmp"); + const client2 = await manager.ensureConnected(testServer, "/tmp"); + expect(client1).toBe(client2); + expect(spawnCount).toBe(1); + }); + + it("status returns server states", async () => { + const conn = makeMockConnection(); + const manager = makeManager(() => { + return { connection: conn, promise: Promise.resolve() }; + }); + + // Before connecting + let statuses = manager.status([testServer]); + expect(statuses.length).toBe(1); + expect(statuses[0].state).toBe("disconnected"); + + // After connecting + await manager.ensureConnected(testServer, "/tmp"); + statuses = manager.status([testServer]); + expect(statuses.length).toBe(1); + expect(statuses[0].state).toBe("connected"); + expect(statuses[0].toolCount).toBe(1); + }); + + it("shutdownAll kills all clients", async () => { + let closed = false; + const conn: Connection = { + send: async (method: string) => { + if (method === "initialize") { + return { + protocolVersion: "2025-11-25", + capabilities: {}, + serverInfo: { name: "test", version: "1.0.0" }, + }; + } + if (method === "tools/list") return { tools: [] }; + return {}; + }, + notify: () => {}, + onNotification: () => {}, + close: () => { + closed = true; + }, + pid: 200, + }; + + const manager = makeManager(() => { + return { connection: conn, promise: Promise.resolve() }; + }); + + await manager.ensureConnected(testServer, "/tmp"); + manager.shutdownAll(); + + expect(closed).toBe(true); + const statuses = manager.status([testServer]); + expect(statuses[0].state).toBe("disconnected"); + }); + + it("broken server reports error state", async () => { + const manager = makeManager(() => { + return { connection: makeBrokenConnection(), promise: Promise.resolve() }; + }); + + await expect(manager.ensureConnected(testServer, "/tmp")).rejects.toThrow(); + + const statuses = manager.status([testServer]); + expect(statuses[0].state).toBe("error"); + expect(statuses[0].error).toContain("test-server"); + }); + + it("broken server retries after backoff", async () => { + let currentTime = 1000; + const brokenConn = makeBrokenConnection(); + const goodConn = makeMockConnection(); + let useGood = false; + + const deps: McpManagerDeps = { + spawn: () => ({ + stdin: { write: () => {} }, + stdout: { on: () => {} }, + pid: 1, + kill: () => {}, + }), + now: () => currentTime, + }; + + const factory = (_server: ResolvedMcpServer, _cwd: string) => { + const conn = useGood ? goodConn : brokenConn; + return { connection: conn, promise: Promise.resolve() }; + }; + + const manager = new McpManager(deps, factory); + + // First attempt fails + await expect(manager.ensureConnected(testServer, "/tmp")).rejects.toThrow(); + expect(manager.status([testServer])[0].state).toBe("error"); + + // Not enough time passed — still broken + currentTime += 29_000; + expect(manager.status([testServer])[0].state).toBe("error"); + + // After backoff — should allow retry + useGood = true; + currentTime += 2_000; + const statuses = manager.status([testServer]); + // After backoff, status() clears the broken entry + expect(statuses[0].state).toBe("disconnected"); + }); + + it("getClient returns undefined for unknown server", () => { + const manager = makeManager(() => { + return { connection: makeMockConnection(), promise: Promise.resolve() }; + }); + + expect(manager.getClient("nonexistent")).toBeUndefined(); + }); +}); diff --git a/packages/mcp/src/manager.ts b/packages/mcp/src/manager.ts new file mode 100644 index 0000000..a9c06de --- /dev/null +++ b/packages/mcp/src/manager.ts @@ -0,0 +1,205 @@ +/** + * McpManager — one McpClient per configured server; lazy-spawn on first + * tool access; status(servers); getClient(serverId); shutdownAll(). + * + * Mirrors the LSP manager lifecycle. Injected spawn + logger (no I/O of its + * own: config resolution happens in the extension layer; the manager receives + * already-resolved servers). + */ + +import { McpClient } from "./client.js"; +import type { Connection, SpawnProcess } from "./transport.js"; +import type { McpServerState, McpServerStatus, ResolvedMcpServer } from "./types.js"; + +export interface Logger { + readonly info: (msg: string, attrs?: Record<string, string | number | boolean | null>) => void; + readonly warn: (msg: string, attrs?: Record<string, string | number | boolean | null>) => void; + readonly error: (msg: string, attrs?: Record<string, unknown>) => void; +} + +export interface McpManagerDeps { + readonly spawn: SpawnProcess; + readonly logger?: Logger; + readonly now?: () => number; +} + +export type ConnectionFactory = ( + server: ResolvedMcpServer, + cwd: string, +) => { connection: Connection; promise: Promise<void> }; + +type ClientEntry = { + readonly client: McpClient; + readonly server: ResolvedMcpServer; + readonly promise: Promise<void>; +}; + +type BrokenEntry = { + readonly brokenAt: number; + readonly error: string; +}; + +const BACKOFF_MS = 30_000; + +export class McpManager { + private clients = new Map<string, ClientEntry>(); + private broken = new Map<string, BrokenEntry>(); + private spawning = new Map<string, Promise<void>>(); + private readonly deps: McpManagerDeps; + private readonly connectionFactory: ConnectionFactory; + private readonly now: () => number; + + constructor(deps: McpManagerDeps, connectionFactory: ConnectionFactory) { + this.deps = deps; + this.connectionFactory = connectionFactory; + this.now = deps.now ?? Date.now; + } + + getClient(serverId: string): McpClient | undefined { + return this.clients.get(serverId)?.client; + } + + getServerState(serverId: string): McpServerState { + const brokenEntry = this.broken.get(serverId); + if (brokenEntry) { + const backoffElapsed = this.now() - brokenEntry.brokenAt >= BACKOFF_MS; + if (backoffElapsed) { + this.broken.delete(serverId); + } else { + return "error"; + } + } + + const entry = this.clients.get(serverId); + if (!entry) return "disconnected"; + + const state = entry.client.getState(); + if (state === "error") return "error"; + if (state === "connecting") return "connecting"; + if (state === "connected") return "connected"; + return "disconnected"; + } + + status(servers: readonly ResolvedMcpServer[]): McpServerStatus[] { + const results: McpServerStatus[] = []; + + for (const server of servers) { + const state = this.getServerState(server.id); + const entry = this.clients.get(server.id); + const brokenEntry = this.broken.get(server.id); + + const status: McpServerStatus = { + id: server.id, + state, + toolCount: entry?.client.getTools().length ?? 0, + }; + if (state === "error" && brokenEntry) { + (status as { error?: string }).error = brokenEntry.error; + } else if (state === "error" && entry?.client.getState() === "error") { + (status as { error?: string }).error = brokenEntry?.error ?? `${server.id}: client error`; + } + results.push(status); + } + + return results; + } + + async ensureConnected(server: ResolvedMcpServer, cwd: string): Promise<McpClient> { + const existing = this.clients.get(server.id); + if (existing && existing.client.getState() === "connected") { + return existing.client; + } + + const brokenEntry = this.broken.get(server.id); + if (brokenEntry) { + const backoffElapsed = this.now() - brokenEntry.brokenAt >= BACKOFF_MS; + if (!backoffElapsed) { + throw new Error(brokenEntry.error); + } + this.broken.delete(server.id); + } + + await this.spawnClient(server, cwd); + const entry = this.clients.get(server.id); + if (!entry) { + throw new Error(`Failed to spawn MCP client for ${server.id}`); + } + if (entry.client.getState() === "error") { + const brokenNow = this.broken.get(server.id); + throw new Error(brokenNow?.error ?? `${server.id}: client error`); + } + return entry.client; + } + + private async spawnClient(server: ResolvedMcpServer, cwd: string): Promise<void> { + const existingSpawn = this.spawning.get(server.id); + if (existingSpawn) return existingSpawn; + + const spawnPromise = this.doSpawn(server, cwd); + this.spawning.set(server.id, spawnPromise); + + try { + await spawnPromise; + } finally { + this.spawning.delete(server.id); + } + } + + private async doSpawn(server: ResolvedMcpServer, cwd: string): Promise<void> { + const { connection, promise } = this.connectionFactory(server, cwd); + + const client = new McpClient({ connection }); + + const entry: ClientEntry = { + client, + server, + promise: this.initClient(client, server, promise), + }; + + this.clients.set(server.id, entry); + await entry.promise; + + // If initialization failed, the client is in an error state and broken[] + // is already populated. Drop the half-created client (and reap its child + // process) so a later retry spawns fresh instead of returning a dead entry. + if (client.getState() === "error") { + this.clients.delete(server.id); + client.close(); + } + } + + private async initClient( + client: McpClient, + server: ResolvedMcpServer, + _transportPromise: Promise<void>, + ): Promise<void> { + try { + await client.initialize(); + await client.listTools(); + this.deps.logger?.info("MCP server connected", { + serverId: server.id, + toolCount: String(client.getTools().length), + }); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + this.broken.set(server.id, { + brokenAt: this.now(), + error: `${server.id}: ${message}`, + }); + this.deps.logger?.warn("MCP server failed to connect", { + serverId: server.id, + error: message, + }); + } + } + + shutdownAll(): void { + for (const [, entry] of this.clients) { + entry.client.close(); + } + this.clients.clear(); + this.broken.clear(); + this.spawning.clear(); + this.deps.logger?.info("All MCP servers shut down"); + } +} diff --git a/packages/mcp/src/registry.test.ts b/packages/mcp/src/registry.test.ts new file mode 100644 index 0000000..4c88828 --- /dev/null +++ b/packages/mcp/src/registry.test.ts @@ -0,0 +1,237 @@ +import type { Logger } from "@dispatch/kernel"; +import { describe, expect, it } from "vitest"; +import { adaptTool, flattenContent, namespace } from "./registry.js"; +import type { McpCallResult, McpContentItem, McpToolCaller, McpToolInfo } from "./types.js"; + +const mockSpan = { + id: "span-1", + log: {} as Logger, + setAttributes: () => {}, + addLink: () => {}, + child: () => mockSpan, + end: () => {}, +}; + +const mockLogger: Logger = { + info: () => {}, + warn: () => {}, + error: () => {}, + debug: () => {}, + child: () => mockLogger, + span: () => mockSpan, +}; + +/** + * A real, minimal McpToolCaller collaborator (not a mock of McpClient). + * It records calls and returns a configurable result. + */ +function makeCaller( + result: McpCallResult, +): McpToolCaller & { calls: Array<{ name: string; args: unknown }> } { + const calls: Array<{ name: string; args: unknown }> = []; + return { + calls, + callTool: async (name: string, args: unknown) => { + calls.push({ name, args }); + return result; + }, + }; +} + +describe("namespace", () => { + it("produces <serverId>__<toolName>", () => { + expect(namespace("freecad", "create_object")).toBe("freecad__create_object"); + }); + + it("handles serverId with special chars", () => { + expect(namespace("chrome-devtools", "navigate")).toBe("chrome-devtools__navigate"); + }); + + it("handles empty toolName", () => { + expect(namespace("server", "")).toBe("server__"); + }); +}); + +describe("flattenContent", () => { + it("flattens text content", () => { + const content: McpContentItem[] = [{ type: "text", text: "hello world" }]; + expect(flattenContent(content)).toBe("hello world"); + }); + + it("flattens image content", () => { + const content: McpContentItem[] = [ + { type: "image", data: "base64data", mimeType: "image/png" }, + ]; + expect(flattenContent(content)).toBe("[image: image/png]"); + }); + + it("flattens resource content with text", () => { + const content: McpContentItem[] = [ + { type: "resource", resource: { uri: "file:///test", text: "resource text" } }, + ]; + expect(flattenContent(content)).toBe("resource text"); + }); + + it("flattens resource content without text", () => { + const content: McpContentItem[] = [{ type: "resource", resource: { uri: "file:///test" } }]; + expect(flattenContent(content)).toBe("[resource: file:///test]"); + }); + + it("joins multiple items with newline", () => { + const content: McpContentItem[] = [ + { type: "text", text: "first" }, + { type: "text", text: "second" }, + ]; + expect(flattenContent(content)).toBe("first\nsecond"); + }); + + it("returns empty string for empty content", () => { + expect(flattenContent([])).toBe(""); + }); + + it("handles mixed content types", () => { + const content: McpContentItem[] = [ + { type: "text", text: "here is an image:" }, + { type: "image", data: "data", mimeType: "image/jpeg" }, + { type: "resource", resource: { uri: "file:///x", text: "some data" } }, + ]; + expect(flattenContent(content)).toBe("here is an image:\n[image: image/jpeg]\nsome data"); + }); +}); + +describe("adaptTool", () => { + it("maps inputSchema to ToolParameterSchema", () => { + const mcpTool: McpToolInfo = { + name: "create_obj", + description: "Create an object", + inputSchema: { + type: "object", + properties: { name: { type: "string", description: "Object name" } }, + required: ["name"], + }, + }; + const caller = makeCaller({ content: [] }); + const adapted = adaptTool("freecad", mcpTool, caller); + + expect(adapted.name).toBe("freecad__create_obj"); + expect(adapted.description).toBe("[freecad] Create an object"); + expect(adapted.parameters.type).toBe("object"); + expect(adapted.parameters.properties).toEqual({ + name: { type: "string", description: "Object name" }, + }); + expect(adapted.parameters.required).toEqual(["name"]); + expect(adapted.concurrencySafe).toBe(false); + }); + + it("execute proxies to callTool", async () => { + const mcpTool: McpToolInfo = { + name: "test_tool", + description: "Test", + inputSchema: { type: "object" }, + }; + const caller = makeCaller({ + content: [{ type: "text", text: "called" }], + isError: false, + }); + const adapted = adaptTool("server", mcpTool, caller); + + const result = await adapted.execute( + { input: "value" }, + { + toolCallId: "call-1", + onOutput: () => {}, + signal: new AbortController().signal, + log: mockLogger, + }, + ); + + expect(result.content).toBe("called"); + expect(result.isError).toBe(false); + expect(caller.calls).toEqual([{ name: "test_tool", args: { input: "value" } }]); + }); + + it("execute propagates isError", async () => { + const caller = makeCaller({ + content: [{ type: "text", text: "error occurred" }], + isError: true, + }); + + const mcpTool: McpToolInfo = { + name: "fail_tool", + description: "Fails", + inputSchema: { type: "object" }, + }; + const adapted = adaptTool("server", mcpTool, caller); + + const result = await adapted.execute( + {}, + { + toolCallId: "call-2", + onOutput: () => {}, + signal: new AbortController().signal, + log: mockLogger, + }, + ); + + expect(result.isError).toBe(true); + expect(result.content).toBe("error occurred"); + }); + + it("handles inputSchema without optional fields", () => { + const mcpTool: McpToolInfo = { + name: "simple", + description: "Simple tool", + inputSchema: { type: "object" }, + }; + const caller = makeCaller({ content: [] }); + const adapted = adaptTool("server", mcpTool, caller); + + expect(adapted.parameters.type).toBe("object"); + expect(adapted.parameters.properties).toBeUndefined(); + expect(adapted.parameters.required).toBeUndefined(); + expect(adapted.parameters.additionalProperties).toBeUndefined(); + }); + + it("preserves additionalProperties", () => { + const mcpTool: McpToolInfo = { + name: "flex", + description: "Flexible", + inputSchema: { + type: "object", + additionalProperties: true, + }, + }; + const caller = makeCaller({ content: [] }); + const adapted = adaptTool("server", mcpTool, caller); + + expect(adapted.parameters.additionalProperties).toBe(true); + }); + + it("execute flattens multi-item content", async () => { + const caller = makeCaller({ + content: [ + { type: "text", text: "summary" }, + { type: "image", data: "d", mimeType: "image/png" }, + ], + isError: false, + }); + const mcpTool: McpToolInfo = { + name: "multi", + description: "Multi", + inputSchema: { type: "object" }, + }; + const adapted = adaptTool("srv", mcpTool, caller); + + const result = await adapted.execute( + {}, + { + toolCallId: "c", + onOutput: () => {}, + signal: new AbortController().signal, + log: mockLogger, + }, + ); + + expect(result.content).toBe("summary\n[image: image/png]"); + }); +}); diff --git a/packages/mcp/src/registry.ts b/packages/mcp/src/registry.ts new file mode 100644 index 0000000..7c31c88 --- /dev/null +++ b/packages/mcp/src/registry.ts @@ -0,0 +1,79 @@ +/** + * Tool name namespacing + ToolContract adapter. + * + * Namespaces each MCP tool as <serverId>__<toolName> (double underscore). + * Adapts an MCP tool definition into a Dispatch ToolContract whose execute() + * proxies to the MCP server via the injected McpToolCaller (client.callTool()). + * + * PURE: depends only on the McpToolCaller port — never on the concrete client. + */ + +import type { + ToolContract, + ToolExecuteContext, + ToolParameterSchema, + ToolResult, +} from "@dispatch/kernel"; +import type { McpContentItem, McpToolCaller, McpToolInfo } from "./types.js"; + +const NAMESPACE_SEP = "__"; + +export function namespace(serverId: string, toolName: string): string { + return `${serverId}${NAMESPACE_SEP}${toolName}`; +} + +export function adaptTool( + serverId: string, + mcpTool: McpToolInfo, + caller: McpToolCaller, +): ToolContract { + const parameters: ToolParameterSchema = { + type: "object", + ...(mcpTool.inputSchema.properties !== undefined && { + properties: mcpTool.inputSchema.properties, + }), + ...(mcpTool.inputSchema.required !== undefined && { + required: mcpTool.inputSchema.required, + }), + ...(mcpTool.inputSchema.additionalProperties !== undefined && { + additionalProperties: mcpTool.inputSchema.additionalProperties, + }), + }; + + return { + name: namespace(serverId, mcpTool.name), + description: `[${serverId}] ${mcpTool.description}`, + parameters, + concurrencySafe: false, + execute: async (args: unknown, ctx: ToolExecuteContext): Promise<ToolResult> => { + const result = await caller.callTool(mcpTool.name, args, ctx.signal); + const toolResult: ToolResult = { + content: flattenContent(result.content), + }; + if (result.isError !== undefined) { + (toolResult as { isError?: boolean }).isError = result.isError; + } + return toolResult; + }, + }; +} + +export function flattenContent(content: readonly McpContentItem[]): string { + if (content.length === 0) return ""; + + const parts: string[] = []; + for (const item of content) { + if (item.type === "text" && item.text !== undefined) { + parts.push(item.text); + } else if (item.type === "image" && item.mimeType !== undefined) { + parts.push(`[image: ${item.mimeType}]`); + } else if (item.type === "resource" && item.resource !== undefined) { + if (item.resource.text !== undefined) { + parts.push(item.resource.text); + } else { + parts.push(`[resource: ${item.resource.uri}]`); + } + } + } + return parts.join("\n"); +} diff --git a/packages/mcp/src/rpc.test.ts b/packages/mcp/src/rpc.test.ts new file mode 100644 index 0000000..9ffb121 --- /dev/null +++ b/packages/mcp/src/rpc.test.ts @@ -0,0 +1,117 @@ +import { describe, expect, it } from "vitest"; +import { JsonRpcClient } from "./rpc.js"; + +function makeClient(): { + client: JsonRpcClient; + written: Uint8Array[]; + feedMessage: (msg: unknown) => void; +} { + const written: Uint8Array[] = []; + const client = new JsonRpcClient((bytes) => { + written.push(bytes); + }); + return { + client, + written, + feedMessage: (msg: unknown) => { + client.handleMessage(JSON.stringify(msg)); + }, + }; +} + +describe("JsonRpcClient", () => { + it("request returns result", async () => { + const { client, feedMessage } = makeClient(); + + const resultPromise = client.request("initialize", { protocolVersion: "2025-11-25" }); + + feedMessage({ jsonrpc: "2.0", id: 1, result: { protocolVersion: "2025-11-25" } }); + + const result = await resultPromise; + expect(result).toEqual({ protocolVersion: "2025-11-25" }); + }); + + it("request rejects on error response", async () => { + const { client, feedMessage } = makeClient(); + + const resultPromise = client.request("bad-method"); + + feedMessage({ + jsonrpc: "2.0", + id: 1, + error: { code: -32601, message: "Method not found" }, + }); + + await expect(resultPromise).rejects.toThrow("Method not found"); + }); + + it("notify sends without expecting response", () => { + const { client, written } = makeClient(); + + client.notify("notifications/initialized", {}); + + expect(written.length).toBe(1); + const sent = new TextDecoder().decode(written[0]); + expect(sent).toContain('"method":"notifications/initialized"'); + expect(sent).not.toContain('"id"'); + }); + + it("onNotification fires for matching method", () => { + const { client, feedMessage } = makeClient(); + + let received: unknown = "unset"; + client.onNotification("notifications/tools/list_changed", (params) => { + received = params; + }); + + feedMessage({ jsonrpc: "2.0", method: "notifications/tools/list_changed", params: { a: 1 } }); + + expect(received).toEqual({ a: 1 }); + }); + + it("pending request rejected on close", async () => { + const { client } = makeClient(); + + const resultPromise = client.request("slow-method"); + + client.close(); + + await expect(resultPromise).rejects.toThrow("Connection closed"); + }); + + it("incremental request ids", () => { + const { client, written } = makeClient(); + + client.request("a"); + client.request("b"); + client.request("c"); + + expect(written.length).toBe(3); + // Extract JSON body from Content-Length framed messages + const parse = (bytes: Uint8Array): { id: number } => { + const text = new TextDecoder().decode(bytes); + const bodyStart = text.indexOf("\r\n\r\n") + 4; + return JSON.parse(text.slice(bodyStart)) as { id: number }; + }; + const msg1 = parse(written[0]); + const msg2 = parse(written[1]); + const msg3 = parse(written[2]); + expect(msg1.id).toBe(1); + expect(msg2.id).toBe(2); + expect(msg3.id).toBe(3); + }); + + it("notify after close is silently dropped", () => { + const { client, written } = makeClient(); + client.close(); + const count = written.length; + client.notify("test"); + expect(written.length).toBe(count); + }); + + it("request after close rejects immediately", async () => { + const { client } = makeClient(); + client.close(); + await expect(client.request("test")).rejects.toThrow("Connection closed"); + }); +}); diff --git a/packages/mcp/src/rpc.ts b/packages/mcp/src/rpc.ts new file mode 100644 index 0000000..eef8d73 --- /dev/null +++ b/packages/mcp/src/rpc.ts @@ -0,0 +1,103 @@ +/** + * JSON-RPC 2.0 client over an injected write function. + * + * Provides request (correlated by id), notify, and onNotification. + * The caller feeds decoded JSON messages via `handleMessage`. + */ + +import { encode } from "./framing.js"; + +export type WriteFn = (bytes: Uint8Array) => void; + +export interface PendingRequest { + readonly resolve: (value: unknown) => void; + readonly reject: (reason: unknown) => void; +} + +export type NotificationHandler = (params: unknown) => void; + +export interface JsonRpcMessage { + readonly jsonrpc: "2.0"; + readonly id?: number | string | undefined; + readonly method?: string | undefined; + readonly params?: unknown; + readonly result?: unknown; + readonly error?: + | { readonly code: number; readonly message: string; readonly data?: unknown } + | undefined; +} + +export class JsonRpcClient { + private nextId = 1; + private pending = new Map<number | string, PendingRequest>(); + private notificationHandlers = new Map<string, NotificationHandler>(); + private write: WriteFn; + private closed = false; + + constructor(write: WriteFn) { + this.write = write; + } + + request(method: string, params?: unknown): Promise<unknown> { + if (this.closed) { + return Promise.reject(new Error("Connection closed")); + } + const id = this.nextId++; + const msg: JsonRpcMessage = { jsonrpc: "2.0", id, method, params }; + return new Promise((resolve, reject) => { + this.pending.set(id, { resolve, reject }); + this.sendMessage(msg); + }); + } + + notify(method: string, params?: unknown): void { + if (this.closed) return; + const msg: JsonRpcMessage = { jsonrpc: "2.0", method, params }; + this.sendMessage(msg); + } + + onNotification(method: string, handler: NotificationHandler): void { + this.notificationHandlers.set(method, handler); + } + + handleMessage(json: string): void { + const msg = JSON.parse(json) as JsonRpcMessage; + const { id, method } = msg; + + if (method !== undefined && id === undefined) { + this.handleIncomingNotification(method, msg.params); + } else if (id !== undefined) { + this.handleResponse(id, msg); + } + } + + private sendMessage(msg: JsonRpcMessage): void { + this.write(encode(JSON.stringify(msg))); + } + + private handleResponse(id: number | string, msg: JsonRpcMessage): void { + const entry = this.pending.get(id); + if (!entry) return; + this.pending.delete(id); + if (msg.error) { + entry.reject(new Error(msg.error.message)); + } else { + entry.resolve(msg.result); + } + } + + private handleIncomingNotification(method: string, params: unknown): void { + const handler = this.notificationHandlers.get(method); + if (handler) { + handler(params); + } + } + + close(): void { + this.closed = true; + for (const entry of this.pending.values()) { + entry.reject(new Error("Connection closed")); + } + this.pending.clear(); + } +} diff --git a/packages/mcp/src/transport.test.ts b/packages/mcp/src/transport.test.ts new file mode 100644 index 0000000..b369e74 --- /dev/null +++ b/packages/mcp/src/transport.test.ts @@ -0,0 +1,154 @@ +import { describe, expect, it } from "vitest"; +import { encode } from "./framing.js"; +import type { SpawnedProcess, SpawnProcess } from "./transport.js"; +import { createStdioTransport } from "./transport.js"; + +/** + * In-memory pipe pair: simulates a child process. `emitStdout` pushes framed + * bytes the server "wrote" to stdout (which we read); `writtenToStdin` captures + * what we wrote to the child's stdin (our outgoing framed messages). + */ +function makePipe(): { + process: SpawnedProcess; + emitStdout: (data: Uint8Array) => void; + emitEnd: () => void; + writtenToStdin: () => Uint8Array[]; + killed: () => boolean; +} { + const dataListeners: Array<(data: Uint8Array) => void> = []; + const endListeners: Array<() => void> = []; + const stdinWrites: Uint8Array[] = []; + let killed = false; + + const process: SpawnedProcess = { + stdin: { + write: (bytes: Uint8Array) => { + stdinWrites.push(bytes); + }, + }, + stdout: { + on: (event: string, cb: (data: Uint8Array) => void) => { + if (event === "data") dataListeners.push(cb); + else if (event === "end") endListeners.push(cb as unknown as () => void); + }, + }, + pid: 12345, + kill: () => { + killed = true; + }, + }; + + return { + process, + emitStdout: (data: Uint8Array) => { + for (const cb of dataListeners) cb(data); + }, + emitEnd: () => { + for (const cb of endListeners) cb(); + }, + writtenToStdin: () => stdinWrites, + killed: () => killed, + }; +} + +describe("createStdioTransport", () => { + it("creates connection with correct pid", () => { + const pair = makePipe(); + const spawn: SpawnProcess = () => pair.process; + + const { connection } = createStdioTransport({ spawn, command: ["test-server"] }, "/tmp"); + + expect(connection.pid).toBe(12345); + connection.close(); + }); + + it("connection sends framed messages via stdin", () => { + const pair = makePipe(); + const spawn: SpawnProcess = () => pair.process; + + const { connection } = createStdioTransport({ spawn, command: ["test"] }, "/tmp"); + + connection.notify("test/method", { key: "value" }); + + const writes = pair.writtenToStdin(); + expect(writes.length).toBe(1); + const text = new TextDecoder().decode(writes[0]); + expect(text).toContain("Content-Length:"); + expect(text).toContain('"method":"test/method"'); + connection.close(); + }); + + it("close kills the child process", () => { + const pair = makePipe(); + const spawn: SpawnProcess = () => pair.process; + + const { connection } = createStdioTransport({ spawn, command: ["test"] }, "/tmp"); + + connection.close(); + expect(pair.killed()).toBe(true); + }); + + it("pipes stdout through framing: a notification triggers onNotification", async () => { + const pair = makePipe(); + const spawn: SpawnProcess = () => pair.process; + + const { connection } = createStdioTransport({ spawn, command: ["test"] }, "/tmp"); + + let received: unknown = null; + connection.onNotification("notifications/tools/list_changed", (params) => { + received = params; + }); + + // Simulate the server writing a framed notification to stdout. + const notification = JSON.stringify({ + jsonrpc: "2.0", + method: "notifications/tools/list_changed", + params: { reason: "tools added" }, + }); + pair.emitStdout(encode(notification)); + + // onNotification is invoked synchronously inside the data handler. + expect(received).toEqual({ reason: "tools added" }); + connection.close(); + }); + + it("pipes stdout through framing: a response resolves a request", async () => { + const pair = makePipe(); + const spawn: SpawnProcess = () => pair.process; + + const { connection } = createStdioTransport({ spawn, command: ["test"] }, "/tmp"); + + const resultPromise = connection.send("tools/list"); + + // The request was framed and written to stdin; respond via stdout. + const response = JSON.stringify({ + jsonrpc: "2.0", + id: 1, + result: { tools: [{ name: "t", description: "d", inputSchema: { type: "object" } }] }, + }); + pair.emitStdout(encode(response)); + + const result = await resultPromise; + expect(result).toEqual({ + tools: [{ name: "t", description: "d", inputSchema: { type: "object" } }], + }); + connection.close(); + }); + + it("handles a frame split across two stdout chunks", async () => { + const pair = makePipe(); + const spawn: SpawnProcess = () => pair.process; + + const { connection } = createStdioTransport({ spawn, command: ["test"] }, "/tmp"); + + const resultPromise = connection.send("ping"); + + const response = encode(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { ok: true } })); + const mid = Math.floor(response.length / 2); + pair.emitStdout(response.slice(0, mid)); + pair.emitStdout(response.slice(mid)); + + await expect(resultPromise).resolves.toEqual({ ok: true }); + connection.close(); + }); +}); diff --git a/packages/mcp/src/transport.ts b/packages/mcp/src/transport.ts new file mode 100644 index 0000000..b1e3ec5 --- /dev/null +++ b/packages/mcp/src/transport.ts @@ -0,0 +1,101 @@ +/** + * StdioTransport — spawns a child process and pipes stdin/stdout through + * Content-Length framing + JSON-RPC. + */ + +import { FrameDecoder } from "./framing.js"; +import { JsonRpcClient, type WriteFn } from "./rpc.js"; + +export interface SpawnedProcess { + readonly stdin: { readonly write: (bytes: Uint8Array) => void }; + readonly stdout: + | AsyncIterable<Uint8Array> + | { readonly on: (event: string, cb: (data: Uint8Array) => void) => void }; + readonly stderr?: + | AsyncIterable<Uint8Array> + | { readonly on: (event: string, cb: (data: Uint8Array) => void) => void } + | undefined; + readonly pid: number | undefined; + readonly kill: () => void; +} + +export type SpawnProcess = ( + command: readonly string[], + opts: { readonly cwd: string; readonly env?: Readonly<Record<string, string>> | undefined }, +) => SpawnedProcess; + +export interface Connection { + readonly send: (method: string, params?: unknown) => Promise<unknown>; + readonly notify: (method: string, params?: unknown) => void; + readonly onNotification: (method: string, handler: (params: unknown) => void) => void; + readonly close: () => void; + readonly pid: number | undefined; +} + +export interface StdioTransportDeps { + readonly spawn: SpawnProcess; + readonly command: readonly string[]; + readonly env?: Readonly<Record<string, string>>; +} + +export function createStdioTransport( + deps: StdioTransportDeps, + cwd: string, +): { connection: Connection; promise: Promise<void> } { + const spawnOpts: { readonly cwd: string; readonly env?: Readonly<Record<string, string>> } = { + cwd, + }; + if (deps.env) { + (spawnOpts as { env?: Readonly<Record<string, string>> }).env = deps.env; + } + + const proc = deps.spawn(deps.command, spawnOpts); + const decoder = new FrameDecoder(); + + const writeFn: WriteFn = (bytes) => proc.stdin.write(bytes); + const rpc = new JsonRpcClient(writeFn); + + const stdoutSource = proc.stdout; + const promise = new Promise<void>((resolve, reject) => { + if (Symbol.asyncIterator in stdoutSource) { + (async () => { + try { + for await (const chunk of stdoutSource as AsyncIterable<Uint8Array>) { + const messages = decoder.decode(chunk); + for (const msg of messages) { + rpc.handleMessage(msg); + } + } + resolve(); + } catch (err: unknown) { + reject(err); + } + })(); + } else { + const source = stdoutSource as { + readonly on: (event: string, cb: (data: Uint8Array) => void) => void; + }; + source.on("data", (data: Uint8Array) => { + const messages = decoder.decode(data); + for (const msg of messages) { + rpc.handleMessage(msg); + } + }); + source.on("end", () => resolve()); + source.on("error", (err: unknown) => reject(err)); + } + }); + + const connection: Connection = { + send: (method, params) => rpc.request(method, params), + notify: (method, params) => rpc.notify(method, params), + onNotification: (method, handler) => rpc.onNotification(method, handler), + close: () => { + rpc.close(); + proc.kill(); + }, + pid: proc.pid, + }; + + return { connection, promise }; +} diff --git a/packages/mcp/src/types.ts b/packages/mcp/src/types.ts new file mode 100644 index 0000000..522511b --- /dev/null +++ b/packages/mcp/src/types.ts @@ -0,0 +1,87 @@ +/** + * Shared types for the MCP extension. + */ + +import type { ToolParameterSchema } from "@dispatch/kernel"; + +export interface McpServerConfig { + readonly command: string; + readonly args?: readonly string[]; + readonly env?: Readonly<Record<string, string>>; +} + +export interface ResolvedMcpServer { + readonly id: string; + readonly command: readonly string[]; + readonly env?: Readonly<Record<string, string>>; + readonly configSource: ".dispatch/mcp.json" | "opencode.json"; +} + +export interface ResolveResult { + readonly servers: readonly ResolvedMcpServer[]; + readonly shadowed: boolean; +} + +export type McpServerState = "connecting" | "connected" | "error" | "disconnected"; + +export interface McpServerStatus { + readonly id: string; + readonly state: McpServerState; + readonly error?: string; + readonly toolCount: number; +} + +export interface McpToolInfo { + readonly name: string; + readonly description: string; + readonly inputSchema: ToolParameterSchema; +} + +export interface McpContentItem { + readonly type: string; + readonly text?: string; + readonly data?: string; + readonly mimeType?: string; + readonly resource?: { + readonly uri: string; + readonly text?: string; + }; +} + +export interface McpCallResult { + readonly content: readonly McpContentItem[]; + readonly isError?: boolean; +} + +export interface McpServerCapabilities { + readonly tools?: { + readonly listChanged?: boolean; + }; +} + +export interface McpInitializeResult { + readonly protocolVersion: string; + readonly capabilities: McpServerCapabilities; + readonly serverInfo: { + readonly name: string; + readonly version: string; + }; +} + +export interface McpService { + readonly status: (cwd: string) => Promise<readonly McpServerStatus[]>; +} + +export interface McpListToolsResult { + readonly tools: readonly McpToolInfo[]; +} + +/** + * The narrow capability `adaptTool` depends on: the ability to invoke a tool + * on the MCP server. Declared as a port so the registry stays pure and + * testable against a real collaborator (not a mock of `McpClient`). + * `McpClient` satisfies this structurally. + */ +export interface McpToolCaller { + readonly callTool: (name: string, args: unknown, signal?: AbortSignal) => Promise<McpCallResult>; +} diff --git a/packages/mcp/tsconfig.json b/packages/mcp/tsconfig.json new file mode 100644 index 0000000..2ae3233 --- /dev/null +++ b/packages/mcp/tsconfig.json @@ -0,0 +1,6 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true }, + "include": ["src/**/*.ts"], + "references": [{ "path": "../kernel" }, { "path": "../session-orchestrator" }] +} diff --git a/tsconfig.json b/tsconfig.json index 7babef3..3dea4a1 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,40 +1,113 @@ { "files": [], "references": [ - { "path": "./packages/wire" }, - { "path": "./packages/kernel" }, - { "path": "./packages/transport-contract" }, - { "path": "./packages/ui-contract" }, - { "path": "./packages/surface-registry" }, - { "path": "./packages/transport-ws" }, - { "path": "./packages/surface-loaded-extensions" }, - { "path": "./packages/storage-sqlite" }, - { "path": "./packages/auth-apikey" }, - { "path": "./packages/provider-openai-compat" }, - { "path": "./packages/openai-stream" }, - { "path": "./packages/provider-umans" }, - { "path": "./packages/credential-store" }, - { "path": "./packages/conversation-store" }, - { "path": "./packages/throughput-store" }, - { "path": "./packages/todo" }, - { "path": "./packages/session-orchestrator" }, - { "path": "./packages/transport-http" }, - { "path": "./packages/tool-read-file" }, - { "path": "./packages/tool-shell" }, - { "path": "./packages/tool-edit-file" }, - { "path": "./packages/tool-write-file" }, - { "path": "./packages/tool-web-search" }, - { "path": "./packages/tool-youtube-transcript" }, - { "path": "./packages/skills" }, - { "path": "./packages/cache-warming" }, - { "path": "./packages/message-queue" }, - { "path": "./packages/lsp" }, - { "path": "./packages/system-prompt" }, - { "path": "./packages/cli" }, - { "path": "./packages/journal-sink" }, - { "path": "./packages/trace-store" }, - { "path": "./packages/observability-collector" }, - { "path": "./packages/trace-replay" }, - { "path": "./packages/host-bin" } + { + "path": "./packages/wire" + }, + { + "path": "./packages/kernel" + }, + { + "path": "./packages/transport-contract" + }, + { + "path": "./packages/ui-contract" + }, + { + "path": "./packages/surface-registry" + }, + { + "path": "./packages/transport-ws" + }, + { + "path": "./packages/surface-loaded-extensions" + }, + { + "path": "./packages/storage-sqlite" + }, + { + "path": "./packages/auth-apikey" + }, + { + "path": "./packages/provider-openai-compat" + }, + { + "path": "./packages/openai-stream" + }, + { + "path": "./packages/provider-umans" + }, + { + "path": "./packages/credential-store" + }, + { + "path": "./packages/conversation-store" + }, + { + "path": "./packages/throughput-store" + }, + { + "path": "./packages/todo" + }, + { + "path": "./packages/session-orchestrator" + }, + { + "path": "./packages/transport-http" + }, + { + "path": "./packages/tool-read-file" + }, + { + "path": "./packages/tool-shell" + }, + { + "path": "./packages/tool-edit-file" + }, + { + "path": "./packages/tool-write-file" + }, + { + "path": "./packages/tool-web-search" + }, + { + "path": "./packages/tool-youtube-transcript" + }, + { + "path": "./packages/skills" + }, + { + "path": "./packages/cache-warming" + }, + { + "path": "./packages/message-queue" + }, + { + "path": "./packages/mcp" + }, + { + "path": "./packages/lsp" + }, + { + "path": "./packages/system-prompt" + }, + { + "path": "./packages/cli" + }, + { + "path": "./packages/journal-sink" + }, + { + "path": "./packages/trace-store" + }, + { + "path": "./packages/observability-collector" + }, + { + "path": "./packages/trace-replay" + }, + { + "path": "./packages/host-bin" + } ] } |
