diff options
| author | Adam Malczewski <[email protected]> | 2026-06-21 02:08:44 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-21 02:08:44 +0900 |
| commit | ba47df37f0c89bff4f0c3dd7d0bc2ef6c8062b92 (patch) | |
| tree | 21d87eb847cd526a506cf274467fd1359f349705 /packages/message-queue | |
| parent | 75032313a96856a932c109efbbe6b6a7eb782222 (diff) | |
| download | dispatch-ba47df37f0c89bff4f0c3dd7d0bc2ef6c8062b92.tar.gz dispatch-ba47df37f0c89bff4f0c3dd7d0bc2ef6c8062b92.zip | |
feat(message-queue): per-conversation queue + steering injection
A per-conversation message queue (new message-queue extension) holds user
messages enqueued while a turn generates; delivered mid-turn as steering at the
tool-result boundary (or carried to a new turn if no tool call fires).
- kernel: RunTurnInput.drainSteering callback (generic; kernel stays pure)
- wire 0.7.0->0.8.0: QueuedMessage, QueuePayload, TurnSteeringEvent (additive)
- transport-contract 0.11.0->0.12.0: POST /conversations/:id/queue + chat.queue WS op
- message-queue ext: queue state + per-conversation custom surface (rendererId message-queue)
- session-orchestrator: enqueue facade + drainSteering wiring + post-seal carry
- transport-http/ws: queue endpoint + chat.queue op (fixes WsClientMessage exhaustive switch)
- host-bin: register message-queue
1043 vitest + 199 transport bun pass; tsc/biome clean; boot smoke clean.
FE courier: frontend-message-queue-handoff.md.
Diffstat (limited to 'packages/message-queue')
| -rw-r--r-- | packages/message-queue/package.json | 14 | ||||
| -rw-r--r-- | packages/message-queue/src/extension.ts | 82 | ||||
| -rw-r--r-- | packages/message-queue/src/index.ts | 27 | ||||
| -rw-r--r-- | packages/message-queue/src/pure.test.ts | 144 | ||||
| -rw-r--r-- | packages/message-queue/src/pure.ts | 105 | ||||
| -rw-r--r-- | packages/message-queue/src/service.test.ts | 101 | ||||
| -rw-r--r-- | packages/message-queue/src/service.ts | 88 | ||||
| -rw-r--r-- | packages/message-queue/tsconfig.json | 11 |
8 files changed, 572 insertions, 0 deletions
diff --git a/packages/message-queue/package.json b/packages/message-queue/package.json new file mode 100644 index 0000000..a8fcc4b --- /dev/null +++ b/packages/message-queue/package.json @@ -0,0 +1,14 @@ +{ + "name": "@dispatch/message-queue", + "version": "0.0.0", + "type": "module", + "private": true, + "main": "dist/index.js", + "types": "dist/index.d.ts", + "dependencies": { + "@dispatch/kernel": "workspace:*", + "@dispatch/surface-registry": "workspace:*", + "@dispatch/ui-contract": "workspace:*", + "@dispatch/wire": "workspace:*" + } +} diff --git a/packages/message-queue/src/extension.ts b/packages/message-queue/src/extension.ts new file mode 100644 index 0000000..26d19ca --- /dev/null +++ b/packages/message-queue/src/extension.ts @@ -0,0 +1,82 @@ +/** + * message-queue extension — owns the per-conversation steering message queue + * (state + surface + drain-and-clear). Plugs into the kernel host via a + * manifest + activate(host); the session-orchestrator drains it via the + * `messageQueueHandle` service. Does NOT touch the turn loop. + */ +import type { Extension, HostAPI, Manifest } from "@dispatch/kernel"; +import type { SurfaceContext, SurfaceProvider } from "@dispatch/surface-registry"; +import { surfaceRegistryHandle } from "@dispatch/surface-registry"; +import type { SurfaceSpec } from "@dispatch/ui-contract"; +import { buildQueueSpec, MESSAGE_QUEUE_SURFACE_ID } from "./pure.js"; +import { createMessageQueueService, messageQueueHandle } from "./service.js"; + +export const manifest: Manifest = { + id: "message-queue", + name: "Message Queue", + version: "0.0.0", + apiVersion: "^0.1.0", + trust: "bundled", + activation: "eager", + dependsOn: ["surface-registry"], + capabilities: {}, + contributes: { + services: ["message-queue"], + }, +}; + +export function activate(host: HostAPI): void { + const registry = host.getService(surfaceRegistryHandle); + + const subscribers = new Set<() => void>(); + + const service = createMessageQueueService({ + id: () => crypto.randomUUID(), + now: () => Date.now(), + notify: () => { + for (const sub of subscribers) { + sub(); + } + }, + logger: host.logger, + }); + + host.provideService(messageQueueHandle, service); + + function getSpec(context?: SurfaceContext): SurfaceSpec { + const convId = context?.conversationId; + const messages = convId === undefined ? [] : service.getQueue(convId); + return buildQueueSpec(messages); + } + + function invoke(_actionId: string, _payload?: unknown, _context?: SurfaceContext): void { + // The message-queue surface is read-only: a client renders the queue and + // the session-orchestrator drains it. No client-facing actions. + } + + const provider: SurfaceProvider = { + catalogEntry: { + id: MESSAGE_QUEUE_SURFACE_ID, + region: "side", + title: "Message Queue", + scope: "conversation", + }, + getSpec, + invoke, + subscribe(onChange) { + subscribers.add(onChange); + return () => { + subscribers.delete(onChange); + }; + }, + }; + + registry.register(provider); + + host.logger.info("message-queue: registered"); +} + +export const extension: Extension = { + manifest, + activate, +}; diff --git a/packages/message-queue/src/index.ts b/packages/message-queue/src/index.ts new file mode 100644 index 0000000..79bbb25 --- /dev/null +++ b/packages/message-queue/src/index.ts @@ -0,0 +1,27 @@ +/** + * @dispatch/message-queue — the per-conversation message queue for mid-turn + * steering. Owns the queue state + a per-conversation `custom` surface; the + * session-orchestrator drains it via `messageQueueHandle`. The queue is + * transient (in-memory, per-conversation, only meaningful during generation); + * the surface is the ONLY way the frontend reads queue state. + */ + +export type { QueuedMessage, QueuePayload } from "@dispatch/wire"; +export { extension, manifest } from "./extension.js"; +export { + buildQueueSpec, + combine, + drain, + enqueue, + getQueue, + MESSAGE_QUEUE_RENDERER_ID, + MESSAGE_QUEUE_SURFACE_ID, + type MessageQueueState, + type QueueDeps, +} from "./pure.js"; +export { + createMessageQueueService, + type MessageQueueDeps, + type MessageQueueService, + messageQueueHandle, +} from "./service.js"; diff --git a/packages/message-queue/src/pure.test.ts b/packages/message-queue/src/pure.test.ts new file mode 100644 index 0000000..ff1a08f --- /dev/null +++ b/packages/message-queue/src/pure.test.ts @@ -0,0 +1,144 @@ +import type { QueuedMessage } from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { + buildQueueSpec, + combine, + drain, + enqueue, + getQueue, + MESSAGE_QUEUE_RENDERER_ID, + MESSAGE_QUEUE_SURFACE_ID, + type MessageQueueState, + type QueueDeps, +} from "./pure.js"; + +function makeDeps(): QueueDeps { + let idCounter = 0; + let now = 1_000; + return { + id: () => `id-${++idCounter}`, + now: () => (now += 10), + }; +} + +describe("enqueue", () => { + it("enqueue appends + returns snapshot with the new message (id unique, queuedAt set)", () => { + const state: MessageQueueState = new Map(); + const deps = makeDeps(); + + const first = enqueue(state, "c1", "first", deps); + expect(first).toHaveLength(1); + const firstMsg = first[0]; + if (firstMsg === undefined) throw new Error("expected a message"); + expect(firstMsg.id).toBe("id-1"); + expect(firstMsg.text).toBe("first"); + expect(firstMsg.queuedAt).toBe(1_010); + + const second = enqueue(state, "c1", "second", deps); + expect(second).toHaveLength(2); + const secondMsg = second[1]; + if (secondMsg === undefined) throw new Error("expected a message"); + expect(secondMsg.id).toBe("id-2"); // unique per message + expect(secondMsg.text).toBe("second"); + expect(secondMsg.queuedAt).toBe(1_020); + + // a separate conversation does not share state + const other = enqueue(state, "c2", "other", deps); + expect(other).toHaveLength(1); + expect(getQueue(state, "c2")).toHaveLength(1); + }); +}); + +describe("getQueue", () => { + it("getQueue returns current snapshot; empty array when none", () => { + const state: MessageQueueState = new Map(); + const deps = makeDeps(); + + expect(getQueue(state, "unknown")).toEqual([]); + + enqueue(state, "c1", "x", deps); + enqueue(state, "c1", "y", deps); + const snap = getQueue(state, "c1"); + expect(snap.map((m) => m.text)).toEqual(["x", "y"]); + + // returns a COPY — mutating it does not affect live state + snap.push({ id: "evil", text: "mutate", queuedAt: 0 }); + expect(getQueue(state, "c1")).toHaveLength(2); + }); +}); + +describe("drain", () => { + it("drain returns all + clears; second drain returns []", () => { + const state: MessageQueueState = new Map(); + const deps = makeDeps(); + enqueue(state, "c1", "a", deps); + enqueue(state, "c1", "b", deps); + + const drained = drain(state, "c1"); + expect(drained.map((m) => m.text)).toEqual(["a", "b"]); + + // cleared + expect(getQueue(state, "c1")).toEqual([]); + expect(drain(state, "c1")).toEqual([]); + }); + + it("drain on an empty queue returns [] and is a no-op", () => { + const state: MessageQueueState = new Map(); + + // never had a queue + expect(drain(state, "c1")).toEqual([]); + expect(getQueue(state, "c1")).toEqual([]); + + // and after a drain that emptied it, draining again is a no-op + const deps = makeDeps(); + enqueue(state, "c1", "once", deps); + drain(state, "c1"); + expect(drain(state, "c1")).toEqual([]); + expect(getQueue(state, "c1")).toEqual([]); + }); +}); + +describe("combine", () => { + it("combine joins texts with blank-line separator", () => { + const msgs: QueuedMessage[] = [ + { id: "1", text: "alpha", queuedAt: 1 }, + { id: "2", text: "beta", queuedAt: 2 }, + ]; + expect(combine(msgs)).toBe("alpha\n\nbeta"); + expect(combine([])).toBe(""); + expect(combine([{ id: "1", text: "solo", queuedAt: 1 }])).toBe("solo"); + }); +}); + +describe("buildQueueSpec", () => { + it("renders a custom field with the queue snapshot", () => { + const msgs: QueuedMessage[] = [ + { id: "1", text: "a", queuedAt: 1 }, + { id: "2", text: "b", queuedAt: 2 }, + ]; + const spec = buildQueueSpec(msgs); + expect(spec.id).toBe(MESSAGE_QUEUE_SURFACE_ID); + expect(spec.region).toBe("side"); + expect(spec.title).toBe("Message Queue"); + expect(spec.fields).toHaveLength(1); + + const field = spec.fields[0]; + if (field === undefined || field.kind !== "custom") { + throw new Error("expected a custom field"); + } + expect(field.rendererId).toBe(MESSAGE_QUEUE_RENDERER_ID); + const payload = field.payload as { messages: readonly QueuedMessage[] }; + expect(payload.messages).toHaveLength(2); + expect(payload.messages.map((m) => m.text)).toEqual(["a", "b"]); + }); + + it("renders an empty list for an empty queue", () => { + const spec = buildQueueSpec([]); + const field = spec.fields[0]; + if (field === undefined || field.kind !== "custom") { + throw new Error("expected a custom field"); + } + const payload = field.payload as { messages: readonly QueuedMessage[] }; + expect(payload.messages).toEqual([]); + }); +}); diff --git a/packages/message-queue/src/pure.ts b/packages/message-queue/src/pure.ts new file mode 100644 index 0000000..834018b --- /dev/null +++ b/packages/message-queue/src/pure.ts @@ -0,0 +1,105 @@ +/** + * Pure core for message-queue — zero I/O, zero ambient state. + * + * Every function is input → output; testable without mocks. State is a plain + * `Map<conversationId, QueuedMessage[]>` OWNED by the caller (the service + * shell); the pure functions mutate it in place and return snapshots (fresh + * array copies), so a caller can never reach into or mutate live state through + * a returned value. The id factory + clock are injected (`QueueDeps`) so tests + * are deterministic. + */ + +import type { CustomField, SurfaceSpec } from "@dispatch/ui-contract"; +import type { QueuedMessage, QueuePayload } from "@dispatch/wire"; + +/** The queue state: a per-conversation map of queued messages. */ +export type MessageQueueState = Map<string, QueuedMessage[]>; + +/** Injected effectful factories kept out of the pure core. */ +export interface QueueDeps { + /** Stable (client-visible) id factory for UI keying + dedup. */ + readonly id: () => string; + /** Clock returning epoch-ms for `queuedAt`. */ + readonly now: () => number; +} + +/** Surface id this extension contributes (also the manifest + catalog id). */ +export const MESSAGE_QUEUE_SURFACE_ID = "message-queue"; +/** The custom renderer id a frontend switches on to render the queue. */ +export const MESSAGE_QUEUE_RENDERER_ID = "message-queue"; + +/** + * Append a message to a conversation's queue. Mutates `state` and returns the + * CURRENT snapshot (post-append) — a fresh array copy, so callers cannot mutate + * live state through the returned value. + */ +export function enqueue( + state: MessageQueueState, + conversationId: string, + text: string, + deps: QueueDeps, +): QueuedMessage[] { + const message: QueuedMessage = { id: deps.id(), text, queuedAt: deps.now() }; + const existing = state.get(conversationId); + if (existing === undefined) { + state.set(conversationId, [message]); + } else { + existing.push(message); + } + return getQueue(state, conversationId); +} + +/** + * Current queue snapshot for a conversation — a fresh array copy. Empty array + * if the conversation has no queue / is unknown. + */ +export function getQueue(state: MessageQueueState, conversationId: string): QueuedMessage[] { + const existing = state.get(conversationId); + if (existing === undefined) return []; + return [...existing]; +} + +/** + * Drain: return all queued messages for a conversation and CLEAR its queue. + * Returns a fresh array copy of the drained messages (empty array if the queue + * was empty or unknown). The caller (session-orchestrator) combines these into + * a steering ChatMessage; this returns the raw `QueuedMessage[]`, NOT a + * ChatMessage. + */ +export function drain(state: MessageQueueState, conversationId: string): QueuedMessage[] { + const existing = state.get(conversationId); + if (existing === undefined || existing.length === 0) return []; + const drained = [...existing]; + state.delete(conversationId); + return drained; +} + +/** + * Combine drained messages' texts into a single steering string, joined by a + * blank line (`\n\n`). Pure — the session-orchestrator builds the final + * ChatMessage from this. + */ +export function combine(messages: readonly QueuedMessage[]): string { + return messages.map((m) => m.text).join("\n\n"); +} + +/** + * Build the per-conversation surface spec: a single `custom` field whose + * payload is the current queue snapshot (`QueuePayload`). An empty `messages` + * array (idle conversation / post-drain) renders as an empty list. Pure — no + * I/O; the surface-registry re-fetches this on every notify. + */ +export function buildQueueSpec(messages: readonly QueuedMessage[]): SurfaceSpec { + const payload: QueuePayload = { messages }; + const field: CustomField = { + kind: "custom", + rendererId: MESSAGE_QUEUE_RENDERER_ID, + payload, + }; + return { + id: MESSAGE_QUEUE_SURFACE_ID, + region: "side", + title: "Message Queue", + fields: [field], + }; +} diff --git a/packages/message-queue/src/service.test.ts b/packages/message-queue/src/service.test.ts new file mode 100644 index 0000000..04cde91 --- /dev/null +++ b/packages/message-queue/src/service.test.ts @@ -0,0 +1,101 @@ +import type { SurfaceSpec } from "@dispatch/ui-contract"; +import type { QueuedMessage, QueuePayload } from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { buildQueueSpec, combine } from "./pure.js"; +import { createMessageQueueService, type MessageQueueDeps } from "./service.js"; + +interface Recorder extends MessageQueueDeps { + readonly calls: { value: number }; +} + +function makeDeps(): Recorder { + let idCounter = 0; + let now = 1_000; + const calls = { value: 0 }; + return { + id: () => `q-${++idCounter}`, + now: () => (now += 10), + notify: () => { + calls.value += 1; + }, + calls, + }; +} + +function queueMessages(spec: SurfaceSpec): readonly QueuedMessage[] { + const field = spec.fields[0]; + if (field === undefined || field.kind !== "custom") { + throw new Error("expected a custom field"); + } + return (field.payload as QueuePayload).messages; +} + +describe("message-queue service", () => { + it("enqueue pushes a surface update (snapshot grew)", () => { + const deps = makeDeps(); + const svc = createMessageQueueService(deps); + expect(deps.calls.value).toBe(0); + + const snapshot = svc.enqueue("c1", "hello"); + expect(deps.calls.value).toBe(1); // surface update pushed + expect(snapshot).toHaveLength(1); + const first = snapshot[0]; + if (first === undefined) throw new Error("expected a message"); + expect(first.text).toBe("hello"); + expect(typeof first.id).toBe("string"); + expect(first.queuedAt).toBe(1_010); + + // the surface spec reflects the grown snapshot + expect(queueMessages(buildQueueSpec(svc.getQueue("c1")))).toHaveLength(1); + }); + + it("drain pushes a surface update (empty)", () => { + const deps = makeDeps(); + const svc = createMessageQueueService(deps); + svc.enqueue("c1", "a"); + svc.enqueue("c1", "b"); + expect(deps.calls.value).toBe(2); // two enqueues notified + + const drained = svc.drain("c1"); + expect(deps.calls.value).toBe(3); // drain pushed a surface update + expect(drained).toHaveLength(2); + expect(drained.map((m) => m.text)).toEqual(["a", "b"]); + + // cleared + surface now shows empty + expect(svc.getQueue("c1")).toEqual([]); + expect(queueMessages(buildQueueSpec(svc.getQueue("c1")))).toEqual([]); + }); + + it("drain on empty does NOT push a surface update (already empty)", () => { + const deps = makeDeps(); + const svc = createMessageQueueService(deps); + + expect(svc.drain("c1")).toEqual([]); + expect(deps.calls.value).toBe(0); // no notify — no change + + // after a real drain empties it, a further drain is a no-op (no notify) + svc.enqueue("c1", "x"); + expect(deps.calls.value).toBe(1); + svc.drain("c1"); + expect(deps.calls.value).toBe(2); + svc.drain("c1"); + expect(deps.calls.value).toBe(2); // unchanged — queue was already empty + }); + + it("getQueue returns current snapshot; empty for unknown conversation", () => { + const deps = makeDeps(); + const svc = createMessageQueueService(deps); + expect(svc.getQueue("nope")).toEqual([]); + svc.enqueue("c1", "hi"); + expect(svc.getQueue("c1")).toHaveLength(1); + }); + + it("drain returns the raw QueuedMessage[] the orchestrator combines", () => { + const deps = makeDeps(); + const svc = createMessageQueueService(deps); + svc.enqueue("c1", "alpha"); + svc.enqueue("c1", "beta"); + const drained = svc.drain("c1"); + expect(combine(drained)).toBe("alpha\n\nbeta"); + }); +}); diff --git a/packages/message-queue/src/service.ts b/packages/message-queue/src/service.ts new file mode 100644 index 0000000..91d0fc5 --- /dev/null +++ b/packages/message-queue/src/service.ts @@ -0,0 +1,88 @@ +/** + * Message-queue service — the typed handle the session-orchestrator obtains + * (lazily) to enqueue / read / drain a conversation's steering queue. + * + * The queue is transient + per-conversation (in-memory, only meaningful during + * generation). `drain` returns + clears AND pushes a surface update (empty). + * The orchestrator owns the delivery (drain → combine → inject → carry-to-new- + * turn); this service owns only the queue state, the surface, and the + * drain-and-clear. + */ +import { defineService, type Logger, type ServiceHandle } from "@dispatch/kernel"; +import type { QueuedMessage } from "@dispatch/wire"; +import type { MessageQueueState, QueueDeps } from "./pure.js"; +import { drain as drainQueue, enqueue as enqueueMessage, getQueue as readQueue } from "./pure.js"; + +/** + * The message-queue service interface. Obtained via + * `host.getService(messageQueueHandle)`. + */ +export interface MessageQueueService { + /** Append a message; return the CURRENT queue snapshot (post-append). */ + enqueue(conversationId: string, text: string): QueuedMessage[]; + /** Current queue snapshot (empty array if none / unknown conversation). */ + getQueue(conversationId: string): QueuedMessage[]; + /** + * Drain: return all queued messages, CLEAR the queue, and push a surface + * update (empty). Returns the raw `QueuedMessage[]` (NOT a ChatMessage — + * the caller combines + builds the ChatMessage). Empty array if the queue + * was empty (and then NO surface update is pushed — no change). + */ + drain(conversationId: string): QueuedMessage[]; +} + +/** + * Typed handle anchoring the message-queue service. The single symbol the + * session-orchestrator imports to reach the queue — no string-keyed lookup. + */ +export const messageQueueHandle: ServiceHandle<MessageQueueService> = + defineService<MessageQueueService>("message-queue"); + +/** Deps for the service shell — the pure core's deps plus the surface notifier. */ +export interface MessageQueueDeps extends QueueDeps { + /** + * Notify the surface-registry that the queue changed, so the transport + * re-fetches + pushes a full new SurfaceUpdate. Called ONLY on a real + * change (enqueue → grew; drain-non-empty → emptied); a no-op drain of an + * already-empty queue does NOT notify (no change → no surface push). + */ + readonly notify: () => void; + /** Injected host logger (optional in tests). Logs counts/lengths at debug, never bodies. */ + readonly logger?: Logger; +} + +/** + * Create a MessageQueueService backed by an in-memory per-conversation Map. + * Pure decision logic (enqueue/getQueue/drain) lives in ./pure.js; this shell + * owns the state + the notify edge. State is owned (not ambient): the Map lives + * in this closure, reachable only through the returned service. + */ +export function createMessageQueueService(deps: MessageQueueDeps): MessageQueueService { + const state: MessageQueueState = new Map(); + + return { + enqueue(conversationId, text) { + const snapshot = enqueueMessage(state, conversationId, text, deps); + deps.logger?.debug("message-queue: enqueued", { + conversationId, + queueLen: snapshot.length, + textLen: text.length, + }); + deps.notify(); + return snapshot; + }, + getQueue(conversationId) { + return readQueue(state, conversationId); + }, + drain(conversationId) { + const drained = drainQueue(state, conversationId); + if (drained.length === 0) return drained; + deps.logger?.debug("message-queue: drained", { + conversationId, + count: drained.length, + }); + deps.notify(); + return drained; + }, + }; +} diff --git a/packages/message-queue/tsconfig.json b/packages/message-queue/tsconfig.json new file mode 100644 index 0000000..da5dfb7 --- /dev/null +++ b/packages/message-queue/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true }, + "include": ["src/**/*.ts"], + "references": [ + { "path": "../kernel" }, + { "path": "../surface-registry" }, + { "path": "../ui-contract" }, + { "path": "../wire" } + ] +} |
