diff options
| author | Adam Malczewski <[email protected]> | 2026-06-04 23:50:34 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-04 23:50:34 +0900 |
| commit | 357ad3567480b5483220e2cea266a4f1417d174d (patch) | |
| tree | f9eba8ebad94d7396546e9e953631109cac43f84 | |
| parent | 3390f5ed73674ba12f08ee801869ffa2d5b9b38d (diff) | |
| download | dispatch-357ad3567480b5483220e2cea266a4f1417d174d.tar.gz dispatch-357ad3567480b5483220e2cea266a4f1417d174d.zip | |
feat(core-ext): session-orchestrator + transport-http (parallel); wire into build graph (164 tests)
| -rw-r--r-- | bun.lock | 32 | ||||
| -rw-r--r-- | packages/session-orchestrator/package.json | 12 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/extension.ts | 64 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/index.ts | 13 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.test.ts | 200 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 60 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/pure.test.ts | 69 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/pure.ts | 27 | ||||
| -rw-r--r-- | packages/session-orchestrator/tsconfig.json | 6 | ||||
| -rw-r--r-- | packages/transport-http/package.json | 13 | ||||
| -rw-r--r-- | packages/transport-http/src/app.test.ts | 154 | ||||
| -rw-r--r-- | packages/transport-http/src/app.ts | 70 | ||||
| -rw-r--r-- | packages/transport-http/src/extension.ts | 30 | ||||
| -rw-r--r-- | packages/transport-http/src/index.ts | 7 | ||||
| -rw-r--r-- | packages/transport-http/src/logic.test.ts | 102 | ||||
| -rw-r--r-- | packages/transport-http/src/logic.ts | 40 | ||||
| -rw-r--r-- | packages/transport-http/src/seam.ts | 2 | ||||
| -rw-r--r-- | packages/transport-http/tsconfig.json | 6 | ||||
| -rw-r--r-- | tsconfig.json | 22 |
19 files changed, 914 insertions, 15 deletions
@@ -18,6 +18,13 @@ "@dispatch/kernel": "workspace:*", }, }, + "packages/conversation-store": { + "name": "@dispatch/conversation-store", + "version": "0.0.0", + "dependencies": { + "@dispatch/kernel": "workspace:*", + }, + }, "packages/kernel": { "name": "@dispatch/kernel", "version": "0.0.0", @@ -29,6 +36,14 @@ "@dispatch/kernel": "workspace:*", }, }, + "packages/session-orchestrator": { + "name": "@dispatch/session-orchestrator", + "version": "0.0.0", + "dependencies": { + "@dispatch/conversation-store": "workspace:*", + "@dispatch/kernel": "workspace:*", + }, + }, "packages/storage-sqlite": { "name": "@dispatch/storage-sqlite", "version": "0.0.0", @@ -36,6 +51,15 @@ "@dispatch/kernel": "workspace:*", }, }, + "packages/transport-http": { + "name": "@dispatch/transport-http", + "version": "0.0.0", + "dependencies": { + "@dispatch/kernel": "workspace:*", + "@dispatch/session-orchestrator": "workspace:*", + "hono": "^4.0.0", + }, + }, }, "packages": { "@biomejs/biome": ["@biomejs/[email protected]", "", { "optionalDependencies": { "@biomejs/cli-darwin-arm64": "2.4.16", "@biomejs/cli-darwin-x64": "2.4.16", "@biomejs/cli-linux-arm64": "2.4.16", "@biomejs/cli-linux-arm64-musl": "2.4.16", "@biomejs/cli-linux-x64": "2.4.16", "@biomejs/cli-linux-x64-musl": "2.4.16", "@biomejs/cli-win32-arm64": "2.4.16", "@biomejs/cli-win32-x64": "2.4.16" }, "bin": { "biome": "bin/biome" } }, "sha512-x9ajFh1zChVybCiM3TN6OD4phAqLgtPZjFrZF+aTMYCPjwBO+k529TX7PPsAqtGNLeV4UgzwQnowEgS7bGmzcA=="], @@ -58,12 +82,18 @@ "@dispatch/auth-apikey": ["@dispatch/auth-apikey@workspace:packages/auth-apikey"], + "@dispatch/conversation-store": ["@dispatch/conversation-store@workspace:packages/conversation-store"], + "@dispatch/kernel": ["@dispatch/kernel@workspace:packages/kernel"], "@dispatch/provider-openai-compat": ["@dispatch/provider-openai-compat@workspace:packages/provider-openai-compat"], + "@dispatch/session-orchestrator": ["@dispatch/session-orchestrator@workspace:packages/session-orchestrator"], + "@dispatch/storage-sqlite": ["@dispatch/storage-sqlite@workspace:packages/storage-sqlite"], + "@dispatch/transport-http": ["@dispatch/transport-http@workspace:packages/transport-http"], + "@esbuild/aix-ppc64": ["@esbuild/[email protected]", "", { "os": "aix", "cpu": "ppc64" }, "sha512-EKX3Qwmhz1eMdEJokhALr0YiD0lhQNwDqkPYyPhiSwKrh7/4KRjQc04sZ8db+5DVVnZ1LmbNDI1uAMPEUBnQPg=="], "@esbuild/android-arm": ["@esbuild/[email protected]", "", { "os": "android", "cpu": "arm" }, "sha512-jbPXvB4Yj2yBV7HUfE2KHe4GJX51QplCN1pGbYjvsyCZbQmies29EoJbkEc+vYuU5o45AfQn37vZlyXy4YJ8RQ=="], @@ -218,6 +248,8 @@ "fsevents": ["[email protected]", "", { "os": "darwin" }, "sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw=="], + "hono": ["[email protected]", "", {}, "sha512-eIaZ9qDgu7XV0pxOCrg7/WhnQ6Ivm22UcxhXx/A3dcbqbbYgBEkc6e/J/s7j2tS96zoB0S9VBdLwQNCWwUo4LA=="], + "js-tokens": ["[email protected]", "", {}, "sha512-mxa9E9ITFOt0ban3j6L5MpjwegGz6lBQmM1IJkWeBZGcMxto50+eWdjC/52xDbS2vy0k7vIMK0Fe2wfL9OQSpQ=="], "loupe": ["[email protected]", "", {}, "sha512-CdzqowRJCeLU72bHvWqwRBBlLcMEtIvGrlvef74kMnV2AolS9Y8xUv1I0U/MNAWMhBlKIoyuEgoJ0t/bbwHbLQ=="], diff --git a/packages/session-orchestrator/package.json b/packages/session-orchestrator/package.json new file mode 100644 index 0000000..2556ead --- /dev/null +++ b/packages/session-orchestrator/package.json @@ -0,0 +1,12 @@ +{ + "name": "@dispatch/session-orchestrator", + "version": "0.0.0", + "type": "module", + "private": true, + "main": "dist/index.js", + "types": "dist/index.d.ts", + "dependencies": { + "@dispatch/kernel": "workspace:*", + "@dispatch/conversation-store": "workspace:*" + } +} diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts new file mode 100644 index 0000000..c75bf29 --- /dev/null +++ b/packages/session-orchestrator/src/extension.ts @@ -0,0 +1,64 @@ +import { conversationStoreHandle } from "@dispatch/conversation-store"; +import type { + Extension, + HostAPI, + Manifest, + ProviderContract, + ToolContract, +} from "@dispatch/kernel"; +import { runTurn } from "@dispatch/kernel"; +import { + createSessionOrchestrator, + type SessionOrchestrator, + sessionOrchestratorHandle, +} from "./orchestrator.js"; +import { selectFirstProvider } from "./pure.js"; + +export const manifest: Manifest = { + id: "session-orchestrator", + name: "Session Orchestrator", + version: "0.0.0", + apiVersion: "^0.1.0", + trust: "bundled", + dependsOn: ["conversation-store"], + activation: "eager", + contributes: { + services: ["session-orchestrator/orchestrator"], + }, +}; + +interface ProviderResolvingHostAPI extends HostAPI { + readonly getProviders?: () => ReadonlyMap<string, ProviderContract>; + readonly getTools?: () => ReadonlyMap<string, ToolContract>; +} + +export function activate(host: HostAPI): void { + const conversationStore = host.getService(conversationStoreHandle); + const extendedHost = host as ProviderResolvingHostAPI; + + const orchestrator: SessionOrchestrator = createSessionOrchestrator({ + conversationStore, + resolveProvider: () => { + if (extendedHost.getProviders !== undefined) { + return selectFirstProvider(extendedHost.getProviders()); + } + throw new Error( + "HostAPI does not expose getProviders() — change-request: add provider resolution to HostAPI", + ); + }, + resolveTools: () => { + if (extendedHost.getTools !== undefined) { + return [...extendedHost.getTools().values()]; + } + return []; + }, + runTurn, + }); + + host.provideService(sessionOrchestratorHandle, orchestrator); +} + +export const extension: Extension = { + manifest, + activate, +}; diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts new file mode 100644 index 0000000..3270c46 --- /dev/null +++ b/packages/session-orchestrator/src/index.ts @@ -0,0 +1,13 @@ +export { extension, manifest } from "./extension.js"; +export { + createSessionOrchestrator, + type SessionOrchestrator, + type SessionOrchestratorDeps, + sessionOrchestratorHandle, +} from "./orchestrator.js"; +export { + buildUserMessage, + defaultDispatchPolicy, + generateTurnId, + selectFirstProvider, +} from "./pure.js"; diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts new file mode 100644 index 0000000..0d908b2 --- /dev/null +++ b/packages/session-orchestrator/src/orchestrator.test.ts @@ -0,0 +1,200 @@ +import type { ConversationStore } from "@dispatch/conversation-store"; +import type { AgentEvent, ChatMessage, ProviderContract, ProviderEvent } from "@dispatch/kernel"; +import { runTurn } from "@dispatch/kernel"; +import { describe, expect, it } from "vitest"; +import { createSessionOrchestrator } from "./orchestrator.js"; + +function createInMemoryStore(): ConversationStore & { + readonly data: Map<string, ChatMessage[]>; +} { + const data = new Map<string, ChatMessage[]>(); + return { + data, + async append(conversationId, messages) { + const existing = data.get(conversationId) ?? []; + data.set(conversationId, [...existing, ...messages]); + }, + async load(conversationId) { + return [...(data.get(conversationId) ?? [])]; + }, + }; +} + +function createFakeProvider(script: ProviderEvent[][]): ProviderContract { + let callIndex = 0; + return { + id: "fake", + stream(_messages, _tools) { + const events = script[callIndex] ?? []; + callIndex++; + return (async function* () { + for (const event of events) { + yield event; + } + })(); + }, + }; +} + +function collectEvents(): { events: AgentEvent[]; onEvent: (event: AgentEvent) => void } { + const events: AgentEvent[] = []; + return { events, onEvent: (event) => events.push(event) }; +} + +describe("handleMessage integration", () => { + it("loads history, runs turn, emits events, and persists result", async () => { + const store = createInMemoryStore(); + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "Hello" }, + { type: "text-delta", delta: " there" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const orchestrator = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + runTurn, + }); + + const { events, onEvent } = collectEvents(); + + await orchestrator.handleMessage({ + conversationId: "conv-1", + text: "Hi", + onEvent, + }); + + expect(events.length).toBeGreaterThan(0); + const textDeltas = events.filter((e) => e.type === "text-delta"); + expect(textDeltas).toHaveLength(2); + + const stored = store.data.get("conv-1"); + expect(stored).toBeDefined(); + expect(stored).toHaveLength(2); + expect(stored?.[0]?.role).toBe("user"); + expect(stored?.[1]?.role).toBe("assistant"); + + const userChunks = stored?.[0]?.chunks ?? []; + expect(userChunks[0]).toEqual({ type: "text", text: "Hi" }); + + const assistantChunks = stored?.[1]?.chunks ?? []; + expect(assistantChunks.some((c) => c.type === "text")).toBe(true); + }); + + it("multi-turn: second call sees first turn in history", async () => { + const store = createInMemoryStore(); + let capturedMessages: ChatMessage[] | undefined; + + let callCount = 0; + const provider: ProviderContract = { + id: "fake", + stream(messages, _tools) { + if (callCount === 1) { + capturedMessages = [...messages]; + } + callCount++; + return (async function* () { + yield { type: "text-delta", delta: `Reply ${callCount}` } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + + const orchestrator = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + runTurn, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-multi", + text: "First message", + onEvent: () => {}, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-multi", + text: "Second message", + onEvent: () => {}, + }); + + expect(capturedMessages).toBeDefined(); + expect(capturedMessages?.length).toBeGreaterThanOrEqual(3); + + expect(capturedMessages?.[0]?.role).toBe("user"); + const firstUserText = capturedMessages?.[0]?.chunks[0]; + expect(firstUserText).toEqual({ type: "text", text: "First message" }); + + expect(capturedMessages?.[1]?.role).toBe("assistant"); + + const lastUser = capturedMessages?.findLast((m) => m.role === "user"); + expect(lastUser).toBeDefined(); + const lastUserText = lastUser?.chunks[0]; + expect(lastUserText).toEqual({ type: "text", text: "Second message" }); + }); + + it("passes abort signal through to runTurn", async () => { + const store = createInMemoryStore(); + const ac = new AbortController(); + ac.abort(); + + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "should not appear" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const orchestrator = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + runTurn, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-abort", + text: "test", + onEvent: () => {}, + signal: ac.signal, + }); + + const stored = store.data.get("conv-abort"); + expect(stored).toBeDefined(); + expect(stored).toHaveLength(1); + expect(stored?.[0]?.role).toBe("user"); + }); + + it("uses custom dispatch policy when resolveDispatch is provided", async () => { + const store = createInMemoryStore(); + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "ok" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const orchestrator = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + resolveDispatch: () => ({ maxConcurrent: 4, eager: false }), + runTurn, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-dispatch", + text: "test", + onEvent: () => {}, + }); + + const stored = store.data.get("conv-dispatch"); + expect(stored).toBeDefined(); + expect(stored?.length).toBeGreaterThanOrEqual(1); + }); +}); diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts new file mode 100644 index 0000000..f209d2d --- /dev/null +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -0,0 +1,60 @@ +import type { ConversationStore } from "@dispatch/conversation-store"; +import type { + AgentEvent, + ChatMessage, + ProviderContract, + RunTurnInput, + RunTurnResult, + ToolContract, + ToolDispatchPolicy, +} from "@dispatch/kernel"; +import { defineService } from "@dispatch/kernel"; +import { buildUserMessage, defaultDispatchPolicy, generateTurnId } from "./pure.js"; + +export interface SessionOrchestrator { + handleMessage(input: { + conversationId: string; + text: string; + onEvent: (event: AgentEvent) => void; + signal?: AbortSignal; + }): Promise<void>; +} + +export const sessionOrchestratorHandle = defineService<SessionOrchestrator>( + "session-orchestrator/orchestrator", +); + +export interface SessionOrchestratorDeps { + readonly conversationStore: ConversationStore; + readonly resolveProvider: () => ProviderContract; + readonly resolveTools: () => readonly ToolContract[]; + readonly resolveDispatch?: () => ToolDispatchPolicy; + readonly runTurn: (input: RunTurnInput) => Promise<RunTurnResult>; +} + +export function createSessionOrchestrator(deps: SessionOrchestratorDeps): SessionOrchestrator { + return { + async handleMessage({ conversationId, text, onEvent, signal }) { + const history = await deps.conversationStore.load(conversationId); + const userMsg = buildUserMessage(text); + const provider = deps.resolveProvider(); + const tools = deps.resolveTools(); + const dispatch = deps.resolveDispatch?.() ?? defaultDispatchPolicy(); + const turnId = generateTurnId(); + + const result = await deps.runTurn({ + provider, + messages: [...history, userMsg], + tools, + dispatch, + emit: onEvent, + tabId: conversationId, + turnId, + ...(signal !== undefined ? { signal } : {}), + }); + + const toPersist: ChatMessage[] = [userMsg, ...result.messages]; + await deps.conversationStore.append(conversationId, toPersist); + }, + }; +} diff --git a/packages/session-orchestrator/src/pure.test.ts b/packages/session-orchestrator/src/pure.test.ts new file mode 100644 index 0000000..e233fca --- /dev/null +++ b/packages/session-orchestrator/src/pure.test.ts @@ -0,0 +1,69 @@ +import type { ProviderContract } from "@dispatch/kernel"; +import { describe, expect, it } from "vitest"; +import { + buildUserMessage, + defaultDispatchPolicy, + generateTurnId, + selectFirstProvider, +} from "./pure.js"; + +describe("buildUserMessage", () => { + it("creates a user message with a single text chunk", () => { + const msg = buildUserMessage("hello world"); + expect(msg.role).toBe("user"); + expect(msg.chunks).toHaveLength(1); + expect(msg.chunks[0]).toEqual({ type: "text", text: "hello world" }); + }); + + it("preserves empty text", () => { + const msg = buildUserMessage(""); + expect(msg.role).toBe("user"); + expect(msg.chunks[0]).toEqual({ type: "text", text: "" }); + }); +}); + +describe("selectFirstProvider", () => { + it("returns the first provider from a non-empty map", () => { + const provider: ProviderContract = { + id: "test-provider", + stream: async function* () {}, + }; + const providers = new Map<string, ProviderContract>(); + providers.set("test-provider", provider); + + expect(selectFirstProvider(providers)).toBe(provider); + }); + + it("throws when the map is empty", () => { + const providers = new Map<string, ProviderContract>(); + expect(() => selectFirstProvider(providers)).toThrow("No providers registered"); + }); + + it("returns the first inserted provider when multiple exist", () => { + const first: ProviderContract = { id: "first", stream: async function* () {} }; + const second: ProviderContract = { id: "second", stream: async function* () {} }; + const providers = new Map<string, ProviderContract>(); + providers.set("first", first); + providers.set("second", second); + + expect(selectFirstProvider(providers).id).toBe("first"); + }); +}); + +describe("defaultDispatchPolicy", () => { + it("returns maxConcurrent: 1, eager: true", () => { + expect(defaultDispatchPolicy()).toEqual({ maxConcurrent: 1, eager: true }); + }); +}); + +describe("generateTurnId", () => { + it("returns a string starting with 'turn-'", () => { + const id = generateTurnId(); + expect(id).toMatch(/^turn-/); + }); + + it("returns unique ids", () => { + const ids = new Set(Array.from({ length: 100 }, () => generateTurnId())); + expect(ids.size).toBe(100); + }); +}); diff --git a/packages/session-orchestrator/src/pure.ts b/packages/session-orchestrator/src/pure.ts new file mode 100644 index 0000000..46cb79a --- /dev/null +++ b/packages/session-orchestrator/src/pure.ts @@ -0,0 +1,27 @@ +import type { ChatMessage, ProviderContract, ToolDispatchPolicy } from "@dispatch/kernel"; + +export function buildUserMessage(text: string): ChatMessage { + return { role: "user", chunks: [{ type: "text", text }] }; +} + +export function selectFirstProvider( + providers: ReadonlyMap<string, ProviderContract>, +): ProviderContract { + const first = providers.values().next(); + if (first.done === true || first.value === undefined) { + throw new Error("No providers registered — at least one provider is required to run a turn."); + } + return first.value; +} + +export function resolveTools(tools: ReadonlyMap<string, unknown>): readonly unknown[] { + return [...tools.values()]; +} + +export function defaultDispatchPolicy(): ToolDispatchPolicy { + return { maxConcurrent: 1, eager: true }; +} + +export function generateTurnId(): string { + return `turn-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; +} diff --git a/packages/session-orchestrator/tsconfig.json b/packages/session-orchestrator/tsconfig.json new file mode 100644 index 0000000..6b137f4 --- /dev/null +++ b/packages/session-orchestrator/tsconfig.json @@ -0,0 +1,6 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true }, + "include": ["src/**/*.ts"], + "references": [{ "path": "../kernel" }, { "path": "../conversation-store" }] +} diff --git a/packages/transport-http/package.json b/packages/transport-http/package.json new file mode 100644 index 0000000..c9273f4 --- /dev/null +++ b/packages/transport-http/package.json @@ -0,0 +1,13 @@ +{ + "name": "@dispatch/transport-http", + "version": "0.0.0", + "type": "module", + "private": true, + "main": "dist/index.js", + "types": "dist/index.d.ts", + "dependencies": { + "@dispatch/kernel": "workspace:*", + "@dispatch/session-orchestrator": "workspace:*", + "hono": "^4.0.0" + } +} diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts new file mode 100644 index 0000000..0125952 --- /dev/null +++ b/packages/transport-http/src/app.test.ts @@ -0,0 +1,154 @@ +import type { AgentEvent } from "@dispatch/kernel"; +import { describe, expect, it } from "vitest"; +import { createApp } from "./app.js"; +import type { SessionOrchestrator } from "./seam.js"; + +function createFakeOrchestrator(events: AgentEvent[]): SessionOrchestrator { + return { + async handleMessage(input) { + for (const event of events) { + input.onEvent(event); + } + }, + }; +} + +function createThrowingOrchestrator(error: Error): SessionOrchestrator { + return { + async handleMessage() { + throw error; + }, + }; +} + +describe("GET /health", () => { + it("returns ok", async () => { + const app = createApp({ orchestrator: createFakeOrchestrator([]) }); + const res = await app.request("/health"); + expect(res.status).toBe(200); + const body = await res.json(); + expect(body).toEqual({ ok: true }); + }); +}); + +describe("POST /chat", () => { + it("returns 400 for invalid JSON", async () => { + const app = createApp({ orchestrator: createFakeOrchestrator([]) }); + const res = await app.request("/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: "not json", + }); + expect(res.status).toBe(400); + }); + + it("returns 400 for missing message", async () => { + const app = createApp({ orchestrator: createFakeOrchestrator([]) }); + const res = await app.request("/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ conversationId: "c1" }), + }); + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(body.error).toContain("message"); + }); + + it("returns 400 for empty message", async () => { + const app = createApp({ orchestrator: createFakeOrchestrator([]) }); + const res = await app.request("/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ message: "" }), + }); + expect(res.status).toBe(400); + }); + + it("streams events as NDJSON", async () => { + const events: AgentEvent[] = [ + { type: "turn-start", tabId: "tab1", turnId: "turn1" }, + { type: "text-delta", tabId: "tab1", turnId: "turn1", delta: "Hello" }, + { type: "text-delta", tabId: "tab1", turnId: "turn1", delta: " world" }, + { type: "done", tabId: "tab1", turnId: "turn1", reason: "stop" }, + ]; + const app = createApp({ orchestrator: createFakeOrchestrator(events) }); + + const res = await app.request("/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ message: "hi", conversationId: "conv1" }), + }); + + expect(res.status).toBe(200); + expect(res.headers.get("Content-Type")).toBe("application/x-ndjson"); + expect(res.headers.get("X-Conversation-Id")).toBe("conv1"); + + const text = await res.text(); + const lines = text.trim().split("\n"); + expect(lines).toHaveLength(4); + + const parsed = lines.map((line) => JSON.parse(line) as AgentEvent); + expect(parsed[0]?.type).toBe("turn-start"); + expect(parsed[1]?.type).toBe("text-delta"); + expect((parsed[1] as { delta: string }).delta).toBe("Hello"); + expect(parsed[2]?.type).toBe("text-delta"); + expect(parsed[3]?.type).toBe("done"); + }); + + it("generates conversationId when not provided", async () => { + const app = createApp({ + orchestrator: createFakeOrchestrator([ + { type: "done", tabId: "tab1", turnId: "turn1", reason: "stop" }, + ]), + generateId: () => "generated-uuid", + }); + + const res = await app.request("/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ message: "hi" }), + }); + + expect(res.status).toBe(200); + expect(res.headers.get("X-Conversation-Id")).toBe("generated-uuid"); + }); + + it("emits error event when orchestrator throws", async () => { + const app = createApp({ + orchestrator: createThrowingOrchestrator(new Error("provider unavailable")), + }); + + const res = await app.request("/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ message: "hi", conversationId: "conv1" }), + }); + + expect(res.status).toBe(200); + const text = await res.text(); + const lines = text.trim().split("\n"); + expect(lines.length).toBeGreaterThanOrEqual(1); + + const lastLine = lines[lines.length - 1]; + if (!lastLine) throw new Error("expected at least one line"); + const lastEvent = JSON.parse(lastLine) as AgentEvent; + expect(lastEvent.type).toBe("error"); + if (lastEvent.type === "error") { + expect(lastEvent.message).toContain("provider unavailable"); + } + }); + + it("handles empty event list", async () => { + const app = createApp({ orchestrator: createFakeOrchestrator([]) }); + + const res = await app.request("/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ message: "hi" }), + }); + + expect(res.status).toBe(200); + const text = await res.text(); + expect(text).toBe(""); + }); +}); diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts new file mode 100644 index 0000000..92553b7 --- /dev/null +++ b/packages/transport-http/src/app.ts @@ -0,0 +1,70 @@ +import type { AgentEvent } from "@dispatch/kernel"; +import { Hono } from "hono"; +import { isParseError, parseChatBody, serializeEventLine } from "./logic.js"; +import type { SessionOrchestrator } from "./seam.js"; + +export interface CreateServerOptions { + readonly orchestrator: SessionOrchestrator; + readonly generateId?: () => string; +} + +export function createApp(opts: CreateServerOptions): Hono { + const app = new Hono(); + const generateId = opts.generateId ?? (() => crypto.randomUUID()); + + app.get("/health", (c) => c.json({ ok: true })); + + app.post("/chat", async (c) => { + let body: unknown; + try { + body = await c.req.json(); + } catch { + return c.json({ error: "Invalid JSON body" }, 400); + } + + const result = parseChatBody(body, generateId); + if (isParseError(result)) { + return c.json({ error: result.error }, 400); + } + + const { conversationId, message } = result; + const events: AgentEvent[] = []; + let resolveStream: () => void; + const streamReady = new Promise<void>((resolve) => { + resolveStream = resolve; + }); + + const orchestratorPromise = opts.orchestrator + .handleMessage({ + conversationId, + text: message, + onEvent: (event) => { + events.push(event); + }, + }) + .then(() => { + resolveStream(); + }) + .catch((err) => { + events.push({ + type: "error", + tabId: conversationId, + turnId: "", + message: err instanceof Error ? err.message : String(err), + }); + resolveStream(); + }); + + await streamReady; + await orchestratorPromise.catch(() => {}); + + const ndjson = events.map(serializeEventLine).join(""); + + return c.text(ndjson, 200, { + "Content-Type": "application/x-ndjson", + "X-Conversation-Id": conversationId, + }); + }); + + return app; +} diff --git a/packages/transport-http/src/extension.ts b/packages/transport-http/src/extension.ts new file mode 100644 index 0000000..3ed98a8 --- /dev/null +++ b/packages/transport-http/src/extension.ts @@ -0,0 +1,30 @@ +import type { Extension, HostAPI, Manifest } from "@dispatch/kernel"; +import type { Hono } from "hono"; +import { createApp } from "./app.js"; +import { sessionOrchestratorHandle } from "./seam.js"; + +export const manifest: Manifest = { + id: "transport-http", + name: "Transport HTTP", + version: "0.0.0", + apiVersion: "^0.1.0", + trust: "bundled", + dependsOn: ["session-orchestrator"], + capabilities: { network: true }, + contributes: { routes: ["/chat", "/health"] }, + activation: "eager", +}; + +export interface CreateServerOptions { + readonly port?: number; +} + +export function createServer(host: HostAPI, _opts?: CreateServerOptions): Hono { + const orchestrator = host.getService(sessionOrchestratorHandle); + return createApp({ orchestrator }); +} + +export const extension: Extension = { + manifest, + activate: (_host: HostAPI) => {}, +}; diff --git a/packages/transport-http/src/index.ts b/packages/transport-http/src/index.ts new file mode 100644 index 0000000..39a80ac --- /dev/null +++ b/packages/transport-http/src/index.ts @@ -0,0 +1,7 @@ +export type { CreateServerOptions } from "./app.js"; +export { createApp } from "./app.js"; +export { createServer, extension, manifest } from "./extension.js"; +export type { ChatCommand, ParseError, ParseResult } from "./logic.js"; +export { isParseError, parseChatBody, serializeEventLine } from "./logic.js"; +export type { SessionOrchestrator } from "./seam.js"; +export { sessionOrchestratorHandle } from "./seam.js"; diff --git a/packages/transport-http/src/logic.test.ts b/packages/transport-http/src/logic.test.ts new file mode 100644 index 0000000..4a77643 --- /dev/null +++ b/packages/transport-http/src/logic.test.ts @@ -0,0 +1,102 @@ +import type { AgentEvent } from "@dispatch/kernel"; +import { describe, expect, it } from "vitest"; +import { isParseError, parseChatBody, serializeEventLine } from "./logic.js"; + +describe("parseChatBody", () => { + const fakeId = () => "test-uuid"; + + it("returns error for null body", () => { + const result = parseChatBody(null, fakeId); + expect(isParseError(result)).toBe(true); + if (isParseError(result)) { + expect(result.error).toContain("JSON object"); + } + }); + + it("returns error for non-object body", () => { + const result = parseChatBody("hello", fakeId); + expect(isParseError(result)).toBe(true); + }); + + it("returns error when message is missing", () => { + const result = parseChatBody({ conversationId: "c1" }, fakeId); + expect(isParseError(result)).toBe(true); + if (isParseError(result)) { + expect(result.error).toContain("message"); + } + }); + + it("returns error when message is empty string", () => { + const result = parseChatBody({ message: "" }, fakeId); + expect(isParseError(result)).toBe(true); + }); + + it("returns error when message is whitespace only", () => { + const result = parseChatBody({ message: " " }, fakeId); + expect(isParseError(result)).toBe(true); + }); + + it("returns error when message is not a string", () => { + const result = parseChatBody({ message: 42 }, fakeId); + expect(isParseError(result)).toBe(true); + }); + + it("generates conversationId when absent", () => { + const result = parseChatBody({ message: "hello" }, fakeId); + expect(isParseError(result)).toBe(false); + if (!isParseError(result)) { + expect(result.conversationId).toBe("test-uuid"); + expect(result.message).toBe("hello"); + } + }); + + it("generates conversationId when empty string", () => { + const result = parseChatBody({ message: "hello", conversationId: "" }, fakeId); + expect(isParseError(result)).toBe(false); + if (!isParseError(result)) { + expect(result.conversationId).toBe("test-uuid"); + } + }); + + it("uses provided conversationId", () => { + const result = parseChatBody({ message: "hello", conversationId: "my-conv" }, fakeId); + expect(isParseError(result)).toBe(false); + if (!isParseError(result)) { + expect(result.conversationId).toBe("my-conv"); + } + }); + + it("trims message whitespace", () => { + const result = parseChatBody({ message: " hello world " }, fakeId); + expect(isParseError(result)).toBe(false); + if (!isParseError(result)) { + expect(result.message).toBe("hello world"); + } + }); +}); + +describe("serializeEventLine", () => { + it("serializes an event as JSON followed by newline", () => { + const event: AgentEvent = { + type: "text-delta", + tabId: "tab1", + turnId: "turn1", + delta: "hello", + }; + const line = serializeEventLine(event); + expect(line).toBe(`${JSON.stringify(event)}\n`); + }); + + it("serializes a done event", () => { + const event: AgentEvent = { + type: "done", + tabId: "tab1", + turnId: "turn1", + reason: "stop", + }; + const line = serializeEventLine(event); + const parsed = JSON.parse(line.trim()); + expect(parsed.type).toBe("done"); + expect(parsed.reason).toBe("stop"); + }); +}); diff --git a/packages/transport-http/src/logic.ts b/packages/transport-http/src/logic.ts new file mode 100644 index 0000000..a1a1638 --- /dev/null +++ b/packages/transport-http/src/logic.ts @@ -0,0 +1,40 @@ +import type { AgentEvent } from "@dispatch/kernel"; + +export interface ChatCommand { + readonly conversationId: string; + readonly message: string; +} + +export interface ParseError { + readonly error: string; +} + +export type ParseResult = ChatCommand | ParseError; + +export function parseChatBody(body: unknown, generateId: () => string): ParseResult { + if (body === null || typeof body !== "object") { + return { error: "Request body must be a JSON object" }; + } + + const obj = body as Record<string, unknown>; + + const message = obj.message; + if (typeof message !== "string" || message.trim().length === 0) { + return { error: "Field 'message' is required and must be a non-empty string" }; + } + + const conversationId = + typeof obj.conversationId === "string" && obj.conversationId.length > 0 + ? obj.conversationId + : generateId(); + + return { conversationId, message: message.trim() }; +} + +export function isParseError(result: ParseResult): result is ParseError { + return "error" in result; +} + +export function serializeEventLine(event: AgentEvent): string { + return `${JSON.stringify(event)}\n`; +} diff --git a/packages/transport-http/src/seam.ts b/packages/transport-http/src/seam.ts new file mode 100644 index 0000000..c6ce04f --- /dev/null +++ b/packages/transport-http/src/seam.ts @@ -0,0 +1,2 @@ +export type { SessionOrchestrator } from "@dispatch/session-orchestrator"; +export { sessionOrchestratorHandle } from "@dispatch/session-orchestrator"; diff --git a/packages/transport-http/tsconfig.json b/packages/transport-http/tsconfig.json new file mode 100644 index 0000000..2ae3233 --- /dev/null +++ b/packages/transport-http/tsconfig.json @@ -0,0 +1,6 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true }, + "include": ["src/**/*.ts"], + "references": [{ "path": "../kernel" }, { "path": "../session-orchestrator" }] +} diff --git a/tsconfig.json b/tsconfig.json index 0bb3a16..fd2e4c1 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,20 +1,12 @@ { "files": [], "references": [ - { - "path": "./packages/auth-apikey" - }, - { - "path": "./packages/conversation-store" - }, - { - "path": "./packages/kernel" - }, - { - "path": "./packages/provider-openai-compat" - }, - { - "path": "./packages/storage-sqlite" - } + { "path": "./packages/kernel" }, + { "path": "./packages/storage-sqlite" }, + { "path": "./packages/auth-apikey" }, + { "path": "./packages/provider-openai-compat" }, + { "path": "./packages/conversation-store" }, + { "path": "./packages/session-orchestrator" }, + { "path": "./packages/transport-http" } ] } |
