summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--bun.lock3
-rw-r--r--packages/transport-contract/package.json1
-rw-r--r--packages/transport-contract/src/index.ts65
-rw-r--r--packages/transport-contract/tsconfig.json2
-rw-r--r--packages/transport-ws/package.json2
-rw-r--r--packages/transport-ws/src/extension.ts136
-rw-r--r--packages/transport-ws/src/index.ts7
-rw-r--r--packages/transport-ws/src/manifest.ts2
-rw-r--r--packages/transport-ws/src/router.test.ts81
-rw-r--r--packages/transport-ws/src/router.ts68
-rw-r--r--packages/transport-ws/src/server.bun.test.ts204
-rw-r--r--packages/transport-ws/tsconfig.json2
-rw-r--r--tasks.md38
13 files changed, 541 insertions, 70 deletions
diff --git a/bun.lock b/bun.lock
index b8f9bde..dd7c6ff 100644
--- a/bun.lock
+++ b/bun.lock
@@ -143,6 +143,7 @@
"name": "@dispatch/transport-contract",
"version": "0.0.0",
"dependencies": {
+ "@dispatch/ui-contract": "workspace:*",
"@dispatch/wire": "workspace:*",
},
},
@@ -163,7 +164,9 @@
"version": "0.0.0",
"dependencies": {
"@dispatch/kernel": "workspace:*",
+ "@dispatch/session-orchestrator": "workspace:*",
"@dispatch/surface-registry": "workspace:*",
+ "@dispatch/transport-contract": "workspace:*",
"@dispatch/ui-contract": "workspace:*",
},
},
diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json
index 2af6a73..03efc2b 100644
--- a/packages/transport-contract/package.json
+++ b/packages/transport-contract/package.json
@@ -6,6 +6,7 @@
"main": "dist/index.js",
"types": "dist/index.d.ts",
"dependencies": {
+ "@dispatch/ui-contract": "workspace:*",
"@dispatch/wire": "workspace:*"
}
}
diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts
index 9d8f6f4..cdddad6 100644
--- a/packages/transport-contract/src/index.ts
+++ b/packages/transport-contract/src/index.ts
@@ -1,18 +1,26 @@
/**
- * Transport contract — the typed description of Dispatch's HTTP API.
+ * Transport contract — the typed description of Dispatch's client–server API
+ * (HTTP + WebSocket).
*
* This package is types-only (zero runtime). It is the single shared surface
* every client imports to know how to talk to the backend — the CLI, the web
- * frontend (in its own repo), any third-party client — and the transport-http
- * server imports to know what it must accept and emit.
+ * frontend (in its own repo), any third-party client — and the transport-http /
+ * transport-ws servers import to know what they must accept and emit.
*
* Each side owns its OWN (de)serialization: there is deliberately no shared
* parse/serialize helper here (isolation-over-DRY). The contract is the SHAPES,
* not the codec. The streaming response payload is the kernel's `AgentEvent`
* union, re-exported here so a client has one import for the whole wire.
+ *
+ * The WebSocket carries BOTH chat ops (defined here) and surface ops (defined in
+ * `@dispatch/ui-contract`) over one connection; the unified `WsClientMessage` /
+ * `WsServerMessage` unions below compose them. Chat ops are new, non-colliding
+ * `type` variants — there is no channel wrapper, so the shipped surface protocol
+ * is unchanged.
*/
-import type { StoredChunk } from "@dispatch/wire";
+import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract";
+import type { AgentEvent, StoredChunk } from "@dispatch/wire";
export type { AgentEvent, StoredChunk } from "@dispatch/wire";
@@ -83,3 +91,52 @@ export interface ConversationHistoryResponse {
readonly chunks: readonly StoredChunk[];
readonly latestSeq: number;
}
+
+// ─── WebSocket chat ops ───────────────────────────────────────────────────────
+// The persistent WS connection multiplexes chat ops (below) with surface ops
+// (`@dispatch/ui-contract`). The unified unions at the bottom compose both. Chat
+// `type`s are namespaced (`chat.*`) so they never collide with surface ones.
+
+/**
+ * Client → server: start or continue a turn over the WS connection. Carries the
+ * same fields as the HTTP `ChatRequest` (so one shape drives both transports);
+ * omit `conversationId` to start fresh — the resolved id arrives on the streamed
+ * `AgentEvent`s (each carries `conversationId`).
+ */
+export interface ChatSendMessage extends ChatRequest {
+ readonly type: "chat.send";
+}
+
+/**
+ * Server → client: one `AgentEvent` from an in-flight turn (text-delta,
+ * tool-call, usage, done, turn-sealed, …). The client folds these into its
+ * transcript exactly as it folds the HTTP NDJSON stream — same events, different
+ * carrier.
+ */
+export interface ChatDeltaMessage {
+ readonly type: "chat.delta";
+ readonly event: AgentEvent;
+}
+
+/**
+ * Server → client: a chat-scoped TRANSPORT error — e.g. a malformed `chat.send`
+ * or a failure before a turn could start. (Errors DURING a turn arrive as a
+ * `TurnErrorEvent` inside a `chat.delta`.)
+ */
+export interface ChatErrorMessage {
+ readonly type: "chat.error";
+ readonly conversationId?: string;
+ readonly message: string;
+}
+
+/**
+ * Every client → server WS message: surface ops (`@dispatch/ui-contract`) + chat
+ * ops. A server discriminates on `type`.
+ */
+export type WsClientMessage = SurfaceClientMessage | ChatSendMessage;
+
+/**
+ * Every server → client WS message: surface ops (`@dispatch/ui-contract`) + chat
+ * ops. A client discriminates on `type`.
+ */
+export type WsServerMessage = SurfaceServerMessage | ChatDeltaMessage | ChatErrorMessage;
diff --git a/packages/transport-contract/tsconfig.json b/packages/transport-contract/tsconfig.json
index a882987..5a5de0f 100644
--- a/packages/transport-contract/tsconfig.json
+++ b/packages/transport-contract/tsconfig.json
@@ -2,5 +2,5 @@
"extends": "../../tsconfig.base.json",
"compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true },
"include": ["src/**/*.ts"],
- "references": [{ "path": "../wire" }]
+ "references": [{ "path": "../ui-contract" }, { "path": "../wire" }]
}
diff --git a/packages/transport-ws/package.json b/packages/transport-ws/package.json
index dab8ebc..b600a98 100644
--- a/packages/transport-ws/package.json
+++ b/packages/transport-ws/package.json
@@ -7,7 +7,9 @@
"types": "dist/index.d.ts",
"dependencies": {
"@dispatch/kernel": "workspace:*",
+ "@dispatch/session-orchestrator": "workspace:*",
"@dispatch/surface-registry": "workspace:*",
+ "@dispatch/transport-contract": "workspace:*",
"@dispatch/ui-contract": "workspace:*"
}
}
diff --git a/packages/transport-ws/src/extension.ts b/packages/transport-ws/src/extension.ts
index a18aefa..ddcb045 100644
--- a/packages/transport-ws/src/extension.ts
+++ b/packages/transport-ws/src/extension.ts
@@ -3,20 +3,24 @@
*
* All decision logic lives in router.ts (pure, unit-tested).
* This file handles I/O only: WS accept, JSON parse/stringify,
- * provider.subscribe wiring, server lifecycle.
+ * provider.subscribe wiring, orchestrator turn-streaming, server lifecycle.
*/
import type { Extension, HostAPI } from "@dispatch/kernel";
+import type { SessionOrchestrator } from "@dispatch/session-orchestrator";
+import { sessionOrchestratorHandle } from "@dispatch/session-orchestrator";
import type { SurfaceProvider, SurfaceRegistry } from "@dispatch/surface-registry";
import { surfaceRegistryHandle } from "@dispatch/surface-registry";
-import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract";
+import type { WsClientMessage, WsServerMessage } from "@dispatch/transport-contract";
import { manifest } from "./manifest.js";
import { catalogMessage, routeClientMessage } from "./router.js";
-/** Active provider subscriptions for a single WS connection. */
+/** Active provider subscriptions + chat abort controller for a single WS connection. */
interface ConnectionState {
readonly subs: Set<string>;
readonly providerDisposers: Map<string, () => void>;
+ /** AbortController cancelled when the socket closes — aborts in-flight turns. */
+ readonly abortController: AbortController;
}
type Ws = Bun.ServerWebSocket<ConnectionState>;
@@ -28,10 +32,11 @@ export function createTransportWsExtension(): Extension {
manifest,
async activate(host: HostAPI) {
const registry: SurfaceRegistry = host.getService(surfaceRegistryHandle);
+ const orchestrator: SessionOrchestrator = host.getService(sessionOrchestratorHandle);
const logger = host.logger;
const port = host.config.get<number>("surfaceWsPort") ?? 24205;
- function send(ws: Ws, msg: SurfaceServerMessage): void {
+ function send(ws: Ws, msg: WsServerMessage): void {
try {
ws.send(JSON.stringify(msg));
} catch {
@@ -73,12 +78,48 @@ export function createTransportWsExtension(): Extension {
}
}
+ /**
+ * Drive one chat turn through the orchestrator. Error-isolated:
+ * a throw/reject sends a chat.error to the socket and never kills
+ * the connection or affects surface ops / other connections.
+ */
+ async function handleChatTurn(
+ ws: Ws,
+ state: ConnectionState,
+ conversationId: string | undefined,
+ text: string,
+ model: string | undefined,
+ cwd: string | undefined,
+ ): Promise<void> {
+ const resolvedId = conversationId ?? crypto.randomUUID();
+ try {
+ await orchestrator.handleMessage({
+ conversationId: resolvedId,
+ text,
+ ...(model !== undefined ? { modelName: model } : {}),
+ ...(cwd !== undefined ? { cwd } : {}),
+ signal: state.abortController.signal,
+ onEvent(event) {
+ send(ws, { type: "chat.delta", event });
+ },
+ });
+ } catch (err: unknown) {
+ const message = err instanceof Error ? err.message : "unknown orchestrator error";
+ send(ws, { type: "chat.error", conversationId: resolvedId, message });
+ logger.warn?.("transport-ws: chat turn failed", {
+ conversationId: resolvedId,
+ error: message,
+ });
+ }
+ }
+
server = Bun.serve<ConnectionState>({
port,
fetch(req, srv) {
const initial: ConnectionState = {
subs: new Set(),
providerDisposers: new Map(),
+ abortController: new AbortController(),
};
if (srv.upgrade(req, { data: initial })) return;
return new Response("expected websocket", { status: 426 });
@@ -92,9 +133,9 @@ export function createTransportWsExtension(): Extension {
const state = ws.data;
if (!state) return;
- let parsed: SurfaceClientMessage;
+ let parsed: WsClientMessage;
try {
- parsed = JSON.parse(String(message)) as SurfaceClientMessage;
+ parsed = JSON.parse(String(message)) as WsClientMessage;
} catch {
send(ws, { type: "error", message: "Invalid JSON" });
return;
@@ -102,37 +143,66 @@ export function createTransportWsExtension(): Extension {
const result = routeClientMessage(registry, state.subs, parsed);
- // Apply sub change.
- if (result.subChange) {
- if (result.subChange.op === "add") {
- state.subs.add(result.subChange.surfaceId);
- const provider = registry.getSurface(result.subChange.surfaceId);
- if (provider) {
- subscribeToProvider(ws, provider, result.subChange.surfaceId, state);
+ switch (result.kind) {
+ case "surface": {
+ // Apply sub change.
+ if (result.subChange) {
+ if (result.subChange.op === "add") {
+ state.subs.add(result.subChange.surfaceId);
+ const provider = registry.getSurface(result.subChange.surfaceId);
+ if (provider) {
+ subscribeToProvider(ws, provider, result.subChange.surfaceId, state);
+ }
+ } else {
+ state.subs.delete(result.subChange.surfaceId);
+ unsubscribeFromProvider(state, result.subChange.surfaceId);
+ }
}
- } else {
- state.subs.delete(result.subChange.surfaceId);
- unsubscribeFromProvider(state, result.subChange.surfaceId);
- }
- }
- // Send replies.
- for (const reply of result.replies) {
- send(ws, reply);
- }
+ // Send replies.
+ for (const reply of result.replies) {
+ send(ws, reply);
+ }
- // Perform invoke if signalled.
- if (result.invoke) {
- const provider = registry.getSurface(result.invoke.surfaceId);
- if (provider) {
- try {
- const r = provider.invoke(result.invoke.actionId, result.invoke.payload);
- if (r instanceof Promise) {
- r.catch(() => {});
+ // Perform invoke if signalled.
+ if (result.invoke) {
+ const provider = registry.getSurface(result.invoke.surfaceId);
+ if (provider) {
+ try {
+ const r = provider.invoke(result.invoke.actionId, result.invoke.payload);
+ if (r instanceof Promise) {
+ r.catch(() => {});
+ }
+ } catch {
+ // Provider threw on invoke — log but don't kill the connection.
+ }
}
- } catch {
- // Provider threw on invoke — log but don't kill the connection.
}
+ break;
+ }
+
+ case "chat": {
+ // Fire-and-forget the turn; errors are caught inside handleChatTurn.
+ void handleChatTurn(
+ ws,
+ state,
+ result.conversationId,
+ result.message,
+ result.model,
+ result.cwd,
+ );
+ break;
+ }
+
+ case "chat-error": {
+ send(ws, {
+ type: "chat.error",
+ ...(result.conversationId !== undefined
+ ? { conversationId: result.conversationId }
+ : {}),
+ message: result.errorMessage,
+ });
+ break;
}
}
},
@@ -140,6 +210,8 @@ export function createTransportWsExtension(): Extension {
close(ws) {
const state = ws.data;
if (state) {
+ // Abort any in-flight chat turns.
+ state.abortController.abort();
for (const dispose of state.providerDisposers.values()) {
dispose();
}
diff --git a/packages/transport-ws/src/index.ts b/packages/transport-ws/src/index.ts
index a93611f..f4355c0 100644
--- a/packages/transport-ws/src/index.ts
+++ b/packages/transport-ws/src/index.ts
@@ -1,4 +1,9 @@
export { createTransportWsExtension } from "./extension.js";
export { manifest } from "./manifest.js";
-export type { RouteResult } from "./router.js";
+export type {
+ ChatRouteError,
+ ChatRouteResult,
+ RouteResult,
+ SurfaceRouteResult,
+} from "./router.js";
export { catalogMessage, routeClientMessage } from "./router.js";
diff --git a/packages/transport-ws/src/manifest.ts b/packages/transport-ws/src/manifest.ts
index b0612e2..5058311 100644
--- a/packages/transport-ws/src/manifest.ts
+++ b/packages/transport-ws/src/manifest.ts
@@ -6,7 +6,7 @@ export const manifest: Manifest = {
version: "0.0.0",
apiVersion: "^0.1.0",
trust: "bundled",
- dependsOn: ["surface-registry"],
+ dependsOn: ["surface-registry", "session-orchestrator"],
capabilities: { network: true },
contributes: { routes: ["/ws/surfaces"] },
activation: "eager",
diff --git a/packages/transport-ws/src/router.test.ts b/packages/transport-ws/src/router.test.ts
index 83496f3..ae76c5d 100644
--- a/packages/transport-ws/src/router.test.ts
+++ b/packages/transport-ws/src/router.test.ts
@@ -59,6 +59,8 @@ describe("routeClientMessage", () => {
surfaceId: "a",
});
+ expect(result.kind).toBe("surface");
+ if (result.kind !== "surface") throw new Error("expected surface");
expect(result.replies).toHaveLength(1);
expect(result.replies[0]).toEqual({
type: "surface",
@@ -82,6 +84,8 @@ describe("routeClientMessage", () => {
surfaceId: "a",
});
+ expect(result.kind).toBe("surface");
+ if (result.kind !== "surface") throw new Error("expected surface");
expect(result.replies).toHaveLength(1);
expect(result.replies[0]?.type).toBe("surface");
expect(result.subChange).toBeUndefined();
@@ -96,6 +100,8 @@ describe("routeClientMessage", () => {
surfaceId: "nonexistent",
});
+ expect(result.kind).toBe("surface");
+ if (result.kind !== "surface") throw new Error("expected surface");
expect(result.replies).toHaveLength(1);
expect(result.replies[0]).toEqual({
type: "error",
@@ -116,6 +122,8 @@ describe("routeClientMessage", () => {
surfaceId: "a",
});
+ expect(result.kind).toBe("surface");
+ if (result.kind !== "surface") throw new Error("expected surface");
expect(result.replies).toHaveLength(0);
expect(result.subChange).toEqual({ op: "remove", surfaceId: "a" });
});
@@ -129,6 +137,8 @@ describe("routeClientMessage", () => {
surfaceId: "a",
});
+ expect(result.kind).toBe("surface");
+ if (result.kind !== "surface") throw new Error("expected surface");
expect(result.replies).toHaveLength(0);
expect(result.subChange).toEqual({ op: "remove", surfaceId: "a" });
});
@@ -147,6 +157,8 @@ describe("routeClientMessage", () => {
payload: true,
});
+ expect(result.kind).toBe("surface");
+ if (result.kind !== "surface") throw new Error("expected surface");
expect(result.replies).toHaveLength(0);
expect(result.invoke).toEqual({
surfaceId: "a",
@@ -165,6 +177,8 @@ describe("routeClientMessage", () => {
actionId: "toggle",
});
+ expect(result.kind).toBe("surface");
+ if (result.kind !== "surface") throw new Error("expected surface");
expect(result.replies).toHaveLength(1);
expect(result.replies[0]).toEqual({
type: "error",
@@ -174,6 +188,73 @@ describe("routeClientMessage", () => {
expect(result.invoke).toBeUndefined();
});
});
+
+ describe("chat.send", () => {
+ it("classifies a chat.send message", () => {
+ const registry = fakeRegistry([]);
+ const connSubs = new Set<string>();
+
+ const result = routeClientMessage(registry, connSubs, {
+ type: "chat.send",
+ message: "hello",
+ });
+
+ expect(result.kind).toBe("chat");
+ if (result.kind !== "chat") throw new Error("expected chat");
+ expect(result.message).toBe("hello");
+ expect(result.conversationId).toBeUndefined();
+ expect(result.model).toBeUndefined();
+ expect(result.cwd).toBeUndefined();
+ });
+
+ it("passes through optional fields", () => {
+ const registry = fakeRegistry([]);
+ const connSubs = new Set<string>();
+
+ const result = routeClientMessage(registry, connSubs, {
+ type: "chat.send",
+ conversationId: "conv-123",
+ message: "follow up",
+ model: "gpt-4",
+ cwd: "/tmp",
+ });
+
+ expect(result.kind).toBe("chat");
+ if (result.kind !== "chat") throw new Error("expected chat");
+ expect(result.conversationId).toBe("conv-123");
+ expect(result.message).toBe("follow up");
+ expect(result.model).toBe("gpt-4");
+ expect(result.cwd).toBe("/tmp");
+ });
+
+ it("rejects a malformed chat.send (empty message)", () => {
+ const registry = fakeRegistry([]);
+ const connSubs = new Set<string>();
+
+ const result = routeClientMessage(registry, connSubs, {
+ type: "chat.send",
+ message: "",
+ });
+
+ expect(result.kind).toBe("chat-error");
+ if (result.kind !== "chat-error") throw new Error("expected chat-error");
+ expect(result.errorMessage).toContain("non-empty string");
+ });
+
+ it("rejects a malformed chat.send (missing message)", () => {
+ const registry = fakeRegistry([]);
+ const connSubs = new Set<string>();
+
+ const result = routeClientMessage(registry, connSubs, {
+ type: "chat.send",
+ message: undefined as unknown as string,
+ });
+
+ expect(result.kind).toBe("chat-error");
+ if (result.kind !== "chat-error") throw new Error("expected chat-error");
+ expect(result.errorMessage).toContain("non-empty string");
+ });
+ });
});
describe("catalogMessage", () => {
diff --git a/packages/transport-ws/src/router.ts b/packages/transport-ws/src/router.ts
index f9a7a82..1a90e86 100644
--- a/packages/transport-ws/src/router.ts
+++ b/packages/transport-ws/src/router.ts
@@ -4,16 +4,18 @@
* Zero I/O, zero ambient state. Every function is `input → output`:
* it decides what to do but does NOT do it. The shell (extension.ts)
* interprets the result: sends WS messages, mutates connSubs, calls
- * provider.invoke.
+ * provider.invoke, drives the orchestrator.
*/
import type { SurfaceRegistry } from "@dispatch/surface-registry";
-import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract";
+import type { ChatSendMessage, WsClientMessage } from "@dispatch/transport-contract";
+import type { SurfaceServerMessage } from "@dispatch/ui-contract";
// ── Result types ────────────────────────────────────────────────────────────
-/** The effect a single client message should produce. */
-export interface RouteResult {
+/** The effect a surface client message should produce. */
+export interface SurfaceRouteResult {
+ readonly kind: "surface";
/** Server messages to send back to this connection. */
readonly replies: readonly SurfaceServerMessage[];
/** Whether to add or remove the surface id from connSubs. */
@@ -26,6 +28,25 @@ export interface RouteResult {
};
}
+/** The effect a validated chat.send should produce. */
+export interface ChatRouteResult {
+ readonly kind: "chat";
+ readonly conversationId: string | undefined;
+ readonly message: string;
+ readonly model: string | undefined;
+ readonly cwd: string | undefined;
+}
+
+/** A malformed chat.send that should yield a chat.error reply. */
+export interface ChatRouteError {
+ readonly kind: "chat-error";
+ readonly conversationId: string | undefined;
+ readonly errorMessage: string;
+}
+
+/** The effect any client WS message should produce. */
+export type RouteResult = SurfaceRouteResult | ChatRouteResult | ChatRouteError;
+
// ── Helpers ─────────────────────────────────────────────────────────────────
/** Build the catalog `SurfaceServerMessage` from the registry. */
@@ -40,12 +61,12 @@ export function catalogMessage(registry: SurfaceRegistry): SurfaceServerMessage
*
* @param registry The surface registry (looked up once, injected).
* @param connSubs This connection's current subscribed surface ids.
- * @param msg The parsed client message.
+ * @param msg The parsed client message (surface or chat).
*/
export function routeClientMessage(
registry: SurfaceRegistry,
connSubs: ReadonlySet<string>,
- msg: SurfaceClientMessage,
+ msg: WsClientMessage,
): RouteResult {
switch (msg.type) {
case "subscribe":
@@ -54,19 +75,41 @@ export function routeClientMessage(
return handleUnsubscribe(msg.surfaceId);
case "invoke":
return handleInvoke(registry, msg.surfaceId, msg.actionId, msg.payload);
+ case "chat.send":
+ return handleChatSend(msg);
}
}
+// ── Chat validation ─────────────────────────────────────────────────────────
+
+function handleChatSend(msg: ChatSendMessage): ChatRouteResult | ChatRouteError {
+ if (typeof msg.message !== "string" || msg.message.length === 0) {
+ return {
+ kind: "chat-error",
+ conversationId: msg.conversationId,
+ errorMessage: "chat.send requires a non-empty string `message`",
+ };
+ }
+ return {
+ kind: "chat",
+ conversationId: msg.conversationId,
+ message: msg.message,
+ model: msg.model,
+ cwd: msg.cwd,
+ };
+}
+
// ── Per-message handlers ────────────────────────────────────────────────────
function handleSubscribe(
registry: SurfaceRegistry,
connSubs: ReadonlySet<string>,
surfaceId: string,
-): RouteResult {
+): SurfaceRouteResult {
const provider = registry.getSurface(surfaceId);
if (!provider) {
return {
+ kind: "surface",
replies: [{ type: "error", surfaceId, message: `Unknown surface: ${surfaceId}` }],
};
}
@@ -85,13 +128,14 @@ function handleSubscribe(
// Idempotent: only emit subChange if not already subscribed.
if (!connSubs.has(surfaceId)) {
- return { replies, subChange: { op: "add", surfaceId } };
+ return { kind: "surface", replies, subChange: { op: "add", surfaceId } };
}
- return { replies };
+ return { kind: "surface", replies };
}
-function handleUnsubscribe(surfaceId: string): RouteResult {
+function handleUnsubscribe(surfaceId: string): SurfaceRouteResult {
return {
+ kind: "surface",
replies: [],
subChange: { op: "remove", surfaceId },
};
@@ -102,14 +146,16 @@ function handleInvoke(
surfaceId: string,
actionId: string,
payload?: unknown,
-): RouteResult {
+): SurfaceRouteResult {
const provider = registry.getSurface(surfaceId);
if (!provider) {
return {
+ kind: "surface",
replies: [{ type: "error", surfaceId, message: `Unknown surface: ${surfaceId}` }],
};
}
return {
+ kind: "surface",
replies: [],
invoke: { surfaceId, actionId, payload },
};
diff --git a/packages/transport-ws/src/server.bun.test.ts b/packages/transport-ws/src/server.bun.test.ts
index d51eb72..8c16f72 100644
--- a/packages/transport-ws/src/server.bun.test.ts
+++ b/packages/transport-ws/src/server.bun.test.ts
@@ -1,11 +1,9 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
+import type { AgentEvent } from "@dispatch/kernel";
+import type { SessionOrchestrator } from "@dispatch/session-orchestrator";
import type { SurfaceProvider, SurfaceRegistry } from "@dispatch/surface-registry";
-import type {
- SurfaceCatalogEntry,
- SurfaceClientMessage,
- SurfaceServerMessage,
- SurfaceSpec,
-} from "@dispatch/ui-contract";
+import type { WsServerMessage } from "@dispatch/transport-contract";
+import type { SurfaceCatalogEntry, SurfaceClientMessage, SurfaceSpec } from "@dispatch/ui-contract";
import { catalogMessage, routeClientMessage } from "./router.js";
// ── Fake registry (same pattern as router.test.ts) ──────────────────────────
@@ -45,22 +43,30 @@ function fakeRegistry(providers: readonly SurfaceProvider[]): SurfaceRegistry {
};
}
+function fakeOrchestrator(handler?: SessionOrchestrator["handleMessage"]): SessionOrchestrator {
+ return {
+ handleMessage: handler ?? (async () => {}),
+ };
+}
+
// ── Per-connection state (mirrors extension.ts) ─────────────────────────────
interface ConnectionState {
readonly subs: Set<string>;
readonly providerDisposers: Map<string, () => void>;
+ readonly abortController: AbortController;
}
// ── Server helper ───────────────────────────────────────────────────────────
-function startServer(registry: SurfaceRegistry, port = 0) {
+function startServer(registry: SurfaceRegistry, orchestrator: SessionOrchestrator, port = 0) {
return Bun.serve<ConnectionState>({
port,
fetch(req, srv) {
const initial: ConnectionState = {
subs: new Set(),
providerDisposers: new Map(),
+ abortController: new AbortController(),
};
if (srv.upgrade(req, { data: initial })) return;
return new Response("expected websocket", { status: 426 });
@@ -84,22 +90,67 @@ function startServer(registry: SurfaceRegistry, port = 0) {
const result = routeClientMessage(registry, state.subs, parsed);
- if (result.subChange) {
- if (result.subChange.op === "add") {
- state.subs.add(result.subChange.surfaceId);
- } else {
- state.subs.delete(result.subChange.surfaceId);
+ switch (result.kind) {
+ case "surface": {
+ if (result.subChange) {
+ if (result.subChange.op === "add") {
+ state.subs.add(result.subChange.surfaceId);
+ } else {
+ state.subs.delete(result.subChange.surfaceId);
+ }
+ }
+
+ for (const reply of result.replies) {
+ ws.send(JSON.stringify(reply));
+ }
+ break;
+ }
+
+ case "chat": {
+ const resolvedId = result.conversationId ?? crypto.randomUUID();
+ void (async () => {
+ try {
+ await orchestrator.handleMessage({
+ conversationId: resolvedId,
+ text: result.message,
+ ...(result.model !== undefined ? { modelName: result.model } : {}),
+ ...(result.cwd !== undefined ? { cwd: result.cwd } : {}),
+ signal: state.abortController.signal,
+ onEvent(event) {
+ ws.send(JSON.stringify({ type: "chat.delta", event }));
+ },
+ });
+ } catch (err: unknown) {
+ const message = err instanceof Error ? err.message : "unknown orchestrator error";
+ ws.send(
+ JSON.stringify({
+ type: "chat.error",
+ conversationId: resolvedId,
+ message,
+ }),
+ );
+ }
+ })();
+ break;
}
- }
- for (const reply of result.replies) {
- ws.send(JSON.stringify(reply));
+ case "chat-error": {
+ ws.send(
+ JSON.stringify({
+ type: "chat.error",
+ conversationId: result.conversationId,
+ message: result.errorMessage,
+ }),
+ );
+ break;
+ }
}
},
close(ws) {
const state = ws.data;
if (state) {
+ state.abortController.abort();
for (const dispose of state.providerDisposers.values()) {
dispose();
}
@@ -111,13 +162,32 @@ function startServer(registry: SurfaceRegistry, port = 0) {
// ── Helpers ─────────────────────────────────────────────────────────────────
-function waitForMessage(ws: WebSocket): Promise<SurfaceServerMessage> {
+function waitForMessage(ws: WebSocket): Promise<WsServerMessage> {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error("timed out waiting for message")), 5000);
function handler(ev: MessageEvent) {
clearTimeout(timeout);
ws.removeEventListener("message", handler);
- resolve(JSON.parse(ev.data as string) as SurfaceServerMessage);
+ resolve(JSON.parse(ev.data as string) as WsServerMessage);
+ }
+ ws.addEventListener("message", handler);
+ });
+}
+
+function waitForMessages(ws: WebSocket, count: number): Promise<WsServerMessage[]> {
+ return new Promise((resolve, reject) => {
+ const msgs: WsServerMessage[] = [];
+ const timeout = setTimeout(
+ () => reject(new Error(`timed out waiting for ${count} messages (got ${msgs.length})`)),
+ 5000,
+ );
+ function handler(ev: MessageEvent) {
+ msgs.push(JSON.parse(ev.data as string) as WsServerMessage);
+ if (msgs.length === count) {
+ clearTimeout(timeout);
+ ws.removeEventListener("message", handler);
+ resolve(msgs);
+ }
}
ws.addEventListener("message", handler);
});
@@ -128,11 +198,12 @@ function waitForMessage(ws: WebSocket): Promise<SurfaceServerMessage> {
describe("Bun.serve WebSocket server", () => {
let server: ReturnType<typeof Bun.serve>;
let port: number;
+ const defaultOrchestrator = fakeOrchestrator();
beforeEach(() => {
const provider = fakeProvider("demo", "Demo Surface");
const registry = fakeRegistry([provider]);
- server = startServer(registry);
+ server = startServer(registry, defaultOrchestrator);
port = server.port as number;
});
@@ -193,3 +264,100 @@ describe("Bun.serve WebSocket server", () => {
expect(await res.text()).toBe("expected websocket");
});
});
+
+describe("chat ops", () => {
+ let server: ReturnType<typeof Bun.serve>;
+ let port: number;
+
+ afterEach(() => {
+ server.stop();
+ });
+
+ test("chat.send streams AgentEvents back as chat.delta in order", async () => {
+ const events: AgentEvent[] = [
+ { type: "turn-start", conversationId: "c1", turnId: "t1" } as AgentEvent,
+ { type: "text-delta", conversationId: "c1", turnId: "t1", delta: "Hello" } as AgentEvent,
+ { type: "text-delta", conversationId: "c1", turnId: "t1", delta: " world" } as AgentEvent,
+ { type: "done", conversationId: "c1", turnId: "t1" } as AgentEvent,
+ { type: "turn-sealed", conversationId: "c1", turnId: "t1" } as AgentEvent,
+ ];
+
+ const orchestrator = fakeOrchestrator(async (input) => {
+ for (const event of events) {
+ input.onEvent(event);
+ }
+ });
+
+ const registry = fakeRegistry([]);
+ server = startServer(registry, orchestrator);
+ port = server.port as number;
+
+ const ws = new WebSocket(`ws://localhost:${port}`);
+ await waitForMessage(ws); // drain catalog
+
+ ws.send(JSON.stringify({ type: "chat.send", message: "hi" }));
+
+ const msgs = await waitForMessages(ws, events.length);
+
+ for (let i = 0; i < events.length; i++) {
+ const msg = msgs[i];
+ const expected = events[i];
+ if (!msg || !expected) throw new Error(`missing at index ${i}`);
+ expect(msg.type).toBe("chat.delta");
+ if (msg.type === "chat.delta") {
+ expect(msg.event).toEqual(expected);
+ }
+ }
+
+ ws.close();
+ });
+
+ test("chat orchestrator failure yields chat.error without crashing the connection", async () => {
+ const orchestrator = fakeOrchestrator(async () => {
+ throw new Error("boom");
+ });
+
+ const registry = fakeRegistry([fakeProvider("demo", "Demo")]);
+ server = startServer(registry, orchestrator);
+ port = server.port as number;
+
+ const ws = new WebSocket(`ws://localhost:${port}`);
+ await waitForMessage(ws); // drain catalog
+
+ // Send a chat.send that will fail
+ ws.send(JSON.stringify({ type: "chat.send", message: "trigger failure" }));
+ const errMsg = await waitForMessage(ws);
+
+ expect(errMsg.type).toBe("chat.error");
+ if (errMsg.type === "chat.error") {
+ expect(errMsg.message).toBe("boom");
+ }
+
+ // Socket must still be alive — send a surface subscribe to prove it
+ ws.send(JSON.stringify({ type: "subscribe", surfaceId: "demo" }));
+ const surfaceMsg = await waitForMessage(ws);
+ expect(surfaceMsg.type).toBe("surface");
+
+ ws.close();
+ });
+
+ test("chat.send with empty message yields chat.error (pure router rejection)", async () => {
+ const orchestrator = fakeOrchestrator();
+ const registry = fakeRegistry([]);
+ server = startServer(registry, orchestrator);
+ port = server.port as number;
+
+ const ws = new WebSocket(`ws://localhost:${port}`);
+ await waitForMessage(ws); // drain catalog
+
+ ws.send(JSON.stringify({ type: "chat.send", message: "" }));
+ const errMsg = await waitForMessage(ws);
+
+ expect(errMsg.type).toBe("chat.error");
+ if (errMsg.type === "chat.error") {
+ expect(errMsg.message).toContain("non-empty string");
+ }
+
+ ws.close();
+ });
+});
diff --git a/packages/transport-ws/tsconfig.json b/packages/transport-ws/tsconfig.json
index 102c8f0..2a1d7ab 100644
--- a/packages/transport-ws/tsconfig.json
+++ b/packages/transport-ws/tsconfig.json
@@ -4,7 +4,9 @@
"include": ["src/**/*.ts"],
"references": [
{ "path": "../kernel" },
+ { "path": "../session-orchestrator" },
{ "path": "../surface-registry" },
+ { "path": "../transport-contract" },
{ "path": "../ui-contract" }
]
}
diff --git a/tasks.md b/tasks.md
index af6d020..4bfe7d5 100644
--- a/tasks.md
+++ b/tasks.md
@@ -413,8 +413,42 @@ streaming. Spans both repos; the backend prereqs live HERE (FE work runs in `../
- **Verified (orchestrator):** typecheck clean, **481 vitest** (469→+12), biome clean, no internal
`@dispatch/*` mocks, in-lane. Live boot-probe deferred to the WS step (this GET route has no
effectful-shell surprise surface; host wiring mirrors `/chat`).
-- [ ] **WS turn-deltas** — `transport-ws` multiplexes `sendMessage`/`onDelta(AgentEvent)`
- alongside surface ops (one connection carries both; frontend-design §5).
+- [x] **WS turn-deltas** — `transport-ws` multiplexes chat ops alongside surface ops. DONE + verified live.
+ Design (user-confirmed): chat WS ops live in `@dispatch/transport-contract` (NOT ui-contract —
+ it stays surface-only/zero-`@dispatch`-deps); new non-colliding `type` variants (`chat.send`
+ client; `chat.delta`/`chat.error` server) widen unified `WsClientMessage`/`WsServerMessage`
+ unions — NO channel wrapper, so the shipped slice-1 surface protocol is byte-identical.
+ - **Contract + wiring (orchestrator):** added `ChatSendMessage`/`ChatDeltaMessage`/
+ `ChatErrorMessage` + `WsClientMessage`/`WsServerMessage` to transport-contract (now imports
+ `ui-contract`; doc broadened to "HTTP + WebSocket"); deps/refs for transport-contract→ui-contract
+ and transport-ws→{session-orchestrator,transport-contract}; `bun install`.
+ - **transport-ws (owner, mimo-v2.5-pro):** pure `routeClientMessage` extended to the full union
+ (`kind:"chat"|"chat-error"|"surface"`); shell drives `sessionOrchestratorHandle.handleMessage`,
+ streaming each `AgentEvent` as `{type:"chat.delta",event}` via `onEvent`; per-connection
+ `AbortController` aborted on socket close (no leaked turns); error-isolated (`chat.error`,
+ never crashes the connection). `dependsOn:["session-orchestrator"]`. +4 router + 3 bun:test
+ integration (real `Bun.serve` WS + fake orchestrator). prompts/ws-turn-deltas.md, reports/transport-ws.md.
+ - **Verified (orchestrator):** typecheck clean, **485 vitest** (481→+4) + **80 bun** (77→+3),
+ biome clean, in-lane. **Live (host-bin :24203 HTTP / :24205 WS, real flash):** one WS connection
+ delivered `catalog` (surface op) AND a real chat turn — `chat.delta` streamed
+ reasoning-delta×37 → text-delta×4 → usage, reply "Hello my friend". No leaked procs after
+ bracket-trick cleanup.
+ - **Probe-artifact lesson (scar tissue):** the first probe reported FAIL because it asserted
+ `turn-start`/`done`/`turn-sealed` frames — but the runtime emits NONE of those (see open item).
+ transport-ws faithfully forwarded exactly what `onEvent` delivered; the failure was bad probe
+ criteria, NOT a transport bug (cf. slice-1 "10 vs 11 extensions" artifact). Verified by
+ confirming HTTP `/chat` emits the identical event set.
+
+#### Open item discovered live — runtime turn-lifecycle events NOT emitted (BLOCKS FE cache-commit)
+The runtime/orchestrator emits `reasoning-delta`/`text-delta`/`usage` but **NOT `turn-start`,
+`done`, or `turn-sealed`** through `emit`/`onEvent` — on EITHER transport (HTTP `/chat` + WS chat
+both confirmed). The wire DEFINES these (`TurnStartEvent`/`TurnDoneEvent`/`TurnSealedEvent`) but
+`runTurn` doesn't fire them. **`turn-sealed` is the FE's cache-commit signal (frontend-design §6.3
+— "below the last seal is immutable + cacheable"); `done` ends the stream.** Needs a
+kernel-runtime (and/or session-orchestrator) emission fix BEFORE the FE chat slice can commit
+turns to its cache. Diagnose-from-symptoms done; fix = summon the owning unit. NOT a transport-ws
+defect.
+
Then FE (`../dispatch-web`): `core/transcript` reducer + `conversation-cache` + `chat` feature.
### 3. dedup / storage growth (after frontend)