diff options
| author | Adam Malczewski <[email protected]> | 2026-06-06 21:13:58 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-06 21:13:58 +0900 |
| commit | fedf9c2695476e9ee6f95776b0244acfc37f022f (patch) | |
| tree | 313b2c39e1a7677f2d0997d74df8241986f3283a | |
| parent | 61b6e24c7abb4eebf94da0a0498a68a1bb8ba92e (diff) | |
| download | dispatch-fedf9c2695476e9ee6f95776b0244acfc37f022f.tar.gz dispatch-fedf9c2695476e9ee6f95776b0244acfc37f022f.zip | |
feat(transport-ws,transport-contract): multiplex chat ops onto the surface WS
Add chat WS ops (chat.send / chat.delta / chat.error) + unified
WsClientMessage/WsServerMessage unions to @dispatch/transport-contract
(imports ui-contract; surface protocol unchanged — additive non-colliding
type variants, no channel wrapper). transport-ws drives
sessionOrchestrator.handleMessage, streaming each AgentEvent as chat.delta
over the same connection that carries surface ops; per-connection
AbortController cancels in-flight turns on socket close; error-isolated.
Verified live: one WS connection delivered the surface catalog AND a real
flash chat turn (chat.delta stream, reply 'Hello my friend').
Completes the FE Slice 2 backend prereqs. typecheck clean, 485 vitest + 80 bun,
biome clean.
Discovered (separate, pre-existing): runtime does not emit
turn-start/done/turn-sealed on either transport — needed for FE cache-commit;
tracked in tasks.md.
| -rw-r--r-- | bun.lock | 3 | ||||
| -rw-r--r-- | packages/transport-contract/package.json | 1 | ||||
| -rw-r--r-- | packages/transport-contract/src/index.ts | 65 | ||||
| -rw-r--r-- | packages/transport-contract/tsconfig.json | 2 | ||||
| -rw-r--r-- | packages/transport-ws/package.json | 2 | ||||
| -rw-r--r-- | packages/transport-ws/src/extension.ts | 136 | ||||
| -rw-r--r-- | packages/transport-ws/src/index.ts | 7 | ||||
| -rw-r--r-- | packages/transport-ws/src/manifest.ts | 2 | ||||
| -rw-r--r-- | packages/transport-ws/src/router.test.ts | 81 | ||||
| -rw-r--r-- | packages/transport-ws/src/router.ts | 68 | ||||
| -rw-r--r-- | packages/transport-ws/src/server.bun.test.ts | 204 | ||||
| -rw-r--r-- | packages/transport-ws/tsconfig.json | 2 | ||||
| -rw-r--r-- | tasks.md | 38 |
13 files changed, 541 insertions, 70 deletions
@@ -143,6 +143,7 @@ "name": "@dispatch/transport-contract", "version": "0.0.0", "dependencies": { + "@dispatch/ui-contract": "workspace:*", "@dispatch/wire": "workspace:*", }, }, @@ -163,7 +164,9 @@ "version": "0.0.0", "dependencies": { "@dispatch/kernel": "workspace:*", + "@dispatch/session-orchestrator": "workspace:*", "@dispatch/surface-registry": "workspace:*", + "@dispatch/transport-contract": "workspace:*", "@dispatch/ui-contract": "workspace:*", }, }, diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json index 2af6a73..03efc2b 100644 --- a/packages/transport-contract/package.json +++ b/packages/transport-contract/package.json @@ -6,6 +6,7 @@ "main": "dist/index.js", "types": "dist/index.d.ts", "dependencies": { + "@dispatch/ui-contract": "workspace:*", "@dispatch/wire": "workspace:*" } } diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts index 9d8f6f4..cdddad6 100644 --- a/packages/transport-contract/src/index.ts +++ b/packages/transport-contract/src/index.ts @@ -1,18 +1,26 @@ /** - * Transport contract — the typed description of Dispatch's HTTP API. + * Transport contract — the typed description of Dispatch's client–server API + * (HTTP + WebSocket). * * This package is types-only (zero runtime). It is the single shared surface * every client imports to know how to talk to the backend — the CLI, the web - * frontend (in its own repo), any third-party client — and the transport-http - * server imports to know what it must accept and emit. + * frontend (in its own repo), any third-party client — and the transport-http / + * transport-ws servers import to know what they must accept and emit. * * Each side owns its OWN (de)serialization: there is deliberately no shared * parse/serialize helper here (isolation-over-DRY). The contract is the SHAPES, * not the codec. The streaming response payload is the kernel's `AgentEvent` * union, re-exported here so a client has one import for the whole wire. + * + * The WebSocket carries BOTH chat ops (defined here) and surface ops (defined in + * `@dispatch/ui-contract`) over one connection; the unified `WsClientMessage` / + * `WsServerMessage` unions below compose them. Chat ops are new, non-colliding + * `type` variants — there is no channel wrapper, so the shipped surface protocol + * is unchanged. */ -import type { StoredChunk } from "@dispatch/wire"; +import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract"; +import type { AgentEvent, StoredChunk } from "@dispatch/wire"; export type { AgentEvent, StoredChunk } from "@dispatch/wire"; @@ -83,3 +91,52 @@ export interface ConversationHistoryResponse { readonly chunks: readonly StoredChunk[]; readonly latestSeq: number; } + +// ─── WebSocket chat ops ─────────────────────────────────────────────────────── +// The persistent WS connection multiplexes chat ops (below) with surface ops +// (`@dispatch/ui-contract`). The unified unions at the bottom compose both. Chat +// `type`s are namespaced (`chat.*`) so they never collide with surface ones. + +/** + * Client → server: start or continue a turn over the WS connection. Carries the + * same fields as the HTTP `ChatRequest` (so one shape drives both transports); + * omit `conversationId` to start fresh — the resolved id arrives on the streamed + * `AgentEvent`s (each carries `conversationId`). + */ +export interface ChatSendMessage extends ChatRequest { + readonly type: "chat.send"; +} + +/** + * Server → client: one `AgentEvent` from an in-flight turn (text-delta, + * tool-call, usage, done, turn-sealed, …). The client folds these into its + * transcript exactly as it folds the HTTP NDJSON stream — same events, different + * carrier. + */ +export interface ChatDeltaMessage { + readonly type: "chat.delta"; + readonly event: AgentEvent; +} + +/** + * Server → client: a chat-scoped TRANSPORT error — e.g. a malformed `chat.send` + * or a failure before a turn could start. (Errors DURING a turn arrive as a + * `TurnErrorEvent` inside a `chat.delta`.) + */ +export interface ChatErrorMessage { + readonly type: "chat.error"; + readonly conversationId?: string; + readonly message: string; +} + +/** + * Every client → server WS message: surface ops (`@dispatch/ui-contract`) + chat + * ops. A server discriminates on `type`. + */ +export type WsClientMessage = SurfaceClientMessage | ChatSendMessage; + +/** + * Every server → client WS message: surface ops (`@dispatch/ui-contract`) + chat + * ops. A client discriminates on `type`. + */ +export type WsServerMessage = SurfaceServerMessage | ChatDeltaMessage | ChatErrorMessage; diff --git a/packages/transport-contract/tsconfig.json b/packages/transport-contract/tsconfig.json index a882987..5a5de0f 100644 --- a/packages/transport-contract/tsconfig.json +++ b/packages/transport-contract/tsconfig.json @@ -2,5 +2,5 @@ "extends": "../../tsconfig.base.json", "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true }, "include": ["src/**/*.ts"], - "references": [{ "path": "../wire" }] + "references": [{ "path": "../ui-contract" }, { "path": "../wire" }] } diff --git a/packages/transport-ws/package.json b/packages/transport-ws/package.json index dab8ebc..b600a98 100644 --- a/packages/transport-ws/package.json +++ b/packages/transport-ws/package.json @@ -7,7 +7,9 @@ "types": "dist/index.d.ts", "dependencies": { "@dispatch/kernel": "workspace:*", + "@dispatch/session-orchestrator": "workspace:*", "@dispatch/surface-registry": "workspace:*", + "@dispatch/transport-contract": "workspace:*", "@dispatch/ui-contract": "workspace:*" } } diff --git a/packages/transport-ws/src/extension.ts b/packages/transport-ws/src/extension.ts index a18aefa..ddcb045 100644 --- a/packages/transport-ws/src/extension.ts +++ b/packages/transport-ws/src/extension.ts @@ -3,20 +3,24 @@ * * All decision logic lives in router.ts (pure, unit-tested). * This file handles I/O only: WS accept, JSON parse/stringify, - * provider.subscribe wiring, server lifecycle. + * provider.subscribe wiring, orchestrator turn-streaming, server lifecycle. */ import type { Extension, HostAPI } from "@dispatch/kernel"; +import type { SessionOrchestrator } from "@dispatch/session-orchestrator"; +import { sessionOrchestratorHandle } from "@dispatch/session-orchestrator"; import type { SurfaceProvider, SurfaceRegistry } from "@dispatch/surface-registry"; import { surfaceRegistryHandle } from "@dispatch/surface-registry"; -import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract"; +import type { WsClientMessage, WsServerMessage } from "@dispatch/transport-contract"; import { manifest } from "./manifest.js"; import { catalogMessage, routeClientMessage } from "./router.js"; -/** Active provider subscriptions for a single WS connection. */ +/** Active provider subscriptions + chat abort controller for a single WS connection. */ interface ConnectionState { readonly subs: Set<string>; readonly providerDisposers: Map<string, () => void>; + /** AbortController cancelled when the socket closes — aborts in-flight turns. */ + readonly abortController: AbortController; } type Ws = Bun.ServerWebSocket<ConnectionState>; @@ -28,10 +32,11 @@ export function createTransportWsExtension(): Extension { manifest, async activate(host: HostAPI) { const registry: SurfaceRegistry = host.getService(surfaceRegistryHandle); + const orchestrator: SessionOrchestrator = host.getService(sessionOrchestratorHandle); const logger = host.logger; const port = host.config.get<number>("surfaceWsPort") ?? 24205; - function send(ws: Ws, msg: SurfaceServerMessage): void { + function send(ws: Ws, msg: WsServerMessage): void { try { ws.send(JSON.stringify(msg)); } catch { @@ -73,12 +78,48 @@ export function createTransportWsExtension(): Extension { } } + /** + * Drive one chat turn through the orchestrator. Error-isolated: + * a throw/reject sends a chat.error to the socket and never kills + * the connection or affects surface ops / other connections. + */ + async function handleChatTurn( + ws: Ws, + state: ConnectionState, + conversationId: string | undefined, + text: string, + model: string | undefined, + cwd: string | undefined, + ): Promise<void> { + const resolvedId = conversationId ?? crypto.randomUUID(); + try { + await orchestrator.handleMessage({ + conversationId: resolvedId, + text, + ...(model !== undefined ? { modelName: model } : {}), + ...(cwd !== undefined ? { cwd } : {}), + signal: state.abortController.signal, + onEvent(event) { + send(ws, { type: "chat.delta", event }); + }, + }); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : "unknown orchestrator error"; + send(ws, { type: "chat.error", conversationId: resolvedId, message }); + logger.warn?.("transport-ws: chat turn failed", { + conversationId: resolvedId, + error: message, + }); + } + } + server = Bun.serve<ConnectionState>({ port, fetch(req, srv) { const initial: ConnectionState = { subs: new Set(), providerDisposers: new Map(), + abortController: new AbortController(), }; if (srv.upgrade(req, { data: initial })) return; return new Response("expected websocket", { status: 426 }); @@ -92,9 +133,9 @@ export function createTransportWsExtension(): Extension { const state = ws.data; if (!state) return; - let parsed: SurfaceClientMessage; + let parsed: WsClientMessage; try { - parsed = JSON.parse(String(message)) as SurfaceClientMessage; + parsed = JSON.parse(String(message)) as WsClientMessage; } catch { send(ws, { type: "error", message: "Invalid JSON" }); return; @@ -102,37 +143,66 @@ export function createTransportWsExtension(): Extension { const result = routeClientMessage(registry, state.subs, parsed); - // Apply sub change. - if (result.subChange) { - if (result.subChange.op === "add") { - state.subs.add(result.subChange.surfaceId); - const provider = registry.getSurface(result.subChange.surfaceId); - if (provider) { - subscribeToProvider(ws, provider, result.subChange.surfaceId, state); + switch (result.kind) { + case "surface": { + // Apply sub change. + if (result.subChange) { + if (result.subChange.op === "add") { + state.subs.add(result.subChange.surfaceId); + const provider = registry.getSurface(result.subChange.surfaceId); + if (provider) { + subscribeToProvider(ws, provider, result.subChange.surfaceId, state); + } + } else { + state.subs.delete(result.subChange.surfaceId); + unsubscribeFromProvider(state, result.subChange.surfaceId); + } } - } else { - state.subs.delete(result.subChange.surfaceId); - unsubscribeFromProvider(state, result.subChange.surfaceId); - } - } - // Send replies. - for (const reply of result.replies) { - send(ws, reply); - } + // Send replies. + for (const reply of result.replies) { + send(ws, reply); + } - // Perform invoke if signalled. - if (result.invoke) { - const provider = registry.getSurface(result.invoke.surfaceId); - if (provider) { - try { - const r = provider.invoke(result.invoke.actionId, result.invoke.payload); - if (r instanceof Promise) { - r.catch(() => {}); + // Perform invoke if signalled. + if (result.invoke) { + const provider = registry.getSurface(result.invoke.surfaceId); + if (provider) { + try { + const r = provider.invoke(result.invoke.actionId, result.invoke.payload); + if (r instanceof Promise) { + r.catch(() => {}); + } + } catch { + // Provider threw on invoke — log but don't kill the connection. + } } - } catch { - // Provider threw on invoke — log but don't kill the connection. } + break; + } + + case "chat": { + // Fire-and-forget the turn; errors are caught inside handleChatTurn. + void handleChatTurn( + ws, + state, + result.conversationId, + result.message, + result.model, + result.cwd, + ); + break; + } + + case "chat-error": { + send(ws, { + type: "chat.error", + ...(result.conversationId !== undefined + ? { conversationId: result.conversationId } + : {}), + message: result.errorMessage, + }); + break; } } }, @@ -140,6 +210,8 @@ export function createTransportWsExtension(): Extension { close(ws) { const state = ws.data; if (state) { + // Abort any in-flight chat turns. + state.abortController.abort(); for (const dispose of state.providerDisposers.values()) { dispose(); } diff --git a/packages/transport-ws/src/index.ts b/packages/transport-ws/src/index.ts index a93611f..f4355c0 100644 --- a/packages/transport-ws/src/index.ts +++ b/packages/transport-ws/src/index.ts @@ -1,4 +1,9 @@ export { createTransportWsExtension } from "./extension.js"; export { manifest } from "./manifest.js"; -export type { RouteResult } from "./router.js"; +export type { + ChatRouteError, + ChatRouteResult, + RouteResult, + SurfaceRouteResult, +} from "./router.js"; export { catalogMessage, routeClientMessage } from "./router.js"; diff --git a/packages/transport-ws/src/manifest.ts b/packages/transport-ws/src/manifest.ts index b0612e2..5058311 100644 --- a/packages/transport-ws/src/manifest.ts +++ b/packages/transport-ws/src/manifest.ts @@ -6,7 +6,7 @@ export const manifest: Manifest = { version: "0.0.0", apiVersion: "^0.1.0", trust: "bundled", - dependsOn: ["surface-registry"], + dependsOn: ["surface-registry", "session-orchestrator"], capabilities: { network: true }, contributes: { routes: ["/ws/surfaces"] }, activation: "eager", diff --git a/packages/transport-ws/src/router.test.ts b/packages/transport-ws/src/router.test.ts index 83496f3..ae76c5d 100644 --- a/packages/transport-ws/src/router.test.ts +++ b/packages/transport-ws/src/router.test.ts @@ -59,6 +59,8 @@ describe("routeClientMessage", () => { surfaceId: "a", }); + expect(result.kind).toBe("surface"); + if (result.kind !== "surface") throw new Error("expected surface"); expect(result.replies).toHaveLength(1); expect(result.replies[0]).toEqual({ type: "surface", @@ -82,6 +84,8 @@ describe("routeClientMessage", () => { surfaceId: "a", }); + expect(result.kind).toBe("surface"); + if (result.kind !== "surface") throw new Error("expected surface"); expect(result.replies).toHaveLength(1); expect(result.replies[0]?.type).toBe("surface"); expect(result.subChange).toBeUndefined(); @@ -96,6 +100,8 @@ describe("routeClientMessage", () => { surfaceId: "nonexistent", }); + expect(result.kind).toBe("surface"); + if (result.kind !== "surface") throw new Error("expected surface"); expect(result.replies).toHaveLength(1); expect(result.replies[0]).toEqual({ type: "error", @@ -116,6 +122,8 @@ describe("routeClientMessage", () => { surfaceId: "a", }); + expect(result.kind).toBe("surface"); + if (result.kind !== "surface") throw new Error("expected surface"); expect(result.replies).toHaveLength(0); expect(result.subChange).toEqual({ op: "remove", surfaceId: "a" }); }); @@ -129,6 +137,8 @@ describe("routeClientMessage", () => { surfaceId: "a", }); + expect(result.kind).toBe("surface"); + if (result.kind !== "surface") throw new Error("expected surface"); expect(result.replies).toHaveLength(0); expect(result.subChange).toEqual({ op: "remove", surfaceId: "a" }); }); @@ -147,6 +157,8 @@ describe("routeClientMessage", () => { payload: true, }); + expect(result.kind).toBe("surface"); + if (result.kind !== "surface") throw new Error("expected surface"); expect(result.replies).toHaveLength(0); expect(result.invoke).toEqual({ surfaceId: "a", @@ -165,6 +177,8 @@ describe("routeClientMessage", () => { actionId: "toggle", }); + expect(result.kind).toBe("surface"); + if (result.kind !== "surface") throw new Error("expected surface"); expect(result.replies).toHaveLength(1); expect(result.replies[0]).toEqual({ type: "error", @@ -174,6 +188,73 @@ describe("routeClientMessage", () => { expect(result.invoke).toBeUndefined(); }); }); + + describe("chat.send", () => { + it("classifies a chat.send message", () => { + const registry = fakeRegistry([]); + const connSubs = new Set<string>(); + + const result = routeClientMessage(registry, connSubs, { + type: "chat.send", + message: "hello", + }); + + expect(result.kind).toBe("chat"); + if (result.kind !== "chat") throw new Error("expected chat"); + expect(result.message).toBe("hello"); + expect(result.conversationId).toBeUndefined(); + expect(result.model).toBeUndefined(); + expect(result.cwd).toBeUndefined(); + }); + + it("passes through optional fields", () => { + const registry = fakeRegistry([]); + const connSubs = new Set<string>(); + + const result = routeClientMessage(registry, connSubs, { + type: "chat.send", + conversationId: "conv-123", + message: "follow up", + model: "gpt-4", + cwd: "/tmp", + }); + + expect(result.kind).toBe("chat"); + if (result.kind !== "chat") throw new Error("expected chat"); + expect(result.conversationId).toBe("conv-123"); + expect(result.message).toBe("follow up"); + expect(result.model).toBe("gpt-4"); + expect(result.cwd).toBe("/tmp"); + }); + + it("rejects a malformed chat.send (empty message)", () => { + const registry = fakeRegistry([]); + const connSubs = new Set<string>(); + + const result = routeClientMessage(registry, connSubs, { + type: "chat.send", + message: "", + }); + + expect(result.kind).toBe("chat-error"); + if (result.kind !== "chat-error") throw new Error("expected chat-error"); + expect(result.errorMessage).toContain("non-empty string"); + }); + + it("rejects a malformed chat.send (missing message)", () => { + const registry = fakeRegistry([]); + const connSubs = new Set<string>(); + + const result = routeClientMessage(registry, connSubs, { + type: "chat.send", + message: undefined as unknown as string, + }); + + expect(result.kind).toBe("chat-error"); + if (result.kind !== "chat-error") throw new Error("expected chat-error"); + expect(result.errorMessage).toContain("non-empty string"); + }); + }); }); describe("catalogMessage", () => { diff --git a/packages/transport-ws/src/router.ts b/packages/transport-ws/src/router.ts index f9a7a82..1a90e86 100644 --- a/packages/transport-ws/src/router.ts +++ b/packages/transport-ws/src/router.ts @@ -4,16 +4,18 @@ * Zero I/O, zero ambient state. Every function is `input → output`: * it decides what to do but does NOT do it. The shell (extension.ts) * interprets the result: sends WS messages, mutates connSubs, calls - * provider.invoke. + * provider.invoke, drives the orchestrator. */ import type { SurfaceRegistry } from "@dispatch/surface-registry"; -import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract"; +import type { ChatSendMessage, WsClientMessage } from "@dispatch/transport-contract"; +import type { SurfaceServerMessage } from "@dispatch/ui-contract"; // ── Result types ──────────────────────────────────────────────────────────── -/** The effect a single client message should produce. */ -export interface RouteResult { +/** The effect a surface client message should produce. */ +export interface SurfaceRouteResult { + readonly kind: "surface"; /** Server messages to send back to this connection. */ readonly replies: readonly SurfaceServerMessage[]; /** Whether to add or remove the surface id from connSubs. */ @@ -26,6 +28,25 @@ export interface RouteResult { }; } +/** The effect a validated chat.send should produce. */ +export interface ChatRouteResult { + readonly kind: "chat"; + readonly conversationId: string | undefined; + readonly message: string; + readonly model: string | undefined; + readonly cwd: string | undefined; +} + +/** A malformed chat.send that should yield a chat.error reply. */ +export interface ChatRouteError { + readonly kind: "chat-error"; + readonly conversationId: string | undefined; + readonly errorMessage: string; +} + +/** The effect any client WS message should produce. */ +export type RouteResult = SurfaceRouteResult | ChatRouteResult | ChatRouteError; + // ── Helpers ───────────────────────────────────────────────────────────────── /** Build the catalog `SurfaceServerMessage` from the registry. */ @@ -40,12 +61,12 @@ export function catalogMessage(registry: SurfaceRegistry): SurfaceServerMessage * * @param registry The surface registry (looked up once, injected). * @param connSubs This connection's current subscribed surface ids. - * @param msg The parsed client message. + * @param msg The parsed client message (surface or chat). */ export function routeClientMessage( registry: SurfaceRegistry, connSubs: ReadonlySet<string>, - msg: SurfaceClientMessage, + msg: WsClientMessage, ): RouteResult { switch (msg.type) { case "subscribe": @@ -54,19 +75,41 @@ export function routeClientMessage( return handleUnsubscribe(msg.surfaceId); case "invoke": return handleInvoke(registry, msg.surfaceId, msg.actionId, msg.payload); + case "chat.send": + return handleChatSend(msg); } } +// ── Chat validation ───────────────────────────────────────────────────────── + +function handleChatSend(msg: ChatSendMessage): ChatRouteResult | ChatRouteError { + if (typeof msg.message !== "string" || msg.message.length === 0) { + return { + kind: "chat-error", + conversationId: msg.conversationId, + errorMessage: "chat.send requires a non-empty string `message`", + }; + } + return { + kind: "chat", + conversationId: msg.conversationId, + message: msg.message, + model: msg.model, + cwd: msg.cwd, + }; +} + // ── Per-message handlers ──────────────────────────────────────────────────── function handleSubscribe( registry: SurfaceRegistry, connSubs: ReadonlySet<string>, surfaceId: string, -): RouteResult { +): SurfaceRouteResult { const provider = registry.getSurface(surfaceId); if (!provider) { return { + kind: "surface", replies: [{ type: "error", surfaceId, message: `Unknown surface: ${surfaceId}` }], }; } @@ -85,13 +128,14 @@ function handleSubscribe( // Idempotent: only emit subChange if not already subscribed. if (!connSubs.has(surfaceId)) { - return { replies, subChange: { op: "add", surfaceId } }; + return { kind: "surface", replies, subChange: { op: "add", surfaceId } }; } - return { replies }; + return { kind: "surface", replies }; } -function handleUnsubscribe(surfaceId: string): RouteResult { +function handleUnsubscribe(surfaceId: string): SurfaceRouteResult { return { + kind: "surface", replies: [], subChange: { op: "remove", surfaceId }, }; @@ -102,14 +146,16 @@ function handleInvoke( surfaceId: string, actionId: string, payload?: unknown, -): RouteResult { +): SurfaceRouteResult { const provider = registry.getSurface(surfaceId); if (!provider) { return { + kind: "surface", replies: [{ type: "error", surfaceId, message: `Unknown surface: ${surfaceId}` }], }; } return { + kind: "surface", replies: [], invoke: { surfaceId, actionId, payload }, }; diff --git a/packages/transport-ws/src/server.bun.test.ts b/packages/transport-ws/src/server.bun.test.ts index d51eb72..8c16f72 100644 --- a/packages/transport-ws/src/server.bun.test.ts +++ b/packages/transport-ws/src/server.bun.test.ts @@ -1,11 +1,9 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import type { AgentEvent } from "@dispatch/kernel"; +import type { SessionOrchestrator } from "@dispatch/session-orchestrator"; import type { SurfaceProvider, SurfaceRegistry } from "@dispatch/surface-registry"; -import type { - SurfaceCatalogEntry, - SurfaceClientMessage, - SurfaceServerMessage, - SurfaceSpec, -} from "@dispatch/ui-contract"; +import type { WsServerMessage } from "@dispatch/transport-contract"; +import type { SurfaceCatalogEntry, SurfaceClientMessage, SurfaceSpec } from "@dispatch/ui-contract"; import { catalogMessage, routeClientMessage } from "./router.js"; // ── Fake registry (same pattern as router.test.ts) ────────────────────────── @@ -45,22 +43,30 @@ function fakeRegistry(providers: readonly SurfaceProvider[]): SurfaceRegistry { }; } +function fakeOrchestrator(handler?: SessionOrchestrator["handleMessage"]): SessionOrchestrator { + return { + handleMessage: handler ?? (async () => {}), + }; +} + // ── Per-connection state (mirrors extension.ts) ───────────────────────────── interface ConnectionState { readonly subs: Set<string>; readonly providerDisposers: Map<string, () => void>; + readonly abortController: AbortController; } // ── Server helper ─────────────────────────────────────────────────────────── -function startServer(registry: SurfaceRegistry, port = 0) { +function startServer(registry: SurfaceRegistry, orchestrator: SessionOrchestrator, port = 0) { return Bun.serve<ConnectionState>({ port, fetch(req, srv) { const initial: ConnectionState = { subs: new Set(), providerDisposers: new Map(), + abortController: new AbortController(), }; if (srv.upgrade(req, { data: initial })) return; return new Response("expected websocket", { status: 426 }); @@ -84,22 +90,67 @@ function startServer(registry: SurfaceRegistry, port = 0) { const result = routeClientMessage(registry, state.subs, parsed); - if (result.subChange) { - if (result.subChange.op === "add") { - state.subs.add(result.subChange.surfaceId); - } else { - state.subs.delete(result.subChange.surfaceId); + switch (result.kind) { + case "surface": { + if (result.subChange) { + if (result.subChange.op === "add") { + state.subs.add(result.subChange.surfaceId); + } else { + state.subs.delete(result.subChange.surfaceId); + } + } + + for (const reply of result.replies) { + ws.send(JSON.stringify(reply)); + } + break; + } + + case "chat": { + const resolvedId = result.conversationId ?? crypto.randomUUID(); + void (async () => { + try { + await orchestrator.handleMessage({ + conversationId: resolvedId, + text: result.message, + ...(result.model !== undefined ? { modelName: result.model } : {}), + ...(result.cwd !== undefined ? { cwd: result.cwd } : {}), + signal: state.abortController.signal, + onEvent(event) { + ws.send(JSON.stringify({ type: "chat.delta", event })); + }, + }); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : "unknown orchestrator error"; + ws.send( + JSON.stringify({ + type: "chat.error", + conversationId: resolvedId, + message, + }), + ); + } + })(); + break; } - } - for (const reply of result.replies) { - ws.send(JSON.stringify(reply)); + case "chat-error": { + ws.send( + JSON.stringify({ + type: "chat.error", + conversationId: result.conversationId, + message: result.errorMessage, + }), + ); + break; + } } }, close(ws) { const state = ws.data; if (state) { + state.abortController.abort(); for (const dispose of state.providerDisposers.values()) { dispose(); } @@ -111,13 +162,32 @@ function startServer(registry: SurfaceRegistry, port = 0) { // ── Helpers ───────────────────────────────────────────────────────────────── -function waitForMessage(ws: WebSocket): Promise<SurfaceServerMessage> { +function waitForMessage(ws: WebSocket): Promise<WsServerMessage> { return new Promise((resolve, reject) => { const timeout = setTimeout(() => reject(new Error("timed out waiting for message")), 5000); function handler(ev: MessageEvent) { clearTimeout(timeout); ws.removeEventListener("message", handler); - resolve(JSON.parse(ev.data as string) as SurfaceServerMessage); + resolve(JSON.parse(ev.data as string) as WsServerMessage); + } + ws.addEventListener("message", handler); + }); +} + +function waitForMessages(ws: WebSocket, count: number): Promise<WsServerMessage[]> { + return new Promise((resolve, reject) => { + const msgs: WsServerMessage[] = []; + const timeout = setTimeout( + () => reject(new Error(`timed out waiting for ${count} messages (got ${msgs.length})`)), + 5000, + ); + function handler(ev: MessageEvent) { + msgs.push(JSON.parse(ev.data as string) as WsServerMessage); + if (msgs.length === count) { + clearTimeout(timeout); + ws.removeEventListener("message", handler); + resolve(msgs); + } } ws.addEventListener("message", handler); }); @@ -128,11 +198,12 @@ function waitForMessage(ws: WebSocket): Promise<SurfaceServerMessage> { describe("Bun.serve WebSocket server", () => { let server: ReturnType<typeof Bun.serve>; let port: number; + const defaultOrchestrator = fakeOrchestrator(); beforeEach(() => { const provider = fakeProvider("demo", "Demo Surface"); const registry = fakeRegistry([provider]); - server = startServer(registry); + server = startServer(registry, defaultOrchestrator); port = server.port as number; }); @@ -193,3 +264,100 @@ describe("Bun.serve WebSocket server", () => { expect(await res.text()).toBe("expected websocket"); }); }); + +describe("chat ops", () => { + let server: ReturnType<typeof Bun.serve>; + let port: number; + + afterEach(() => { + server.stop(); + }); + + test("chat.send streams AgentEvents back as chat.delta in order", async () => { + const events: AgentEvent[] = [ + { type: "turn-start", conversationId: "c1", turnId: "t1" } as AgentEvent, + { type: "text-delta", conversationId: "c1", turnId: "t1", delta: "Hello" } as AgentEvent, + { type: "text-delta", conversationId: "c1", turnId: "t1", delta: " world" } as AgentEvent, + { type: "done", conversationId: "c1", turnId: "t1" } as AgentEvent, + { type: "turn-sealed", conversationId: "c1", turnId: "t1" } as AgentEvent, + ]; + + const orchestrator = fakeOrchestrator(async (input) => { + for (const event of events) { + input.onEvent(event); + } + }); + + const registry = fakeRegistry([]); + server = startServer(registry, orchestrator); + port = server.port as number; + + const ws = new WebSocket(`ws://localhost:${port}`); + await waitForMessage(ws); // drain catalog + + ws.send(JSON.stringify({ type: "chat.send", message: "hi" })); + + const msgs = await waitForMessages(ws, events.length); + + for (let i = 0; i < events.length; i++) { + const msg = msgs[i]; + const expected = events[i]; + if (!msg || !expected) throw new Error(`missing at index ${i}`); + expect(msg.type).toBe("chat.delta"); + if (msg.type === "chat.delta") { + expect(msg.event).toEqual(expected); + } + } + + ws.close(); + }); + + test("chat orchestrator failure yields chat.error without crashing the connection", async () => { + const orchestrator = fakeOrchestrator(async () => { + throw new Error("boom"); + }); + + const registry = fakeRegistry([fakeProvider("demo", "Demo")]); + server = startServer(registry, orchestrator); + port = server.port as number; + + const ws = new WebSocket(`ws://localhost:${port}`); + await waitForMessage(ws); // drain catalog + + // Send a chat.send that will fail + ws.send(JSON.stringify({ type: "chat.send", message: "trigger failure" })); + const errMsg = await waitForMessage(ws); + + expect(errMsg.type).toBe("chat.error"); + if (errMsg.type === "chat.error") { + expect(errMsg.message).toBe("boom"); + } + + // Socket must still be alive — send a surface subscribe to prove it + ws.send(JSON.stringify({ type: "subscribe", surfaceId: "demo" })); + const surfaceMsg = await waitForMessage(ws); + expect(surfaceMsg.type).toBe("surface"); + + ws.close(); + }); + + test("chat.send with empty message yields chat.error (pure router rejection)", async () => { + const orchestrator = fakeOrchestrator(); + const registry = fakeRegistry([]); + server = startServer(registry, orchestrator); + port = server.port as number; + + const ws = new WebSocket(`ws://localhost:${port}`); + await waitForMessage(ws); // drain catalog + + ws.send(JSON.stringify({ type: "chat.send", message: "" })); + const errMsg = await waitForMessage(ws); + + expect(errMsg.type).toBe("chat.error"); + if (errMsg.type === "chat.error") { + expect(errMsg.message).toContain("non-empty string"); + } + + ws.close(); + }); +}); diff --git a/packages/transport-ws/tsconfig.json b/packages/transport-ws/tsconfig.json index 102c8f0..2a1d7ab 100644 --- a/packages/transport-ws/tsconfig.json +++ b/packages/transport-ws/tsconfig.json @@ -4,7 +4,9 @@ "include": ["src/**/*.ts"], "references": [ { "path": "../kernel" }, + { "path": "../session-orchestrator" }, { "path": "../surface-registry" }, + { "path": "../transport-contract" }, { "path": "../ui-contract" } ] } @@ -413,8 +413,42 @@ streaming. Spans both repos; the backend prereqs live HERE (FE work runs in `../ - **Verified (orchestrator):** typecheck clean, **481 vitest** (469→+12), biome clean, no internal `@dispatch/*` mocks, in-lane. Live boot-probe deferred to the WS step (this GET route has no effectful-shell surprise surface; host wiring mirrors `/chat`). -- [ ] **WS turn-deltas** — `transport-ws` multiplexes `sendMessage`/`onDelta(AgentEvent)` - alongside surface ops (one connection carries both; frontend-design §5). +- [x] **WS turn-deltas** — `transport-ws` multiplexes chat ops alongside surface ops. DONE + verified live. + Design (user-confirmed): chat WS ops live in `@dispatch/transport-contract` (NOT ui-contract — + it stays surface-only/zero-`@dispatch`-deps); new non-colliding `type` variants (`chat.send` + client; `chat.delta`/`chat.error` server) widen unified `WsClientMessage`/`WsServerMessage` + unions — NO channel wrapper, so the shipped slice-1 surface protocol is byte-identical. + - **Contract + wiring (orchestrator):** added `ChatSendMessage`/`ChatDeltaMessage`/ + `ChatErrorMessage` + `WsClientMessage`/`WsServerMessage` to transport-contract (now imports + `ui-contract`; doc broadened to "HTTP + WebSocket"); deps/refs for transport-contract→ui-contract + and transport-ws→{session-orchestrator,transport-contract}; `bun install`. + - **transport-ws (owner, mimo-v2.5-pro):** pure `routeClientMessage` extended to the full union + (`kind:"chat"|"chat-error"|"surface"`); shell drives `sessionOrchestratorHandle.handleMessage`, + streaming each `AgentEvent` as `{type:"chat.delta",event}` via `onEvent`; per-connection + `AbortController` aborted on socket close (no leaked turns); error-isolated (`chat.error`, + never crashes the connection). `dependsOn:["session-orchestrator"]`. +4 router + 3 bun:test + integration (real `Bun.serve` WS + fake orchestrator). prompts/ws-turn-deltas.md, reports/transport-ws.md. + - **Verified (orchestrator):** typecheck clean, **485 vitest** (481→+4) + **80 bun** (77→+3), + biome clean, in-lane. **Live (host-bin :24203 HTTP / :24205 WS, real flash):** one WS connection + delivered `catalog` (surface op) AND a real chat turn — `chat.delta` streamed + reasoning-delta×37 → text-delta×4 → usage, reply "Hello my friend". No leaked procs after + bracket-trick cleanup. + - **Probe-artifact lesson (scar tissue):** the first probe reported FAIL because it asserted + `turn-start`/`done`/`turn-sealed` frames — but the runtime emits NONE of those (see open item). + transport-ws faithfully forwarded exactly what `onEvent` delivered; the failure was bad probe + criteria, NOT a transport bug (cf. slice-1 "10 vs 11 extensions" artifact). Verified by + confirming HTTP `/chat` emits the identical event set. + +#### Open item discovered live — runtime turn-lifecycle events NOT emitted (BLOCKS FE cache-commit) +The runtime/orchestrator emits `reasoning-delta`/`text-delta`/`usage` but **NOT `turn-start`, +`done`, or `turn-sealed`** through `emit`/`onEvent` — on EITHER transport (HTTP `/chat` + WS chat +both confirmed). The wire DEFINES these (`TurnStartEvent`/`TurnDoneEvent`/`TurnSealedEvent`) but +`runTurn` doesn't fire them. **`turn-sealed` is the FE's cache-commit signal (frontend-design §6.3 +— "below the last seal is immutable + cacheable"); `done` ends the stream.** Needs a +kernel-runtime (and/or session-orchestrator) emission fix BEFORE the FE chat slice can commit +turns to its cache. Diagnose-from-symptoms done; fix = summon the owning unit. NOT a transport-ws +defect. + Then FE (`../dispatch-web`): `core/transcript` reducer + `conversation-cache` + `chat` feature. ### 3. dedup / storage growth (after frontend) |
