diff options
| author | Adam Malczewski <[email protected]> | 2026-06-21 16:56:42 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-21 16:56:42 +0900 |
| commit | ea0e938eca3072649dc8707c999ec00cf87b986a (patch) | |
| tree | 41f96ed517fe779ed612fc4cb3e8359a841ad088 /packages/transport-http | |
| parent | 36e950ba2cd2591e86f0dcc898f740481c59d912 (diff) | |
| download | dispatch-ea0e938eca3072649dc8707c999ec00cf87b986a.tar.gz dispatch-ea0e938eca3072649dc8707c999ec00cf87b986a.zip | |
feat(transport): CLI endpoints + conversation.open broadcast (Wave 2)
transport-http: GET /conversations (list with ?q= prefix filter),
GET /conversations/:id/last (blocks until turn settles, returns last AI
text), POST /conversations/:id/open (emits conversationOpened hook),
PUT /conversations/:id/title (set title). emit threaded from host.emit.
extractLastAssistantText pure helper. 21 new tests (166 total).
transport-ws: subscribes to conversationOpened hook, broadcasts
ConversationOpenMessage to all connected WS clients. 2 new tests.
session-orchestrator: conversationOpened hook descriptor (exported).
Diffstat (limited to 'packages/transport-http')
| -rw-r--r-- | packages/transport-http/src/app.test.ts | 375 | ||||
| -rw-r--r-- | packages/transport-http/src/app.ts | 140 | ||||
| -rw-r--r-- | packages/transport-http/src/extension.ts | 1 | ||||
| -rw-r--r-- | packages/transport-http/src/index.ts | 1 | ||||
| -rw-r--r-- | packages/transport-http/src/logic.ts | 32 | ||||
| -rw-r--r-- | packages/transport-http/src/seam.ts | 6 |
6 files changed, 552 insertions, 3 deletions
diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts index 9078a07..cb14648 100644 --- a/packages/transport-http/src/app.test.ts +++ b/packages/transport-http/src/app.test.ts @@ -1,5 +1,8 @@ import type { AgentEvent, + ChatMessage, + ConversationMeta, + HostAPI, Logger, ReasoningEffort, StepId, @@ -15,6 +18,7 @@ import type { } from "@dispatch/transport-contract"; import { describe, expect, it } from "vitest"; import { createApp } from "./app.js"; +import { extractLastAssistantText } from "./logic.js"; import type { ConversationStore, CredentialStore, @@ -22,6 +26,7 @@ import type { SessionOrchestrator, WarmService, } from "./seam.js"; +import { conversationOpened } from "./seam.js"; function createMemStorage(): StorageNamespace { const map = new Map<string, string>(); @@ -2026,3 +2031,373 @@ describe("PUT /conversations/:id/reasoning-effort", () => { expect(storeCalled).toBe(false); }); }); + +describe("GET /conversations", () => { + const sampleConvos: ConversationMeta[] = [ + { id: "conv-1", createdAt: 1000, lastActivityAt: 2000, title: "First" }, + { id: "conv-2", createdAt: 1500, lastActivityAt: 2500, title: "Second" }, + { id: "other-1", createdAt: 3000, lastActivityAt: 4000, title: "Other" }, + ]; + + function appWithList(list: ConversationMeta[]) { + const store: ConversationStore = { + ...createFakeConversationStore(), + async listConversations() { + return list; + }, + }; + return createApp({ + conversationStore: store, + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + } + + it("returns 200 with list", async () => { + const app = appWithList(sampleConvos); + const res = await app.request("/conversations"); + expect(res.status).toBe(200); + const body = (await res.json()) as { conversations: ConversationMeta[] }; + expect(body.conversations).toHaveLength(3); + expect(body.conversations.map((c) => c.id)).toEqual(["conv-1", "conv-2", "other-1"]); + }); + + it("?q= filters by id prefix", async () => { + const app = appWithList(sampleConvos); + const res = await app.request("/conversations?q=conv-"); + expect(res.status).toBe(200); + const body = (await res.json()) as { conversations: ConversationMeta[] }; + expect(body.conversations).toHaveLength(2); + expect(body.conversations.map((c) => c.id)).toEqual(["conv-1", "conv-2"]); + }); + + it("?q= returns all when q is empty", async () => { + const app = appWithList(sampleConvos); + const res = await app.request("/conversations?q="); + expect(res.status).toBe(200); + const body = (await res.json()) as { conversations: ConversationMeta[] }; + expect(body.conversations).toHaveLength(3); + }); + + it("?q= with whitespace-only returns all (trimmed to empty)", async () => { + const app = appWithList(sampleConvos); + const res = await app.request("/conversations?q=%20%20%20"); + expect(res.status).toBe(200); + const body = (await res.json()) as { conversations: ConversationMeta[] }; + expect(body.conversations).toHaveLength(3); + }); + + it("returns 500 when listConversations throws", async () => { + const store: ConversationStore = { + ...createFakeConversationStore(), + async listConversations() { + throw new Error("db down"); + }, + }; + const app = createApp({ + conversationStore: store, + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + const res = await app.request("/conversations"); + expect(res.status).toBe(500); + const body = (await res.json()) as { error: string }; + expect(body.error).toContain("Failed to list conversations"); + }); +}); + +describe("GET /conversations/:id/last", () => { + function appWithMessages(messagesByConv: Map<string, ChatMessage[]>) { + const store: ConversationStore = { + ...createFakeConversationStore(), + async load(conversationId) { + return messagesByConv.get(conversationId) ?? []; + }, + }; + return createApp({ + conversationStore: store, + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + } + + it("returns last assistant text", async () => { + const messages: ChatMessage[] = [ + { role: "user", chunks: [{ type: "text", text: "hello" }] }, + { role: "assistant", chunks: [{ type: "text", text: "hi there" }] }, + { role: "user", chunks: [{ type: "text", text: "more" }] }, + { role: "assistant", chunks: [{ type: "text", text: "final reply" }] }, + ]; + const app = appWithMessages(new Map([["conv1", messages]])); + const res = await app.request("/conversations/conv1/last"); + expect(res.status).toBe(200); + const body = (await res.json()) as { conversationId: string; content: string; turnId?: string }; + expect(body.conversationId).toBe("conv1"); + expect(body.content).toBe("final reply"); + expect(body.turnId).toBeUndefined(); + }); + + it("returns empty content for unknown conversation", async () => { + const app = appWithMessages(new Map()); + const res = await app.request("/conversations/unknown/last"); + expect(res.status).toBe(200); + const body = (await res.json()) as { conversationId: string; content: string; turnId?: string }; + expect(body.conversationId).toBe("unknown"); + expect(body.content).toBe(""); + expect(body.turnId).toBeUndefined(); + }); + + it("blocks until turn settles", async () => { + const turnId = "sealed-turn"; + const orchestrator: SessionOrchestrator = { + ...createFakeOrchestrator([]), + subscribe(conversationId, listener) { + const event = { type: "turn-sealed" as const, conversationId, turnId }; + setTimeout(() => listener(event), 0); + return () => {}; + }, + isActive() { + return true; + }, + }; + const messages: ChatMessage[] = [ + { role: "user", chunks: [{ type: "text", text: "hello" }] }, + { role: "assistant", chunks: [{ type: "text", text: "after seal" }] }, + ]; + const store: ConversationStore = { + ...createFakeConversationStore(), + async load() { + return messages; + }, + }; + const app = createApp({ + conversationStore: store, + orchestrator, + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + + const res = await app.request("/conversations/conv1/last"); + expect(res.status).toBe(200); + const body = (await res.json()) as { conversationId: string; content: string; turnId?: string }; + expect(body.conversationId).toBe("conv1"); + expect(body.content).toBe("after seal"); + expect(body.turnId).toBe(turnId); + }); +}); + +describe("POST /conversations/:id/open", () => { + it("returns 200", async () => { + const emit: HostAPI["emit"] = () => {}; + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + emit, + logger: noopLogger, + }); + const res = await app.request("/conversations/conv1/open", { method: "POST" }); + expect(res.status).toBe(200); + const body = (await res.json()) as { conversationId: string }; + expect(body.conversationId).toBe("conv1"); + }); + + it("calls emit with conversationOpened", async () => { + const emitCalls: Array<{ readonly hook: unknown; readonly payload: unknown }> = []; + const emit: HostAPI["emit"] = (hook, payload) => { + emitCalls.push({ hook, payload }); + }; + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + emit, + logger: noopLogger, + }); + const res = await app.request("/conversations/conv1/open", { method: "POST" }); + expect(res.status).toBe(200); + expect(emitCalls).toHaveLength(1); + expect(emitCalls[0]?.hook).toBe(conversationOpened); + expect(emitCalls[0]?.payload).toEqual({ conversationId: "conv1" }); + }); + + it("returns 500 when emit is absent", async () => { + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + const res = await app.request("/conversations/conv1/open", { method: "POST" }); + expect(res.status).toBe(500); + const body = (await res.json()) as { error: string }; + expect(body.error).toBe("not available"); + }); +}); + +describe("PUT /conversations/:id/title", () => { + it("returns 200 with title", async () => { + const store = createFakeConversationStore(); + const app = createApp({ + conversationStore: store, + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + const res = await app.request("/conversations/conv1/title", { + method: "PUT", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ title: "My Conversation" }), + }); + expect(res.status).toBe(200); + const body = (await res.json()) as { conversationId: string; title: string }; + expect(body.conversationId).toBe("conv1"); + expect(body.title).toBe("My Conversation"); + }); + + it("rejects empty title with 400", async () => { + let setTitleCalled = false; + const store: ConversationStore = { + ...createFakeConversationStore(), + async setConversationTitle() { + setTitleCalled = true; + }, + }; + const app = createApp({ + conversationStore: store, + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + const res = await app.request("/conversations/conv1/title", { + method: "PUT", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ title: " " }), + }); + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(body.error).toContain("title"); + expect(setTitleCalled).toBe(false); + }); + + it("returns 400 when title is missing", async () => { + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + const res = await app.request("/conversations/conv1/title", { + method: "PUT", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({}), + }); + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(body.error).toContain("title"); + }); + + it("forwards the trimmed title to setConversationTitle", async () => { + const calls: { conversationId: string; title: string }[] = []; + const store: ConversationStore = { + ...createFakeConversationStore(), + async setConversationTitle(conversationId, title) { + calls.push({ conversationId, title }); + }, + }; + const app = createApp({ + conversationStore: store, + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + const res = await app.request("/conversations/conv1/title", { + method: "PUT", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ title: " trimmed title " }), + }); + expect(res.status).toBe(200); + const body = (await res.json()) as { conversationId: string; title: string }; + expect(body.title).toBe("trimmed title"); + expect(calls).toHaveLength(1); + expect(calls[0]?.conversationId).toBe("conv1"); + expect(calls[0]?.title).toBe("trimmed title"); + }); + + it("returns 400 for invalid JSON body", async () => { + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + const res = await app.request("/conversations/conv1/title", { + method: "PUT", + headers: { "Content-Type": "application/json" }, + body: "not json", + }); + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(body.error).toContain("JSON"); + }); +}); + +describe("extractLastAssistantText", () => { + it("returns last assistant text chunk", () => { + const messages: ChatMessage[] = [ + { role: "user", chunks: [{ type: "text", text: "hello" }] }, + { role: "assistant", chunks: [{ type: "text", text: "hi there" }] }, + { role: "user", chunks: [{ type: "text", text: "how are you?" }] }, + { role: "assistant", chunks: [{ type: "text", text: "I'm good!" }] }, + ]; + expect(extractLastAssistantText(messages)).toBe("I'm good!"); + }); + + it("returns empty string when no assistant message", () => { + const messages: ChatMessage[] = [ + { role: "user", chunks: [{ type: "text", text: "hello" }] }, + { role: "system", chunks: [{ type: "text", text: "system prompt" }] }, + ]; + expect(extractLastAssistantText(messages)).toBe(""); + }); + + it("returns the LAST text chunk when an assistant message has multiple", () => { + const messages: ChatMessage[] = [ + { role: "user", chunks: [{ type: "text", text: "hello" }] }, + { + role: "assistant", + chunks: [ + { type: "text", text: "first" }, + { type: "thinking", text: "internal reasoning" }, + { type: "text", text: "second" }, + ], + }, + ]; + expect(extractLastAssistantText(messages)).toBe("second"); + }); + + it("returns empty string when the last assistant message has no text chunk", () => { + const messages: ChatMessage[] = [ + { role: "user", chunks: [{ type: "text", text: "hello" }] }, + { + role: "assistant", + chunks: [ + { + type: "tool-call", + toolCallId: "tc1", + toolName: "read_file", + input: { path: "/tmp" }, + }, + ], + }, + ]; + expect(extractLastAssistantText(messages)).toBe(""); + }); + + it("returns empty string for an empty message list", () => { + expect(extractLastAssistantText([])).toBe(""); + }); +}); diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts index 8cb85c9..86bac0d 100644 --- a/packages/transport-http/src/app.ts +++ b/packages/transport-http/src/app.ts @@ -1,15 +1,19 @@ -import type { AgentEvent, Logger } from "@dispatch/kernel"; +import type { AgentEvent, HostAPI, Logger } from "@dispatch/kernel"; import type { CloseConversationResponse, ConversationHistoryResponse, + ConversationListResponse, ConversationMetricsResponse, CwdResponse, + LastMessageResponse, LspServerInfo, LspStatusResponse, ModelsResponse, + OpenConversationResponse, QueueResponse, ReasoningEffortResponse, ThroughputResponse, + TitleResponse, WarmResponse, } from "@dispatch/transport-contract"; import { Hono } from "hono"; @@ -17,6 +21,7 @@ import { cors } from "hono/cors"; import { computeCachePct, computeExpectedCacheRate, + extractLastAssistantText, isParseError, isReasoningEffortParseError, isSinceSeqError, @@ -32,6 +37,7 @@ import { import { type ConversationStore, type CredentialStore, + conversationOpened, type LspServerStatus, type LspService, type SessionOrchestrator, @@ -52,6 +58,12 @@ export interface CreateServerOptions { readonly generateId?: () => string; /** Injectable clock for sample timestamps (default Date.now). */ readonly now?: () => number; + /** + * Fire-and-forget event-bus emit (bound `host.emit`). Required by + * `POST /conversations/:id/open` to signal the frontend. When absent, + * that endpoint responds `500 { error: "not available" }`. + */ + readonly emit?: HostAPI["emit"]; } const noopLogger: Logger = { @@ -531,5 +543,131 @@ export function createApp(opts: CreateServerOptions): Hono { } }); + app.get("/conversations", async (c) => { + try { + const all = await opts.conversationStore.listConversations(); + // Optional `?q=` filters by id prefix (short-id resolution). A + // missing/empty/whitespace-only `q` is ignored → return all. + const rawQ = c.req.query("q"); + const q = rawQ?.trim() ?? ""; + const conversations = q.length > 0 ? all.filter((m) => m.id.startsWith(q)) : all; + log.info("conversations: list", { + count: conversations.length, + ...(q.length > 0 ? { q } : {}), + }); + const body: ConversationListResponse = { conversations }; + return c.json(body, 200); + } catch (err) { + log.error("conversations: list failure", { err }); + return c.json({ error: "Failed to list conversations" }, 500); + } + }); + + app.get("/conversations/:id/last", async (c) => { + const conversationId = c.req.param("id"); + + // Subscribe BEFORE checking isActive — closes the race where a seal + // fires between the check and the subscribe (we'd miss it). If idle, + // unsubscribe immediately; if active, wait for a `turn-sealed` event + // (or a 60s timeout, then proceed regardless of what's available). + let turnId: string | undefined; + let unsubscribe: (() => void) | undefined; + try { + await new Promise<void>((resolve) => { + let settled = false; + let timer: ReturnType<typeof setTimeout> | undefined; + const finish = (): void => { + if (settled) return; + settled = true; + if (timer !== undefined) clearTimeout(timer); + resolve(); + }; + unsubscribe = opts.orchestrator.subscribe(conversationId, (event) => { + if (event.type === "turn-sealed") { + turnId = event.turnId; + finish(); + } + }); + if (!opts.orchestrator.isActive(conversationId)) { + finish(); + return; + } + // A seal may have fired synchronously during subscribe (the + // real orchestrator never does this, but a fake might) — don't + // arm a 60s timer for an already-settled promise. + if (settled) return; + timer = setTimeout(finish, 60_000); + }); + } finally { + unsubscribe?.(); + } + + let content = ""; + try { + const messages = await opts.conversationStore.load(conversationId); + content = extractLastAssistantText(messages); + } catch (err) { + log.error("conversations: last message load failure", { err }); + return c.json({ error: "Failed to load conversation" }, 500); + } + + log.info("conversations: last read", { + conversationId, + hasContent: content.length > 0, + }); + const body: LastMessageResponse = { + conversationId, + content, + ...(turnId !== undefined ? { turnId } : {}), + }; + return c.json(body, 200); + }); + + app.post("/conversations/:id/open", (c) => { + const conversationId = c.req.param("id"); + if (opts.emit === undefined) { + log.warn("conversations: open requested but emit is not available", { + conversationId, + }); + return c.json({ error: "not available" }, 500); + } + opts.emit(conversationOpened, { conversationId }); + log.info("conversations: opened", { conversationId }); + const body: OpenConversationResponse = { conversationId }; + return c.json(body, 200); + }); + + app.put("/conversations/:id/title", async (c) => { + const conversationId = c.req.param("id"); + let body: unknown; + try { + body = await c.req.json(); + } catch { + log.warn("conversations/title: invalid JSON body"); + return c.json({ error: "Invalid JSON body" }, 400); + } + + if (body === null || typeof body !== "object") { + return c.json({ error: "Request body must be a JSON object" }, 400); + } + const obj = body as Record<string, unknown>; + if (typeof obj.title !== "string" || obj.title.trim().length === 0) { + return c.json({ error: "Field 'title' is required and must be a non-empty string" }, 400); + } + // Trim before persisting (mirrors how `parseQueueBody` / `parseChatBody` + // forward trimmed text), so a title never carries surrounding whitespace. + const title = obj.title.trim(); + + try { + await opts.conversationStore.setConversationTitle(conversationId, title); + log.info("conversations: title set", { conversationId }); + const response: TitleResponse = { conversationId, title }; + return c.json(response, 200); + } catch (err) { + log.error("conversations: title set failure", { err }); + return c.json({ error: "Failed to set conversation title" }, 500); + } + }); + return app; } diff --git a/packages/transport-http/src/extension.ts b/packages/transport-http/src/extension.ts index 9e5f037..e402213 100644 --- a/packages/transport-http/src/extension.ts +++ b/packages/transport-http/src/extension.ts @@ -72,6 +72,7 @@ export function createTransportHttpExtension(): Extension & { warmService, lspService, logger, + emit: host.emit.bind(host), }); const port = host.config.get<number>("httpPort") ?? 24203; diff --git a/packages/transport-http/src/index.ts b/packages/transport-http/src/index.ts index 735dc38..f1a5a41 100644 --- a/packages/transport-http/src/index.ts +++ b/packages/transport-http/src/index.ts @@ -12,6 +12,7 @@ export type { } from "./logic.js"; export { computeCachePct, + extractLastAssistantText, isParseError, isReasoningEffortParseError, isSinceSeqError, diff --git a/packages/transport-http/src/logic.ts b/packages/transport-http/src/logic.ts index aa5394c..d20713c 100644 --- a/packages/transport-http/src/logic.ts +++ b/packages/transport-http/src/logic.ts @@ -1,4 +1,4 @@ -import type { AgentEvent, ReasoningEffort } from "@dispatch/kernel"; +import type { AgentEvent, ChatMessage, ReasoningEffort } from "@dispatch/kernel"; const VALID_REASONING_EFFORTS: readonly ReasoningEffort[] = [ "low", @@ -221,3 +221,33 @@ export function isReasoningEffortParseError( ): result is ParseError { return typeof result === "object" && result !== null && "error" in result; } + +/** + * Extract the text of the last assistant message's last `text` chunk — the + * "show me the last reply" affordance for `GET /conversations/:id/last`. + * + * Scan from the END for the last message with `role: "assistant"`, then within + * THAT message for the last `type: "text"` chunk. Returns its `text`. Returns + * `""` when there is no assistant message, or when the last assistant message + * has no text chunk (e.g. only tool-call chunks). + * + * Pure (input → output); zero I/O, so it tests directly without mocks. + */ +export function extractLastAssistantText(messages: readonly ChatMessage[]): string { + for (let i = messages.length - 1; i >= 0; i--) { + const msg = messages[i]; + if (msg === undefined || msg.role !== "assistant") continue; + // Found the last assistant message — scan its chunks from the end for + // the last `text` chunk. Stop here (do not keep scanning earlier + // assistant messages): the contract is "the last assistant message's + // last text chunk", not "the most recent text chunk anywhere". + for (let j = msg.chunks.length - 1; j >= 0; j--) { + const chunk = msg.chunks[j]; + if (chunk !== undefined && chunk.type === "text") { + return chunk.text; + } + } + return ""; + } + return ""; +} diff --git a/packages/transport-http/src/seam.ts b/packages/transport-http/src/seam.ts index 43c9d4d..1c89a34 100644 --- a/packages/transport-http/src/seam.ts +++ b/packages/transport-http/src/seam.ts @@ -5,6 +5,10 @@ export { credentialStoreHandle } from "@dispatch/credential-store"; export type { LspServerStatus, LspService } from "@dispatch/lsp"; export { lspServiceHandle } from "@dispatch/lsp"; export type { SessionOrchestrator, WarmService } from "@dispatch/session-orchestrator"; -export { cacheWarmHandle, sessionOrchestratorHandle } from "@dispatch/session-orchestrator"; +export { + cacheWarmHandle, + conversationOpened, + sessionOrchestratorHandle, +} from "@dispatch/session-orchestrator"; export type { ThroughputStore } from "@dispatch/throughput-store"; export { ThroughputQueryError, throughputStoreHandle } from "@dispatch/throughput-store"; |
