summaryrefslogtreecommitdiffhomepage
path: root/packages
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-21 16:56:42 +0900
committerAdam Malczewski <[email protected]>2026-06-21 16:56:42 +0900
commitea0e938eca3072649dc8707c999ec00cf87b986a (patch)
tree41f96ed517fe779ed612fc4cb3e8359a841ad088 /packages
parent36e950ba2cd2591e86f0dcc898f740481c59d912 (diff)
downloaddispatch-ea0e938eca3072649dc8707c999ec00cf87b986a.tar.gz
dispatch-ea0e938eca3072649dc8707c999ec00cf87b986a.zip
feat(transport): CLI endpoints + conversation.open broadcast (Wave 2)
transport-http: GET /conversations (list with ?q= prefix filter), GET /conversations/:id/last (blocks until turn settles, returns last AI text), POST /conversations/:id/open (emits conversationOpened hook), PUT /conversations/:id/title (set title). emit threaded from host.emit. extractLastAssistantText pure helper. 21 new tests (166 total). transport-ws: subscribes to conversationOpened hook, broadcasts ConversationOpenMessage to all connected WS clients. 2 new tests. session-orchestrator: conversationOpened hook descriptor (exported).
Diffstat (limited to 'packages')
-rw-r--r--packages/session-orchestrator/src/index.ts2
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts14
-rw-r--r--packages/transport-http/src/app.test.ts375
-rw-r--r--packages/transport-http/src/app.ts140
-rw-r--r--packages/transport-http/src/extension.ts1
-rw-r--r--packages/transport-http/src/index.ts1
-rw-r--r--packages/transport-http/src/logic.ts32
-rw-r--r--packages/transport-http/src/seam.ts6
-rw-r--r--packages/transport-ws/src/extension.ts31
-rw-r--r--packages/transport-ws/src/server.bun.test.ts79
10 files changed, 676 insertions, 5 deletions
diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts
index afec2b4..3d8ad18 100644
--- a/packages/session-orchestrator/src/index.ts
+++ b/packages/session-orchestrator/src/index.ts
@@ -1,8 +1,10 @@
export { extension, manifest } from "./extension.js";
export {
type ConversationClosedPayload,
+ type ConversationOpenedPayload,
cacheWarmHandle,
conversationClosed,
+ conversationOpened,
createSessionOrchestrator,
createWarmService,
type EnqueueInput,
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index 3a74c2d..82ca59e 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -97,6 +97,20 @@ export interface ConversationClosedPayload {
export const conversationClosed: EventHookDescriptor<ConversationClosedPayload> =
defineEventHook<ConversationClosedPayload>("session-orchestrator/conversation-closed");
+/** Payload for the conversationOpened bus event. */
+export interface ConversationOpenedPayload {
+ readonly conversationId: string;
+}
+
+/**
+ * Fired when a client requests to "open" a conversation (e.g. the CLI `--open`
+ * flag). Transport-ws subscribes and broadcasts a `conversation.open` WS
+ * message to all connected frontend clients. The frontend decides whether to
+ * open/focus a tab — the backend just signals.
+ */
+export const conversationOpened: EventHookDescriptor<ConversationOpenedPayload> =
+ defineEventHook<ConversationOpenedPayload>("session-orchestrator/conversation-opened");
+
/** Payload for the warmCompleted bus event. */
export interface WarmCompletedPayload {
readonly conversationId: string;
diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts
index 9078a07..cb14648 100644
--- a/packages/transport-http/src/app.test.ts
+++ b/packages/transport-http/src/app.test.ts
@@ -1,5 +1,8 @@
import type {
AgentEvent,
+ ChatMessage,
+ ConversationMeta,
+ HostAPI,
Logger,
ReasoningEffort,
StepId,
@@ -15,6 +18,7 @@ import type {
} from "@dispatch/transport-contract";
import { describe, expect, it } from "vitest";
import { createApp } from "./app.js";
+import { extractLastAssistantText } from "./logic.js";
import type {
ConversationStore,
CredentialStore,
@@ -22,6 +26,7 @@ import type {
SessionOrchestrator,
WarmService,
} from "./seam.js";
+import { conversationOpened } from "./seam.js";
function createMemStorage(): StorageNamespace {
const map = new Map<string, string>();
@@ -2026,3 +2031,373 @@ describe("PUT /conversations/:id/reasoning-effort", () => {
expect(storeCalled).toBe(false);
});
});
+
+describe("GET /conversations", () => {
+ const sampleConvos: ConversationMeta[] = [
+ { id: "conv-1", createdAt: 1000, lastActivityAt: 2000, title: "First" },
+ { id: "conv-2", createdAt: 1500, lastActivityAt: 2500, title: "Second" },
+ { id: "other-1", createdAt: 3000, lastActivityAt: 4000, title: "Other" },
+ ];
+
+ function appWithList(list: ConversationMeta[]) {
+ const store: ConversationStore = {
+ ...createFakeConversationStore(),
+ async listConversations() {
+ return list;
+ },
+ };
+ return createApp({
+ conversationStore: store,
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+ }
+
+ it("returns 200 with list", async () => {
+ const app = appWithList(sampleConvos);
+ const res = await app.request("/conversations");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { conversations: ConversationMeta[] };
+ expect(body.conversations).toHaveLength(3);
+ expect(body.conversations.map((c) => c.id)).toEqual(["conv-1", "conv-2", "other-1"]);
+ });
+
+ it("?q= filters by id prefix", async () => {
+ const app = appWithList(sampleConvos);
+ const res = await app.request("/conversations?q=conv-");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { conversations: ConversationMeta[] };
+ expect(body.conversations).toHaveLength(2);
+ expect(body.conversations.map((c) => c.id)).toEqual(["conv-1", "conv-2"]);
+ });
+
+ it("?q= returns all when q is empty", async () => {
+ const app = appWithList(sampleConvos);
+ const res = await app.request("/conversations?q=");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { conversations: ConversationMeta[] };
+ expect(body.conversations).toHaveLength(3);
+ });
+
+ it("?q= with whitespace-only returns all (trimmed to empty)", async () => {
+ const app = appWithList(sampleConvos);
+ const res = await app.request("/conversations?q=%20%20%20");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { conversations: ConversationMeta[] };
+ expect(body.conversations).toHaveLength(3);
+ });
+
+ it("returns 500 when listConversations throws", async () => {
+ const store: ConversationStore = {
+ ...createFakeConversationStore(),
+ async listConversations() {
+ throw new Error("db down");
+ },
+ };
+ const app = createApp({
+ conversationStore: store,
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+ const res = await app.request("/conversations");
+ expect(res.status).toBe(500);
+ const body = (await res.json()) as { error: string };
+ expect(body.error).toContain("Failed to list conversations");
+ });
+});
+
+describe("GET /conversations/:id/last", () => {
+ function appWithMessages(messagesByConv: Map<string, ChatMessage[]>) {
+ const store: ConversationStore = {
+ ...createFakeConversationStore(),
+ async load(conversationId) {
+ return messagesByConv.get(conversationId) ?? [];
+ },
+ };
+ return createApp({
+ conversationStore: store,
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+ }
+
+ it("returns last assistant text", async () => {
+ const messages: ChatMessage[] = [
+ { role: "user", chunks: [{ type: "text", text: "hello" }] },
+ { role: "assistant", chunks: [{ type: "text", text: "hi there" }] },
+ { role: "user", chunks: [{ type: "text", text: "more" }] },
+ { role: "assistant", chunks: [{ type: "text", text: "final reply" }] },
+ ];
+ const app = appWithMessages(new Map([["conv1", messages]]));
+ const res = await app.request("/conversations/conv1/last");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { conversationId: string; content: string; turnId?: string };
+ expect(body.conversationId).toBe("conv1");
+ expect(body.content).toBe("final reply");
+ expect(body.turnId).toBeUndefined();
+ });
+
+ it("returns empty content for unknown conversation", async () => {
+ const app = appWithMessages(new Map());
+ const res = await app.request("/conversations/unknown/last");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { conversationId: string; content: string; turnId?: string };
+ expect(body.conversationId).toBe("unknown");
+ expect(body.content).toBe("");
+ expect(body.turnId).toBeUndefined();
+ });
+
+ it("blocks until turn settles", async () => {
+ const turnId = "sealed-turn";
+ const orchestrator: SessionOrchestrator = {
+ ...createFakeOrchestrator([]),
+ subscribe(conversationId, listener) {
+ const event = { type: "turn-sealed" as const, conversationId, turnId };
+ setTimeout(() => listener(event), 0);
+ return () => {};
+ },
+ isActive() {
+ return true;
+ },
+ };
+ const messages: ChatMessage[] = [
+ { role: "user", chunks: [{ type: "text", text: "hello" }] },
+ { role: "assistant", chunks: [{ type: "text", text: "after seal" }] },
+ ];
+ const store: ConversationStore = {
+ ...createFakeConversationStore(),
+ async load() {
+ return messages;
+ },
+ };
+ const app = createApp({
+ conversationStore: store,
+ orchestrator,
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+
+ const res = await app.request("/conversations/conv1/last");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { conversationId: string; content: string; turnId?: string };
+ expect(body.conversationId).toBe("conv1");
+ expect(body.content).toBe("after seal");
+ expect(body.turnId).toBe(turnId);
+ });
+});
+
+describe("POST /conversations/:id/open", () => {
+ it("returns 200", async () => {
+ const emit: HostAPI["emit"] = () => {};
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ emit,
+ logger: noopLogger,
+ });
+ const res = await app.request("/conversations/conv1/open", { method: "POST" });
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { conversationId: string };
+ expect(body.conversationId).toBe("conv1");
+ });
+
+ it("calls emit with conversationOpened", async () => {
+ const emitCalls: Array<{ readonly hook: unknown; readonly payload: unknown }> = [];
+ const emit: HostAPI["emit"] = (hook, payload) => {
+ emitCalls.push({ hook, payload });
+ };
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ emit,
+ logger: noopLogger,
+ });
+ const res = await app.request("/conversations/conv1/open", { method: "POST" });
+ expect(res.status).toBe(200);
+ expect(emitCalls).toHaveLength(1);
+ expect(emitCalls[0]?.hook).toBe(conversationOpened);
+ expect(emitCalls[0]?.payload).toEqual({ conversationId: "conv1" });
+ });
+
+ it("returns 500 when emit is absent", async () => {
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+ const res = await app.request("/conversations/conv1/open", { method: "POST" });
+ expect(res.status).toBe(500);
+ const body = (await res.json()) as { error: string };
+ expect(body.error).toBe("not available");
+ });
+});
+
+describe("PUT /conversations/:id/title", () => {
+ it("returns 200 with title", async () => {
+ const store = createFakeConversationStore();
+ const app = createApp({
+ conversationStore: store,
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+ const res = await app.request("/conversations/conv1/title", {
+ method: "PUT",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({ title: "My Conversation" }),
+ });
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { conversationId: string; title: string };
+ expect(body.conversationId).toBe("conv1");
+ expect(body.title).toBe("My Conversation");
+ });
+
+ it("rejects empty title with 400", async () => {
+ let setTitleCalled = false;
+ const store: ConversationStore = {
+ ...createFakeConversationStore(),
+ async setConversationTitle() {
+ setTitleCalled = true;
+ },
+ };
+ const app = createApp({
+ conversationStore: store,
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+ const res = await app.request("/conversations/conv1/title", {
+ method: "PUT",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({ title: " " }),
+ });
+ expect(res.status).toBe(400);
+ const body = (await res.json()) as { error: string };
+ expect(body.error).toContain("title");
+ expect(setTitleCalled).toBe(false);
+ });
+
+ it("returns 400 when title is missing", async () => {
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+ const res = await app.request("/conversations/conv1/title", {
+ method: "PUT",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({}),
+ });
+ expect(res.status).toBe(400);
+ const body = (await res.json()) as { error: string };
+ expect(body.error).toContain("title");
+ });
+
+ it("forwards the trimmed title to setConversationTitle", async () => {
+ const calls: { conversationId: string; title: string }[] = [];
+ const store: ConversationStore = {
+ ...createFakeConversationStore(),
+ async setConversationTitle(conversationId, title) {
+ calls.push({ conversationId, title });
+ },
+ };
+ const app = createApp({
+ conversationStore: store,
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+ const res = await app.request("/conversations/conv1/title", {
+ method: "PUT",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({ title: " trimmed title " }),
+ });
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { conversationId: string; title: string };
+ expect(body.title).toBe("trimmed title");
+ expect(calls).toHaveLength(1);
+ expect(calls[0]?.conversationId).toBe("conv1");
+ expect(calls[0]?.title).toBe("trimmed title");
+ });
+
+ it("returns 400 for invalid JSON body", async () => {
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+ const res = await app.request("/conversations/conv1/title", {
+ method: "PUT",
+ headers: { "Content-Type": "application/json" },
+ body: "not json",
+ });
+ expect(res.status).toBe(400);
+ const body = (await res.json()) as { error: string };
+ expect(body.error).toContain("JSON");
+ });
+});
+
+describe("extractLastAssistantText", () => {
+ it("returns last assistant text chunk", () => {
+ const messages: ChatMessage[] = [
+ { role: "user", chunks: [{ type: "text", text: "hello" }] },
+ { role: "assistant", chunks: [{ type: "text", text: "hi there" }] },
+ { role: "user", chunks: [{ type: "text", text: "how are you?" }] },
+ { role: "assistant", chunks: [{ type: "text", text: "I'm good!" }] },
+ ];
+ expect(extractLastAssistantText(messages)).toBe("I'm good!");
+ });
+
+ it("returns empty string when no assistant message", () => {
+ const messages: ChatMessage[] = [
+ { role: "user", chunks: [{ type: "text", text: "hello" }] },
+ { role: "system", chunks: [{ type: "text", text: "system prompt" }] },
+ ];
+ expect(extractLastAssistantText(messages)).toBe("");
+ });
+
+ it("returns the LAST text chunk when an assistant message has multiple", () => {
+ const messages: ChatMessage[] = [
+ { role: "user", chunks: [{ type: "text", text: "hello" }] },
+ {
+ role: "assistant",
+ chunks: [
+ { type: "text", text: "first" },
+ { type: "thinking", text: "internal reasoning" },
+ { type: "text", text: "second" },
+ ],
+ },
+ ];
+ expect(extractLastAssistantText(messages)).toBe("second");
+ });
+
+ it("returns empty string when the last assistant message has no text chunk", () => {
+ const messages: ChatMessage[] = [
+ { role: "user", chunks: [{ type: "text", text: "hello" }] },
+ {
+ role: "assistant",
+ chunks: [
+ {
+ type: "tool-call",
+ toolCallId: "tc1",
+ toolName: "read_file",
+ input: { path: "/tmp" },
+ },
+ ],
+ },
+ ];
+ expect(extractLastAssistantText(messages)).toBe("");
+ });
+
+ it("returns empty string for an empty message list", () => {
+ expect(extractLastAssistantText([])).toBe("");
+ });
+});
diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts
index 8cb85c9..86bac0d 100644
--- a/packages/transport-http/src/app.ts
+++ b/packages/transport-http/src/app.ts
@@ -1,15 +1,19 @@
-import type { AgentEvent, Logger } from "@dispatch/kernel";
+import type { AgentEvent, HostAPI, Logger } from "@dispatch/kernel";
import type {
CloseConversationResponse,
ConversationHistoryResponse,
+ ConversationListResponse,
ConversationMetricsResponse,
CwdResponse,
+ LastMessageResponse,
LspServerInfo,
LspStatusResponse,
ModelsResponse,
+ OpenConversationResponse,
QueueResponse,
ReasoningEffortResponse,
ThroughputResponse,
+ TitleResponse,
WarmResponse,
} from "@dispatch/transport-contract";
import { Hono } from "hono";
@@ -17,6 +21,7 @@ import { cors } from "hono/cors";
import {
computeCachePct,
computeExpectedCacheRate,
+ extractLastAssistantText,
isParseError,
isReasoningEffortParseError,
isSinceSeqError,
@@ -32,6 +37,7 @@ import {
import {
type ConversationStore,
type CredentialStore,
+ conversationOpened,
type LspServerStatus,
type LspService,
type SessionOrchestrator,
@@ -52,6 +58,12 @@ export interface CreateServerOptions {
readonly generateId?: () => string;
/** Injectable clock for sample timestamps (default Date.now). */
readonly now?: () => number;
+ /**
+ * Fire-and-forget event-bus emit (bound `host.emit`). Required by
+ * `POST /conversations/:id/open` to signal the frontend. When absent,
+ * that endpoint responds `500 { error: "not available" }`.
+ */
+ readonly emit?: HostAPI["emit"];
}
const noopLogger: Logger = {
@@ -531,5 +543,131 @@ export function createApp(opts: CreateServerOptions): Hono {
}
});
+ app.get("/conversations", async (c) => {
+ try {
+ const all = await opts.conversationStore.listConversations();
+ // Optional `?q=` filters by id prefix (short-id resolution). A
+ // missing/empty/whitespace-only `q` is ignored → return all.
+ const rawQ = c.req.query("q");
+ const q = rawQ?.trim() ?? "";
+ const conversations = q.length > 0 ? all.filter((m) => m.id.startsWith(q)) : all;
+ log.info("conversations: list", {
+ count: conversations.length,
+ ...(q.length > 0 ? { q } : {}),
+ });
+ const body: ConversationListResponse = { conversations };
+ return c.json(body, 200);
+ } catch (err) {
+ log.error("conversations: list failure", { err });
+ return c.json({ error: "Failed to list conversations" }, 500);
+ }
+ });
+
+ app.get("/conversations/:id/last", async (c) => {
+ const conversationId = c.req.param("id");
+
+ // Subscribe BEFORE checking isActive — closes the race where a seal
+ // fires between the check and the subscribe (we'd miss it). If idle,
+ // unsubscribe immediately; if active, wait for a `turn-sealed` event
+ // (or a 60s timeout, then proceed regardless of what's available).
+ let turnId: string | undefined;
+ let unsubscribe: (() => void) | undefined;
+ try {
+ await new Promise<void>((resolve) => {
+ let settled = false;
+ let timer: ReturnType<typeof setTimeout> | undefined;
+ const finish = (): void => {
+ if (settled) return;
+ settled = true;
+ if (timer !== undefined) clearTimeout(timer);
+ resolve();
+ };
+ unsubscribe = opts.orchestrator.subscribe(conversationId, (event) => {
+ if (event.type === "turn-sealed") {
+ turnId = event.turnId;
+ finish();
+ }
+ });
+ if (!opts.orchestrator.isActive(conversationId)) {
+ finish();
+ return;
+ }
+ // A seal may have fired synchronously during subscribe (the
+ // real orchestrator never does this, but a fake might) — don't
+ // arm a 60s timer for an already-settled promise.
+ if (settled) return;
+ timer = setTimeout(finish, 60_000);
+ });
+ } finally {
+ unsubscribe?.();
+ }
+
+ let content = "";
+ try {
+ const messages = await opts.conversationStore.load(conversationId);
+ content = extractLastAssistantText(messages);
+ } catch (err) {
+ log.error("conversations: last message load failure", { err });
+ return c.json({ error: "Failed to load conversation" }, 500);
+ }
+
+ log.info("conversations: last read", {
+ conversationId,
+ hasContent: content.length > 0,
+ });
+ const body: LastMessageResponse = {
+ conversationId,
+ content,
+ ...(turnId !== undefined ? { turnId } : {}),
+ };
+ return c.json(body, 200);
+ });
+
+ app.post("/conversations/:id/open", (c) => {
+ const conversationId = c.req.param("id");
+ if (opts.emit === undefined) {
+ log.warn("conversations: open requested but emit is not available", {
+ conversationId,
+ });
+ return c.json({ error: "not available" }, 500);
+ }
+ opts.emit(conversationOpened, { conversationId });
+ log.info("conversations: opened", { conversationId });
+ const body: OpenConversationResponse = { conversationId };
+ return c.json(body, 200);
+ });
+
+ app.put("/conversations/:id/title", async (c) => {
+ const conversationId = c.req.param("id");
+ let body: unknown;
+ try {
+ body = await c.req.json();
+ } catch {
+ log.warn("conversations/title: invalid JSON body");
+ return c.json({ error: "Invalid JSON body" }, 400);
+ }
+
+ if (body === null || typeof body !== "object") {
+ return c.json({ error: "Request body must be a JSON object" }, 400);
+ }
+ const obj = body as Record<string, unknown>;
+ if (typeof obj.title !== "string" || obj.title.trim().length === 0) {
+ return c.json({ error: "Field 'title' is required and must be a non-empty string" }, 400);
+ }
+ // Trim before persisting (mirrors how `parseQueueBody` / `parseChatBody`
+ // forward trimmed text), so a title never carries surrounding whitespace.
+ const title = obj.title.trim();
+
+ try {
+ await opts.conversationStore.setConversationTitle(conversationId, title);
+ log.info("conversations: title set", { conversationId });
+ const response: TitleResponse = { conversationId, title };
+ return c.json(response, 200);
+ } catch (err) {
+ log.error("conversations: title set failure", { err });
+ return c.json({ error: "Failed to set conversation title" }, 500);
+ }
+ });
+
return app;
}
diff --git a/packages/transport-http/src/extension.ts b/packages/transport-http/src/extension.ts
index 9e5f037..e402213 100644
--- a/packages/transport-http/src/extension.ts
+++ b/packages/transport-http/src/extension.ts
@@ -72,6 +72,7 @@ export function createTransportHttpExtension(): Extension & {
warmService,
lspService,
logger,
+ emit: host.emit.bind(host),
});
const port = host.config.get<number>("httpPort") ?? 24203;
diff --git a/packages/transport-http/src/index.ts b/packages/transport-http/src/index.ts
index 735dc38..f1a5a41 100644
--- a/packages/transport-http/src/index.ts
+++ b/packages/transport-http/src/index.ts
@@ -12,6 +12,7 @@ export type {
} from "./logic.js";
export {
computeCachePct,
+ extractLastAssistantText,
isParseError,
isReasoningEffortParseError,
isSinceSeqError,
diff --git a/packages/transport-http/src/logic.ts b/packages/transport-http/src/logic.ts
index aa5394c..d20713c 100644
--- a/packages/transport-http/src/logic.ts
+++ b/packages/transport-http/src/logic.ts
@@ -1,4 +1,4 @@
-import type { AgentEvent, ReasoningEffort } from "@dispatch/kernel";
+import type { AgentEvent, ChatMessage, ReasoningEffort } from "@dispatch/kernel";
const VALID_REASONING_EFFORTS: readonly ReasoningEffort[] = [
"low",
@@ -221,3 +221,33 @@ export function isReasoningEffortParseError(
): result is ParseError {
return typeof result === "object" && result !== null && "error" in result;
}
+
+/**
+ * Extract the text of the last assistant message's last `text` chunk — the
+ * "show me the last reply" affordance for `GET /conversations/:id/last`.
+ *
+ * Scan from the END for the last message with `role: "assistant"`, then within
+ * THAT message for the last `type: "text"` chunk. Returns its `text`. Returns
+ * `""` when there is no assistant message, or when the last assistant message
+ * has no text chunk (e.g. only tool-call chunks).
+ *
+ * Pure (input → output); zero I/O, so it tests directly without mocks.
+ */
+export function extractLastAssistantText(messages: readonly ChatMessage[]): string {
+ for (let i = messages.length - 1; i >= 0; i--) {
+ const msg = messages[i];
+ if (msg === undefined || msg.role !== "assistant") continue;
+ // Found the last assistant message — scan its chunks from the end for
+ // the last `text` chunk. Stop here (do not keep scanning earlier
+ // assistant messages): the contract is "the last assistant message's
+ // last text chunk", not "the most recent text chunk anywhere".
+ for (let j = msg.chunks.length - 1; j >= 0; j--) {
+ const chunk = msg.chunks[j];
+ if (chunk !== undefined && chunk.type === "text") {
+ return chunk.text;
+ }
+ }
+ return "";
+ }
+ return "";
+}
diff --git a/packages/transport-http/src/seam.ts b/packages/transport-http/src/seam.ts
index 43c9d4d..1c89a34 100644
--- a/packages/transport-http/src/seam.ts
+++ b/packages/transport-http/src/seam.ts
@@ -5,6 +5,10 @@ export { credentialStoreHandle } from "@dispatch/credential-store";
export type { LspServerStatus, LspService } from "@dispatch/lsp";
export { lspServiceHandle } from "@dispatch/lsp";
export type { SessionOrchestrator, WarmService } from "@dispatch/session-orchestrator";
-export { cacheWarmHandle, sessionOrchestratorHandle } from "@dispatch/session-orchestrator";
+export {
+ cacheWarmHandle,
+ conversationOpened,
+ sessionOrchestratorHandle,
+} from "@dispatch/session-orchestrator";
export type { ThroughputStore } from "@dispatch/throughput-store";
export { ThroughputQueryError, throughputStoreHandle } from "@dispatch/throughput-store";
diff --git a/packages/transport-ws/src/extension.ts b/packages/transport-ws/src/extension.ts
index 7a4e707..8b07a54 100644
--- a/packages/transport-ws/src/extension.ts
+++ b/packages/transport-ws/src/extension.ts
@@ -8,7 +8,7 @@
import type { Extension, HostAPI } from "@dispatch/kernel";
import type { SessionOrchestrator } from "@dispatch/session-orchestrator";
-import { sessionOrchestratorHandle } from "@dispatch/session-orchestrator";
+import { conversationOpened, sessionOrchestratorHandle } from "@dispatch/session-orchestrator";
import type { SurfaceContext, SurfaceProvider, SurfaceRegistry } from "@dispatch/surface-registry";
import { surfaceRegistryHandle } from "@dispatch/surface-registry";
import type { WsClientMessage, WsServerMessage } from "@dispatch/transport-contract";
@@ -27,6 +27,10 @@ type Ws = Bun.ServerWebSocket<ConnectionState>;
export function createTransportWsExtension(): Extension {
let server: ReturnType<typeof Bun.serve<ConnectionState>> | undefined;
+ /** Every currently-connected WS client — used for global fan-out broadcasts. */
+ const connections = new Set<Ws>();
+ /** Disposers for host hook subscriptions (drained on deactivate). */
+ const disposers: Array<() => void> = [];
return {
manifest,
@@ -44,6 +48,13 @@ export function createTransportWsExtension(): Extension {
}
}
+ /** Broadcast a message to EVERY connected WS client (global fan-out). */
+ function broadcast(msg: WsServerMessage): void {
+ for (const ws of connections) {
+ send(ws, msg);
+ }
+ }
+
/**
* Ensure this connection is subscribed to a conversation's chat events.
* Idempotent — no-op if already subscribed. The orchestrator replays
@@ -113,6 +124,17 @@ export function createTransportWsExtension(): Extension {
}
}
+ // Broadcast a `conversation.open` WS message to ALL connected clients
+ // whenever the orchestrator signals a conversation was opened (e.g. the
+ // CLI `--open` flag). The frontend decides whether to open/focus a tab —
+ // the backend just signals. This is a GLOBAL fan-out (like the catalog),
+ // NOT a per-conversation chat broadcast.
+ disposers.push(
+ host.on(conversationOpened, ({ conversationId }) => {
+ broadcast({ type: "conversation.open", conversationId });
+ }),
+ );
+
server = Bun.serve<ConnectionState>({
port,
fetch(req, srv) {
@@ -126,6 +148,7 @@ export function createTransportWsExtension(): Extension {
},
websocket: {
open(ws) {
+ connections.add(ws);
logger.debug("transport-ws: connection open");
send(ws, catalogMessage(registry));
},
@@ -297,6 +320,7 @@ export function createTransportWsExtension(): Extension {
},
close(ws) {
+ connections.delete(ws);
const state = ws.data;
if (state) {
// Dispose all chat subscriptions (does NOT abort turns).
@@ -318,6 +342,11 @@ export function createTransportWsExtension(): Extension {
},
deactivate() {
+ for (const dispose of disposers) {
+ dispose();
+ }
+ disposers.length = 0;
+ connections.clear();
if (server) {
server.stop();
server = undefined;
diff --git a/packages/transport-ws/src/server.bun.test.ts b/packages/transport-ws/src/server.bun.test.ts
index da3be5e..e723766 100644
--- a/packages/transport-ws/src/server.bun.test.ts
+++ b/packages/transport-ws/src/server.bun.test.ts
@@ -240,7 +240,20 @@ function startServer(
logger?: Logger,
) {
const log = logger ?? fakeLogger();
- return Bun.serve<ConnectionState>({
+ const connections = new Set<Bun.ServerWebSocket<ConnectionState>>();
+
+ /** Broadcast a message to every connected client (mirrors extension.ts). */
+ function broadcast(msg: WsServerMessage): void {
+ for (const ws of connections) {
+ try {
+ ws.send(JSON.stringify(msg));
+ } catch {
+ // Connection may have been dropped; swallow.
+ }
+ }
+ }
+
+ const server = Bun.serve<ConnectionState>({
port,
fetch(req, srv) {
const initial: ConnectionState = {
@@ -253,6 +266,7 @@ function startServer(
},
websocket: {
open(ws) {
+ connections.add(ws);
log.debug("transport-ws: connection open");
ws.send(JSON.stringify(catalogMessage(registry)));
},
@@ -392,6 +406,7 @@ function startServer(
},
close(ws) {
+ connections.delete(ws);
const state = ws.data;
if (state) {
for (const dispose of state.chatSubscriptions.values()) {
@@ -406,6 +421,17 @@ function startServer(
},
},
});
+
+ /**
+ * Simulate the `conversationOpened` hook firing — mirrors the
+ * `host.on(conversationOpened, ...)` subscription in extension.ts, which
+ * broadcasts a `conversation.open` WS message to every connected client.
+ */
+ return Object.assign(server, {
+ triggerConversationOpen(conversationId: string): void {
+ broadcast({ type: "conversation.open", conversationId });
+ },
+ });
}
// ── Helpers ─────────────────────────────────────────────────────────────────
@@ -938,3 +964,54 @@ describe("logging", () => {
expect(abortLogs).toHaveLength(0);
});
});
+
+describe("conversation.open broadcast (conversationOpened hook)", () => {
+ let server: ReturnType<typeof startServer>;
+ let port: number;
+
+ afterEach(() => {
+ server.stop();
+ });
+
+ test("conversation.open broadcast on conversationOpened hook", async () => {
+ const orch = fakeOrchestrator();
+ const registry = fakeRegistry([fakeProvider("demo", "Demo Surface")]);
+ server = startServer(registry, orch);
+ port = server.port as number;
+
+ const ws = new WebSocket(`ws://localhost:${port}`);
+ await waitForMessage(ws); // drain catalog
+
+ // Simulate the conversationOpened hook firing (extension.ts's
+ // `host.on(conversationOpened, ...)` handler runs and broadcasts).
+ server.triggerConversationOpen("conv-42");
+
+ const msg = await waitForMessage(ws);
+ expect(msg).toEqual({ type: "conversation.open", conversationId: "conv-42" });
+
+ ws.close();
+ });
+
+ test("conversation.open sent to all connected clients", async () => {
+ const orch = fakeOrchestrator();
+ const registry = fakeRegistry([fakeProvider("demo", "Demo Surface")]);
+ server = startServer(registry, orch);
+ port = server.port as number;
+
+ const ws1 = new WebSocket(`ws://localhost:${port}`);
+ await waitForMessage(ws1); // drain catalog
+ const ws2 = new WebSocket(`ws://localhost:${port}`);
+ await waitForMessage(ws2); // drain catalog
+
+ // Global fan-out: BOTH connected clients receive the broadcast,
+ // regardless of any per-conversation subscription state.
+ server.triggerConversationOpen("shared-conv");
+
+ const [msg1, msg2] = await Promise.all([waitForMessage(ws1), waitForMessage(ws2)]);
+ expect(msg1).toEqual({ type: "conversation.open", conversationId: "shared-conv" });
+ expect(msg2).toEqual({ type: "conversation.open", conversationId: "shared-conv" });
+
+ ws1.close();
+ ws2.close();
+ });
+});