summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--GLOSSARY.md3
-rw-r--r--bun.lock28
-rw-r--r--frontend-message-queue-handoff.md189
-rw-r--r--packages/host-bin/package.json1
-rw-r--r--packages/host-bin/src/main.ts2
-rw-r--r--packages/host-bin/tsconfig.json1
-rw-r--r--packages/kernel/src/contracts/events.ts1
-rw-r--r--packages/kernel/src/contracts/index.ts1
-rw-r--r--packages/kernel/src/contracts/runtime.ts17
-rw-r--r--packages/kernel/src/runtime/run-turn.test.ts246
-rw-r--r--packages/kernel/src/runtime/run-turn.ts13
-rw-r--r--packages/message-queue/package.json14
-rw-r--r--packages/message-queue/src/extension.ts82
-rw-r--r--packages/message-queue/src/index.ts27
-rw-r--r--packages/message-queue/src/pure.test.ts144
-rw-r--r--packages/message-queue/src/pure.ts105
-rw-r--r--packages/message-queue/src/service.test.ts101
-rw-r--r--packages/message-queue/src/service.ts88
-rw-r--r--packages/message-queue/tsconfig.json11
-rw-r--r--packages/session-orchestrator/package.json3
-rw-r--r--packages/session-orchestrator/src/extension.ts10
-rw-r--r--packages/session-orchestrator/src/index.ts2
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts110
-rw-r--r--packages/session-orchestrator/src/queue.test.ts497
-rw-r--r--packages/session-orchestrator/tsconfig.json3
-rw-r--r--packages/transport-contract/package.json2
-rw-r--r--packages/transport-contract/src/index.ts64
-rw-r--r--packages/transport-http/src/app.test.ts278
-rw-r--r--packages/transport-http/src/app.ts36
-rw-r--r--packages/transport-http/src/extension.ts1
-rw-r--r--packages/transport-http/src/index.ts2
-rw-r--r--packages/transport-http/src/logic.test.ts58
-rw-r--r--packages/transport-http/src/logic.ts35
-rw-r--r--packages/transport-http/src/server.bun.test.ts3
-rw-r--r--packages/transport-ws/src/extension.ts22
-rw-r--r--packages/transport-ws/src/index.ts1
-rw-r--r--packages/transport-ws/src/router.test.ts111
-rw-r--r--packages/transport-ws/src/router.ts40
-rw-r--r--packages/transport-ws/src/server.bun.test.ts179
-rw-r--r--packages/wire/package.json2
-rw-r--r--packages/wire/src/index.ts58
-rw-r--r--tasks.md78
-rw-r--r--tsconfig.json1
43 files changed, 2636 insertions, 34 deletions
diff --git a/GLOSSARY.md b/GLOSSARY.md
index 61a555d..a1ef33c 100644
--- a/GLOSSARY.md
+++ b/GLOSSARY.md
@@ -34,6 +34,9 @@
| **hook** | A typed extension point. **event** = fire-and-forget, N listeners, error-isolated. **filter** = ordered value-in→value-out chain, in-band. | callback (when meaning a hook), listener |
| **service** | A single-responder request/response capability fetched via a typed handle. NOT a hook. | — |
| **dispatch policy** | `{ maxConcurrent, eager }` controlling how the turn loop runs a step's tool calls. | — |
+| **message queue** | The per-conversation buffer of user messages a client (FE or CLI) enqueues while a turn is GENERATING, awaiting mid-turn steering delivery. Owned by the `message-queue` extension; exposed to the frontend as a per-conversation `custom` surface (`rendererId: "message-queue"`, `QueuePayload`). Enqueuing when no turn is active starts a new turn instead. | steering queue, pending queue |
+| **steering** | A user message injected into an in-flight turn at the tool-result boundary (drawn from the message queue), so the model sees it alongside the tool results and can adjust course mid-turn. Emitted on the chat stream as a `steering` `AgentEvent`. If the turn ENDS with a non-empty queue (no tool call fired), the queue is instead carried into a NEW turn as its opening prompt. | interjection, mid-turn input |
+| **queued message** | An item in the message queue: `{ id, text, queuedAt }`. The unit the frontend renders in the queue surface. | pending message |
| **reconcile** | The pure function run on load that repairs a partial/interrupted turn into a valid history. | recover, repair |
| **session-orchestrator** | The core extension that drives a turn: load history → resolve provider/tools → call `runTurn` → persist. | — |
| **conversation-store** | The core extension persisting the append-only turn/chunk log. | message store |
diff --git a/bun.lock b/bun.lock
index f98aa26..5716c6a 100644
--- a/bun.lock
+++ b/bun.lock
@@ -97,6 +97,16 @@
"@dispatch/kernel": "workspace:*",
},
},
+ "packages/message-queue": {
+ "name": "@dispatch/message-queue",
+ "version": "0.0.0",
+ "dependencies": {
+ "@dispatch/kernel": "workspace:*",
+ "@dispatch/surface-registry": "workspace:*",
+ "@dispatch/ui-contract": "workspace:*",
+ "@dispatch/wire": "workspace:*",
+ },
+ },
"packages/observability-collector": {
"name": "@dispatch/observability-collector",
"version": "0.0.0",
@@ -120,6 +130,7 @@
"@dispatch/conversation-store": "workspace:*",
"@dispatch/credential-store": "workspace:*",
"@dispatch/kernel": "workspace:*",
+ "@dispatch/message-queue": "workspace:*",
},
},
"packages/skills": {
@@ -182,6 +193,13 @@
"@dispatch/kernel": "workspace:*",
},
},
+ "packages/tool-web-search": {
+ "name": "@dispatch/tool-web-search",
+ "version": "0.0.0",
+ "dependencies": {
+ "@dispatch/kernel": "workspace:*",
+ },
+ },
"packages/tool-write-file": {
"name": "@dispatch/tool-write-file",
"version": "0.0.0",
@@ -202,7 +220,7 @@
},
"packages/transport-contract": {
"name": "@dispatch/transport-contract",
- "version": "0.5.0",
+ "version": "0.12.0",
"dependencies": {
"@dispatch/ui-contract": "workspace:*",
"@dispatch/wire": "workspace:*",
@@ -235,11 +253,11 @@
},
"packages/ui-contract": {
"name": "@dispatch/ui-contract",
- "version": "0.1.0",
+ "version": "0.2.0",
},
"packages/wire": {
"name": "@dispatch/wire",
- "version": "0.4.0",
+ "version": "0.8.0",
},
},
"packages": {
@@ -279,6 +297,8 @@
"@dispatch/lsp": ["@dispatch/lsp@workspace:packages/lsp"],
+ "@dispatch/message-queue": ["@dispatch/message-queue@workspace:packages/message-queue"],
+
"@dispatch/observability-collector": ["@dispatch/observability-collector@workspace:packages/observability-collector"],
"@dispatch/provider-openai-compat": ["@dispatch/provider-openai-compat@workspace:packages/provider-openai-compat"],
@@ -301,6 +321,8 @@
"@dispatch/tool-shell": ["@dispatch/tool-shell@workspace:packages/tool-shell"],
+ "@dispatch/tool-web-search": ["@dispatch/tool-web-search@workspace:packages/tool-web-search"],
+
"@dispatch/tool-write-file": ["@dispatch/tool-write-file@workspace:packages/tool-write-file"],
"@dispatch/trace-replay": ["@dispatch/trace-replay@workspace:packages/trace-replay"],
diff --git a/frontend-message-queue-handoff.md b/frontend-message-queue-handoff.md
new file mode 100644
index 0000000..2d55220
--- /dev/null
+++ b/frontend-message-queue-handoff.md
@@ -0,0 +1,189 @@
+# FE handoff — message queue + steering injection
+
+Courier this to `../dispatch-web` (cross-repo contract change; `lsp references` does
+not span repos — ORCHESTRATOR §7). All changes are ADDITIVE — nothing existing breaks.
+
+## What shipped (backend)
+
+A per-conversation **message queue** + **steering** feature. While a turn is
+GENERATING, a client can enqueue a user message onto the conversation's queue;
+it is delivered mid-turn as **steering** — injected at the next tool-result
+boundary so the model sees it alongside the tool results and can adjust course.
+If the turn ends with a non-empty queue (no tool call fired), the queue is
+carried into a NEW turn as its opening prompt.
+
+- **`message queue`** — the per-conversation buffer (owned by a new
+ `@dispatch/message-queue` extension). Transient + in-memory; the queue is
+ NOT on the chat stream — it is exposed to the frontend as a per-conversation
+ SURFACE (see below).
+- **`steering`** — a user message injected into an in-flight turn at the
+ tool-result boundary (drawn from the queue). Emitted on the chat stream as a
+ new `steering` `AgentEvent` so it appears in the transcript live.
+
+Versions: `@dispatch/wire` `0.7.0 → 0.8.0`, `@dispatch/transport-contract`
+`0.11.0 → 0.12.0`. Bump the pinned `file:` deps. (`@dispatch/ui-contract` is
+unchanged — the queue uses the existing `custom` surface field kind.)
+
+## Wire types (in `@dispatch/wire`, re-exported by `@dispatch/transport-contract`)
+
+```ts
+/** A message held in the conversation's queue, awaiting steering delivery. */
+interface QueuedMessage {
+ readonly id: string; // stable, client-visible (UI key + dedup)
+ readonly text: string;
+ readonly queuedAt: number; // epoch-ms
+}
+
+/** Payload of the message-queue surface's `custom` field (see below). */
+interface QueuePayload {
+ readonly messages: readonly QueuedMessage[];
+}
+
+/** New `AgentEvent` variant (additive to the union). */
+interface TurnSteeringEvent {
+ readonly type: "steering";
+ readonly conversationId: string;
+ readonly turnId: string;
+ readonly text: string; // the combined text of all drained messages
+}
+```
+
+## How the frontend reads queue STATE: a surface (NOT the chat stream)
+
+The queue is control/state, so it rides the **surface** channel (like
+cache-warming), not the chat event stream. The `message-queue` extension
+contributes a per-conversation surface:
+
+- **Surface id:** `"message-queue"`; **scope:** `"conversation"` (subscribe with
+ the `conversationId`).
+- **One `custom` field**, `rendererId: "message-queue"`, `payload: QueuePayload`
+ (`{ messages: QueuedMessage[] }` — the current queue snapshot).
+- The surface updates (full new spec) on every change: enqueue (queue grew) and
+ drain (queue emptied). An idle conversation's queue is empty → the field's
+ `messages` is `[]`.
+
+So: **subscribe** to the `message-queue` surface per conversation and render
+the queue list from `payload.messages`. You need a bespoke renderer for
+`rendererId: "message-queue"` (the `custom` escape hatch — see the loaded-
+extensions `table` renderer precedent). The surface is **read-only** (no
+`invoke` actions); enqueuing is a chat op (below).
+
+## How the frontend ENQUEUES: the `chat.queue` WS op
+
+```ts
+interface ChatQueueMessage {
+ readonly type: "chat.queue";
+ readonly conversationId: string;
+ readonly text: string;
+}
+```
+(additive to `WsClientMessage`.)
+
+- **Fire-and-forget.** On success the server emits NOTHING back — the
+ `message-queue` SURFACE updates (the new message appears in the snapshot).
+ On failure (empty/missing `text`, unknown conversation) the server replies
+ `chat.error` (`{ type: "chat.error"; conversationId?; message }`).
+- **`text` must be non-empty** after trim (the server 400/errors otherwise).
+- **Auto-start when idle (server-owned decision):** if NO turn is active for the
+ conversation, `chat.queue` does NOT queue — it STARTS A NEW TURN with the
+ message as its opening prompt (equivalent to `chat.send`). The sender is
+ auto-subscribed and the turn's events stream as `chat.delta`s (the opening
+ `user-message` carries the text). So a single `chat.queue` op works for both
+ "steer during generation" and "send" — you don't need to pick. When a turn IS
+ active, the message is appended to the queue (surface updates) and delivered
+ at the next tool-result boundary.
+
+## How the frontend shows steering in the TRANSCRIPT: the `steering` event
+
+When the kernel drains a non-empty queue at a tool-result boundary, the
+session-orchestrator emits a **`steering`** `AgentEvent` on the chat stream
+(arrives inside a `chat.delta` `{ event }`, like every other `AgentEvent`):
+
+```ts
+{ type: "chat.delta", event: { type: "steering", conversationId, turnId, text } }
+```
+
+- Render `text` as a **user bubble in the transcript**, positioned after the
+ tool-call/tool-result it followed (it is a user message the model saw mid-turn,
+ alongside the tool results). One `steering` event per drain; `text` is the
+ combined text of all messages drained at that boundary (joined by a blank
+ line).
+- **Move, don't duplicate:** the drained messages were already shown in the
+ queue surface; when the surface then updates to empty (the drain cleared the
+ queue), they should leave the queue UI (they now live in the transcript as the
+ `steering` bubble). A simple rule: on `steering`, append the bubble to the
+ transcript; the surface's subsequent empty snapshot clears the queue UI.
+- **Late-join safe:** like `user-message`, `steering` is buffered into the
+ in-flight turn's event buffer, so a client that subscribes mid-turn (or a
+ second device) sees it before seal (mirrors the CR-3 `user-message` fix).
+ (Carry-to-new-turn, below, does NOT emit `steering` — the new turn's
+ `user-message` covers it.)
+
+## Carry to a new turn (no `steering` event)
+
+If a turn ENDS with a non-empty queue (the model finished without making a tool
+call, so no tool-result boundary was hit), the orchestrator drains the queue,
+combines the messages, and **starts a NEW turn** whose opening prompt is the
+combined text. You will see: the old turn's `done` + `turn-sealed`, then a new
+`turn-start` + `user-message` carrying the combined text (rendered as the new
+turn's normal user bubble). The queue surface also clears (empty snapshot). No
+`steering` event in this case — handle the carried text as an ordinary new-turn
+user message.
+
+## HTTP path (for the CLI / non-WS clients; the FE uses the WS op above)
+
+`POST /conversations/:id/queue` with body `QueueRequest { text }` → `QueueResponse`:
+
+```ts
+interface QueueResponse {
+ readonly conversationId: string;
+ readonly startedTurn: boolean; // true = was idle, a new turn started
+ readonly queue: readonly QueuedMessage[]; // snapshot after the enqueue
+}
+```
+- Empty/whitespace `text` → HTTP 400 `{ error }`.
+- `startedTurn: true` means no turn was active and the enqueue started one (the
+ message is the turn's opening prompt, NOT a queued steering message).
+- `startedTurn: false` means a turn was active and the message was queued (the
+ `queue` snapshot includes it).
+
+## What we need the FE to do
+
+1. **Bump pinned deps:** `@dispatch/wire` → `0.8.0`, `@dispatch/transport-contract`
+ → `0.12.0`.
+2. **Queue UI (per conversation):** subscribe to the `message-queue` surface
+ (scope `conversation`) and render `payload.messages` (`QueuedMessage[]`) with a
+ `rendererId: "message-queue"` custom renderer — a list of pending messages
+ with their text (and maybe `queuedAt` as a timestamp). Empty `messages` =
+ nothing to show (hide the panel).
+3. **Enqueue affordance:** while a turn is generating, show an input that sends
+ `chat.queue { conversationId, text }` (NOT `chat.send` — `chat.queue` is the
+ steering entry; it auto-starts a turn if idle, so it's safe to offer it
+ whenever the user wants to add input). Trim/validate non-empty client-side
+ too; expect a `chat.error` on failure.
+4. **Steering bubble:** handle the new `steering` `AgentEvent` (type `"steering"`)
+ on the `chat.delta` stream → render `event.text` as a user bubble in the
+ transcript after the tool calls; clear the queue UI when the surface updates
+ to empty.
+5. **Carry:** no special handling — a carried queue surfaces as a normal new
+ turn (`turn-start` + `user-message`); just let the existing new-turn flow
+ render it. The queue surface clears automatically.
+
+## Notes / known gaps
+
+- **Live end-to-end (a real steering turn via a tool-calling model) is not yet
+ exercised** — the logic is unit/integration tested and the app boots clean with
+ the `message-queue` extension registered, but a live `chat.queue` → tool-call
+ → `steering` event flow against a real model has not been run. Worth a live
+ smoke once the FE wires it (or ask the backend to run one).
+- **Close-with-queued-messages (open product question):** if a client
+ `POST /conversations/:id/close` (explicit tab close) while the queue is
+ non-empty, the in-flight turn aborts and the carry currently STILL fires
+ (starting a new turn on the closed conversation). This may or may not be
+ desired (does closing discard pending steering, or honor it?). Backend flag
+ for a decision; if "discard on close" is wanted, the backend will gate the
+ carry on `finishReason !== "aborted"`. No FE action either way — just be aware
+ a closed conversation might briefly start a turn from a queued message.
+- **`steering` is additive** to the `AgentEvent` union — no exhaustive switches
+ broke on the backend (verified: `tsc -b` EXIT 0). If the FE has an exhaustive
+ switch on `AgentEvent`, add a `steering` case.
diff --git a/packages/host-bin/package.json b/packages/host-bin/package.json
index 27a4a1b..d55b369 100644
--- a/packages/host-bin/package.json
+++ b/packages/host-bin/package.json
@@ -11,6 +11,7 @@
"@dispatch/cache-warming": "workspace:*",
"@dispatch/credential-store": "workspace:*",
"@dispatch/provider-openai-compat": "workspace:*",
+ "@dispatch/message-queue": "workspace:*",
"@dispatch/session-orchestrator": "workspace:*",
"@dispatch/skills": "workspace:*",
"@dispatch/throughput-store": "workspace:*",
diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts
index 420f12e..59ce47d 100644
--- a/packages/host-bin/src/main.ts
+++ b/packages/host-bin/src/main.ts
@@ -20,6 +20,7 @@ import {
type StorageNamespace,
} from "@dispatch/kernel";
import { extension as lspExt } from "@dispatch/lsp";
+import { extension as messageQueueExt } from "@dispatch/message-queue";
import { extension as providerOpenaiCompatExt } from "@dispatch/provider-openai-compat";
import { extension as sessionOrchestratorExt } from "@dispatch/session-orchestrator";
import { extension as skillsExt } from "@dispatch/skills";
@@ -73,6 +74,7 @@ const CORE_EXTENSIONS: readonly Extension[] = [
toolShellExt,
toolWriteFileExt,
throughputStoreExt,
+ messageQueueExt,
sessionOrchestratorExt,
skillsExt,
cacheWarmingExt,
diff --git a/packages/host-bin/tsconfig.json b/packages/host-bin/tsconfig.json
index efe3815..110f931 100644
--- a/packages/host-bin/tsconfig.json
+++ b/packages/host-bin/tsconfig.json
@@ -5,6 +5,7 @@
"references": [
{ "path": "../cache-warming" },
{ "path": "../kernel" },
+ { "path": "../message-queue" },
{ "path": "../storage-sqlite" },
{ "path": "../surface-loaded-extensions" },
{ "path": "../surface-registry" },
diff --git a/packages/kernel/src/contracts/events.ts b/packages/kernel/src/contracts/events.ts
index b1385a2..6c9652d 100644
--- a/packages/kernel/src/contracts/events.ts
+++ b/packages/kernel/src/contracts/events.ts
@@ -14,6 +14,7 @@ export type {
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
+ TurnSteeringEvent,
TurnStepCompleteEvent,
TurnTextDeltaEvent,
TurnToolCallEvent,
diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts
index ffcbe76..4b1350b 100644
--- a/packages/kernel/src/contracts/index.ts
+++ b/packages/kernel/src/contracts/index.ts
@@ -38,6 +38,7 @@ export type {
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
+ TurnSteeringEvent,
TurnStepCompleteEvent,
TurnTextDeltaEvent,
TurnToolCallEvent,
diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts
index b7fe23c..c449a68 100644
--- a/packages/kernel/src/contracts/runtime.ts
+++ b/packages/kernel/src/contracts/runtime.ts
@@ -100,6 +100,23 @@ export interface RunTurnInput {
* absent) — backward-compatible with callers that don't provide a clock.
*/
readonly now?: () => number;
+
+ /**
+ * Optional. Called by the runtime at the tool-result boundary — after a
+ * step whose tool calls have all executed, before the next step begins —
+ * to drain messages to inject alongside the tool results. Whatever it
+ * returns is appended as user-role messages to the next step's input, so
+ * a caller can inject mid-turn guidance the model sees with the tool
+ * results. When omitted or returning an empty array, no injection happens
+ * (the runtime is unchanged).
+ *
+ * Injected (not ambient) so the kernel stays pure: it owns no queue and
+ * names no feature — it just calls the callback and appends what it gets.
+ * Only invoked when a step PRODUCED tool calls (the tool-result boundary);
+ * a step that ends without tool calls does not drain (the caller decides
+ * what to do with any pending messages after the turn ends).
+ */
+ readonly drainSteering?: () => readonly ChatMessage[];
}
/**
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index dcaea7f..0d4c59d 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -5,7 +5,7 @@ import type { LogDeps, Logger, LogRecord, LogSink } from "../contracts/logging.j
import type { ProviderContract, ProviderEvent } from "../contracts/provider.js";
import type { ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js";
import { createLogger } from "../logging/logger.js";
-import { runTurn } from "./run-turn.js";
+import { MAX_STEPS, runTurn } from "./run-turn.js";
function delay(ms: number): Promise<void> {
return new Promise((resolve) => {
@@ -29,6 +29,28 @@ function createFakeProvider(script: ProviderEvent[][]): ProviderContract {
};
}
+function createCapturingProvider(script: ProviderEvent[][]): {
+ provider: ProviderContract;
+ capturedMessages: ChatMessage[][];
+} {
+ const capturedMessages: ChatMessage[][] = [];
+ let callIndex = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream(messages, _tools) {
+ capturedMessages.push([...messages]);
+ const events = script[callIndex] ?? [];
+ callIndex++;
+ return (async function* () {
+ for (const event of events) {
+ yield event;
+ }
+ })();
+ },
+ };
+ return { provider, capturedMessages };
+}
+
function createFakeTool(
name: string,
handler?: (input: unknown, ctx: ToolExecuteContext) => Promise<ToolResult>,
@@ -2577,4 +2599,226 @@ describe("runTurn", () => {
}
});
});
+
+ describe("drainSteering", () => {
+ it("drainSteering called once at the tool-result boundary; returned messages appended to the next step's provider input (after tool results)", async () => {
+ let drainCallCount = 0;
+ const steeringMessage: ChatMessage = {
+ role: "user",
+ chunks: [{ type: "text", text: "steer!" }],
+ };
+
+ const { provider, capturedMessages } = createCapturingProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ drainSteering: () => {
+ drainCallCount++;
+ return [steeringMessage];
+ },
+ });
+
+ expect(drainCallCount).toBe(1);
+ // The provider was called twice (tool-call step, then text step).
+ expect(capturedMessages).toHaveLength(2);
+ const secondStepMessages = capturedMessages[1] ?? [];
+ // user, assistant(tool-call), tool-result, steering(user) — in order,
+ // steering appended AFTER the tool results, before the next call.
+ expect(secondStepMessages).toHaveLength(4);
+ expect(secondStepMessages[0]?.role).toBe("user");
+ expect(secondStepMessages[1]?.role).toBe("assistant");
+ expect(secondStepMessages[2]?.role).toBe("tool");
+ expect(secondStepMessages[3]).toEqual(steeringMessage);
+ expect(secondStepMessages[3]?.role).toBe("user");
+ // Steering is fed to the next provider call, NOT surfaced in the
+ // turn result — the caller owns the steering messages' lifecycle.
+ expect(result.messages).toHaveLength(3);
+ });
+
+ it("drainSteering omitted → no injection; turn byte-identical to before", async () => {
+ const { provider, capturedMessages } = createCapturingProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ // drainSteering omitted — must be a strict no-op.
+ });
+
+ expect(capturedMessages).toHaveLength(2);
+ const secondStepMessages = capturedMessages[1] ?? [];
+ // user, assistant(tool-call), tool-result — NO steering injected.
+ expect(secondStepMessages).toHaveLength(3);
+ expect(secondStepMessages[0]?.role).toBe("user");
+ expect(secondStepMessages[1]?.role).toBe("assistant");
+ expect(secondStepMessages[2]?.role).toBe("tool");
+ expect(result.messages).toHaveLength(3);
+ });
+
+ it("drainSteering returns [] → no injection", async () => {
+ let drainCallCount = 0;
+ const { provider, capturedMessages } = createCapturingProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ drainSteering: () => {
+ drainCallCount++;
+ return [];
+ },
+ });
+
+ // Called at the boundary, but returned nothing → no injection.
+ expect(drainCallCount).toBe(1);
+ expect(capturedMessages).toHaveLength(2);
+ const secondStepMessages = capturedMessages[1] ?? [];
+ expect(secondStepMessages).toHaveLength(3);
+ expect(secondStepMessages[2]?.role).toBe("tool");
+ });
+
+ it("drainSteering NOT called when a step has no tool calls (text-only turn)", async () => {
+ let drainCallCount = 0;
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "hello" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ drainSteering: () => {
+ drainCallCount++;
+ return [];
+ },
+ });
+
+ expect(drainCallCount).toBe(0);
+ });
+
+ it("multiple tool-call steps → drainSteering called once per tool-call step", async () => {
+ let drainCallCount = 0;
+ const provider = createFakeProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "tool-call", toolCallId: "tc2", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ drainSteering: () => {
+ drainCallCount++;
+ return [];
+ },
+ });
+
+ // Steps 0 and 1 each produced tool calls → drained once each.
+ // Step 2 (text-only) → no boundary → no drain. Total = 2.
+ expect(drainCallCount).toBe(2);
+ });
+
+ it("drainSteering NOT called when max-steps ends the turn after a tool-call step (no next step → no drain)", async () => {
+ let drainCallCount = 0;
+ // Every step produces a tool call → the turn runs to MAX_STEPS.
+ const script: ProviderEvent[][] = Array.from({ length: MAX_STEPS }, () => [
+ { type: "tool-call", toolCallId: "tc", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ]);
+ const provider = createFakeProvider(script);
+
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ drainSteering: () => {
+ drainCallCount++;
+ return [];
+ },
+ });
+
+ expect(result.finishReason).toBe("max-steps");
+ // MAX_STEPS tool-call steps (indices 0..MAX_STEPS-1). Drained on every
+ // step that is followed by a next step (0..MAX_STEPS-2 = MAX_STEPS-1
+ // calls); the final step is the max-steps boundary → no next step →
+ // no drain (queue left intact for the caller).
+ expect(drainCallCount).toBe(MAX_STEPS - 1);
+ });
+ });
});
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index bf57854..228ef8a 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -538,6 +538,19 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
if (step === MAX_STEPS - 1) {
finishReason = "max-steps";
+ // No next step → no tool-result boundary. Leave any pending
+ // steering messages for the caller (it owns the queue).
+ } else {
+ // Tool-result boundary: this step produced tool calls and we are
+ // about to call provider.stream again. Drain steering messages
+ // and append them after the tool results, before the next call.
+ // The kernel owns no queue and names no feature — it just calls
+ // the callback and appends. Emits nothing (caller emits the
+ // `steering` AgentEvent in its own wrapper).
+ const steering = input.drainSteering?.() ?? [];
+ for (const msg of steering) {
+ messages.push(msg);
+ }
}
}
} finally {
diff --git a/packages/message-queue/package.json b/packages/message-queue/package.json
new file mode 100644
index 0000000..a8fcc4b
--- /dev/null
+++ b/packages/message-queue/package.json
@@ -0,0 +1,14 @@
+{
+ "name": "@dispatch/message-queue",
+ "version": "0.0.0",
+ "type": "module",
+ "private": true,
+ "main": "dist/index.js",
+ "types": "dist/index.d.ts",
+ "dependencies": {
+ "@dispatch/kernel": "workspace:*",
+ "@dispatch/surface-registry": "workspace:*",
+ "@dispatch/ui-contract": "workspace:*",
+ "@dispatch/wire": "workspace:*"
+ }
+}
diff --git a/packages/message-queue/src/extension.ts b/packages/message-queue/src/extension.ts
new file mode 100644
index 0000000..26d19ca
--- /dev/null
+++ b/packages/message-queue/src/extension.ts
@@ -0,0 +1,82 @@
+/**
+ * message-queue extension — owns the per-conversation steering message queue
+ * (state + surface + drain-and-clear). Plugs into the kernel host via a
+ * manifest + activate(host); the session-orchestrator drains it via the
+ * `messageQueueHandle` service. Does NOT touch the turn loop.
+ */
+import type { Extension, HostAPI, Manifest } from "@dispatch/kernel";
+import type { SurfaceContext, SurfaceProvider } from "@dispatch/surface-registry";
+import { surfaceRegistryHandle } from "@dispatch/surface-registry";
+import type { SurfaceSpec } from "@dispatch/ui-contract";
+import { buildQueueSpec, MESSAGE_QUEUE_SURFACE_ID } from "./pure.js";
+import { createMessageQueueService, messageQueueHandle } from "./service.js";
+
+export const manifest: Manifest = {
+ id: "message-queue",
+ name: "Message Queue",
+ version: "0.0.0",
+ apiVersion: "^0.1.0",
+ trust: "bundled",
+ activation: "eager",
+ dependsOn: ["surface-registry"],
+ capabilities: {},
+ contributes: {
+ services: ["message-queue"],
+ },
+};
+
+export function activate(host: HostAPI): void {
+ const registry = host.getService(surfaceRegistryHandle);
+
+ const subscribers = new Set<() => void>();
+
+ const service = createMessageQueueService({
+ id: () => crypto.randomUUID(),
+ now: () => Date.now(),
+ notify: () => {
+ for (const sub of subscribers) {
+ sub();
+ }
+ },
+ logger: host.logger,
+ });
+
+ host.provideService(messageQueueHandle, service);
+
+ function getSpec(context?: SurfaceContext): SurfaceSpec {
+ const convId = context?.conversationId;
+ const messages = convId === undefined ? [] : service.getQueue(convId);
+ return buildQueueSpec(messages);
+ }
+
+ function invoke(_actionId: string, _payload?: unknown, _context?: SurfaceContext): void {
+ // The message-queue surface is read-only: a client renders the queue and
+ // the session-orchestrator drains it. No client-facing actions.
+ }
+
+ const provider: SurfaceProvider = {
+ catalogEntry: {
+ id: MESSAGE_QUEUE_SURFACE_ID,
+ region: "side",
+ title: "Message Queue",
+ scope: "conversation",
+ },
+ getSpec,
+ invoke,
+ subscribe(onChange) {
+ subscribers.add(onChange);
+ return () => {
+ subscribers.delete(onChange);
+ };
+ },
+ };
+
+ registry.register(provider);
+
+ host.logger.info("message-queue: registered");
+}
+
+export const extension: Extension = {
+ manifest,
+ activate,
+};
diff --git a/packages/message-queue/src/index.ts b/packages/message-queue/src/index.ts
new file mode 100644
index 0000000..79bbb25
--- /dev/null
+++ b/packages/message-queue/src/index.ts
@@ -0,0 +1,27 @@
+/**
+ * @dispatch/message-queue — the per-conversation message queue for mid-turn
+ * steering. Owns the queue state + a per-conversation `custom` surface; the
+ * session-orchestrator drains it via `messageQueueHandle`. The queue is
+ * transient (in-memory, per-conversation, only meaningful during generation);
+ * the surface is the ONLY way the frontend reads queue state.
+ */
+
+export type { QueuedMessage, QueuePayload } from "@dispatch/wire";
+export { extension, manifest } from "./extension.js";
+export {
+ buildQueueSpec,
+ combine,
+ drain,
+ enqueue,
+ getQueue,
+ MESSAGE_QUEUE_RENDERER_ID,
+ MESSAGE_QUEUE_SURFACE_ID,
+ type MessageQueueState,
+ type QueueDeps,
+} from "./pure.js";
+export {
+ createMessageQueueService,
+ type MessageQueueDeps,
+ type MessageQueueService,
+ messageQueueHandle,
+} from "./service.js";
diff --git a/packages/message-queue/src/pure.test.ts b/packages/message-queue/src/pure.test.ts
new file mode 100644
index 0000000..ff1a08f
--- /dev/null
+++ b/packages/message-queue/src/pure.test.ts
@@ -0,0 +1,144 @@
+import type { QueuedMessage } from "@dispatch/wire";
+import { describe, expect, it } from "vitest";
+import {
+ buildQueueSpec,
+ combine,
+ drain,
+ enqueue,
+ getQueue,
+ MESSAGE_QUEUE_RENDERER_ID,
+ MESSAGE_QUEUE_SURFACE_ID,
+ type MessageQueueState,
+ type QueueDeps,
+} from "./pure.js";
+
+function makeDeps(): QueueDeps {
+ let idCounter = 0;
+ let now = 1_000;
+ return {
+ id: () => `id-${++idCounter}`,
+ now: () => (now += 10),
+ };
+}
+
+describe("enqueue", () => {
+ it("enqueue appends + returns snapshot with the new message (id unique, queuedAt set)", () => {
+ const state: MessageQueueState = new Map();
+ const deps = makeDeps();
+
+ const first = enqueue(state, "c1", "first", deps);
+ expect(first).toHaveLength(1);
+ const firstMsg = first[0];
+ if (firstMsg === undefined) throw new Error("expected a message");
+ expect(firstMsg.id).toBe("id-1");
+ expect(firstMsg.text).toBe("first");
+ expect(firstMsg.queuedAt).toBe(1_010);
+
+ const second = enqueue(state, "c1", "second", deps);
+ expect(second).toHaveLength(2);
+ const secondMsg = second[1];
+ if (secondMsg === undefined) throw new Error("expected a message");
+ expect(secondMsg.id).toBe("id-2"); // unique per message
+ expect(secondMsg.text).toBe("second");
+ expect(secondMsg.queuedAt).toBe(1_020);
+
+ // a separate conversation does not share state
+ const other = enqueue(state, "c2", "other", deps);
+ expect(other).toHaveLength(1);
+ expect(getQueue(state, "c2")).toHaveLength(1);
+ });
+});
+
+describe("getQueue", () => {
+ it("getQueue returns current snapshot; empty array when none", () => {
+ const state: MessageQueueState = new Map();
+ const deps = makeDeps();
+
+ expect(getQueue(state, "unknown")).toEqual([]);
+
+ enqueue(state, "c1", "x", deps);
+ enqueue(state, "c1", "y", deps);
+ const snap = getQueue(state, "c1");
+ expect(snap.map((m) => m.text)).toEqual(["x", "y"]);
+
+ // returns a COPY — mutating it does not affect live state
+ snap.push({ id: "evil", text: "mutate", queuedAt: 0 });
+ expect(getQueue(state, "c1")).toHaveLength(2);
+ });
+});
+
+describe("drain", () => {
+ it("drain returns all + clears; second drain returns []", () => {
+ const state: MessageQueueState = new Map();
+ const deps = makeDeps();
+ enqueue(state, "c1", "a", deps);
+ enqueue(state, "c1", "b", deps);
+
+ const drained = drain(state, "c1");
+ expect(drained.map((m) => m.text)).toEqual(["a", "b"]);
+
+ // cleared
+ expect(getQueue(state, "c1")).toEqual([]);
+ expect(drain(state, "c1")).toEqual([]);
+ });
+
+ it("drain on an empty queue returns [] and is a no-op", () => {
+ const state: MessageQueueState = new Map();
+
+ // never had a queue
+ expect(drain(state, "c1")).toEqual([]);
+ expect(getQueue(state, "c1")).toEqual([]);
+
+ // and after a drain that emptied it, draining again is a no-op
+ const deps = makeDeps();
+ enqueue(state, "c1", "once", deps);
+ drain(state, "c1");
+ expect(drain(state, "c1")).toEqual([]);
+ expect(getQueue(state, "c1")).toEqual([]);
+ });
+});
+
+describe("combine", () => {
+ it("combine joins texts with blank-line separator", () => {
+ const msgs: QueuedMessage[] = [
+ { id: "1", text: "alpha", queuedAt: 1 },
+ { id: "2", text: "beta", queuedAt: 2 },
+ ];
+ expect(combine(msgs)).toBe("alpha\n\nbeta");
+ expect(combine([])).toBe("");
+ expect(combine([{ id: "1", text: "solo", queuedAt: 1 }])).toBe("solo");
+ });
+});
+
+describe("buildQueueSpec", () => {
+ it("renders a custom field with the queue snapshot", () => {
+ const msgs: QueuedMessage[] = [
+ { id: "1", text: "a", queuedAt: 1 },
+ { id: "2", text: "b", queuedAt: 2 },
+ ];
+ const spec = buildQueueSpec(msgs);
+ expect(spec.id).toBe(MESSAGE_QUEUE_SURFACE_ID);
+ expect(spec.region).toBe("side");
+ expect(spec.title).toBe("Message Queue");
+ expect(spec.fields).toHaveLength(1);
+
+ const field = spec.fields[0];
+ if (field === undefined || field.kind !== "custom") {
+ throw new Error("expected a custom field");
+ }
+ expect(field.rendererId).toBe(MESSAGE_QUEUE_RENDERER_ID);
+ const payload = field.payload as { messages: readonly QueuedMessage[] };
+ expect(payload.messages).toHaveLength(2);
+ expect(payload.messages.map((m) => m.text)).toEqual(["a", "b"]);
+ });
+
+ it("renders an empty list for an empty queue", () => {
+ const spec = buildQueueSpec([]);
+ const field = spec.fields[0];
+ if (field === undefined || field.kind !== "custom") {
+ throw new Error("expected a custom field");
+ }
+ const payload = field.payload as { messages: readonly QueuedMessage[] };
+ expect(payload.messages).toEqual([]);
+ });
+});
diff --git a/packages/message-queue/src/pure.ts b/packages/message-queue/src/pure.ts
new file mode 100644
index 0000000..834018b
--- /dev/null
+++ b/packages/message-queue/src/pure.ts
@@ -0,0 +1,105 @@
+/**
+ * Pure core for message-queue — zero I/O, zero ambient state.
+ *
+ * Every function is input → output; testable without mocks. State is a plain
+ * `Map<conversationId, QueuedMessage[]>` OWNED by the caller (the service
+ * shell); the pure functions mutate it in place and return snapshots (fresh
+ * array copies), so a caller can never reach into or mutate live state through
+ * a returned value. The id factory + clock are injected (`QueueDeps`) so tests
+ * are deterministic.
+ */
+
+import type { CustomField, SurfaceSpec } from "@dispatch/ui-contract";
+import type { QueuedMessage, QueuePayload } from "@dispatch/wire";
+
+/** The queue state: a per-conversation map of queued messages. */
+export type MessageQueueState = Map<string, QueuedMessage[]>;
+
+/** Injected effectful factories kept out of the pure core. */
+export interface QueueDeps {
+ /** Stable (client-visible) id factory for UI keying + dedup. */
+ readonly id: () => string;
+ /** Clock returning epoch-ms for `queuedAt`. */
+ readonly now: () => number;
+}
+
+/** Surface id this extension contributes (also the manifest + catalog id). */
+export const MESSAGE_QUEUE_SURFACE_ID = "message-queue";
+/** The custom renderer id a frontend switches on to render the queue. */
+export const MESSAGE_QUEUE_RENDERER_ID = "message-queue";
+
+/**
+ * Append a message to a conversation's queue. Mutates `state` and returns the
+ * CURRENT snapshot (post-append) — a fresh array copy, so callers cannot mutate
+ * live state through the returned value.
+ */
+export function enqueue(
+ state: MessageQueueState,
+ conversationId: string,
+ text: string,
+ deps: QueueDeps,
+): QueuedMessage[] {
+ const message: QueuedMessage = { id: deps.id(), text, queuedAt: deps.now() };
+ const existing = state.get(conversationId);
+ if (existing === undefined) {
+ state.set(conversationId, [message]);
+ } else {
+ existing.push(message);
+ }
+ return getQueue(state, conversationId);
+}
+
+/**
+ * Current queue snapshot for a conversation — a fresh array copy. Empty array
+ * if the conversation has no queue / is unknown.
+ */
+export function getQueue(state: MessageQueueState, conversationId: string): QueuedMessage[] {
+ const existing = state.get(conversationId);
+ if (existing === undefined) return [];
+ return [...existing];
+}
+
+/**
+ * Drain: return all queued messages for a conversation and CLEAR its queue.
+ * Returns a fresh array copy of the drained messages (empty array if the queue
+ * was empty or unknown). The caller (session-orchestrator) combines these into
+ * a steering ChatMessage; this returns the raw `QueuedMessage[]`, NOT a
+ * ChatMessage.
+ */
+export function drain(state: MessageQueueState, conversationId: string): QueuedMessage[] {
+ const existing = state.get(conversationId);
+ if (existing === undefined || existing.length === 0) return [];
+ const drained = [...existing];
+ state.delete(conversationId);
+ return drained;
+}
+
+/**
+ * Combine drained messages' texts into a single steering string, joined by a
+ * blank line (`\n\n`). Pure — the session-orchestrator builds the final
+ * ChatMessage from this.
+ */
+export function combine(messages: readonly QueuedMessage[]): string {
+ return messages.map((m) => m.text).join("\n\n");
+}
+
+/**
+ * Build the per-conversation surface spec: a single `custom` field whose
+ * payload is the current queue snapshot (`QueuePayload`). An empty `messages`
+ * array (idle conversation / post-drain) renders as an empty list. Pure — no
+ * I/O; the surface-registry re-fetches this on every notify.
+ */
+export function buildQueueSpec(messages: readonly QueuedMessage[]): SurfaceSpec {
+ const payload: QueuePayload = { messages };
+ const field: CustomField = {
+ kind: "custom",
+ rendererId: MESSAGE_QUEUE_RENDERER_ID,
+ payload,
+ };
+ return {
+ id: MESSAGE_QUEUE_SURFACE_ID,
+ region: "side",
+ title: "Message Queue",
+ fields: [field],
+ };
+}
diff --git a/packages/message-queue/src/service.test.ts b/packages/message-queue/src/service.test.ts
new file mode 100644
index 0000000..04cde91
--- /dev/null
+++ b/packages/message-queue/src/service.test.ts
@@ -0,0 +1,101 @@
+import type { SurfaceSpec } from "@dispatch/ui-contract";
+import type { QueuedMessage, QueuePayload } from "@dispatch/wire";
+import { describe, expect, it } from "vitest";
+import { buildQueueSpec, combine } from "./pure.js";
+import { createMessageQueueService, type MessageQueueDeps } from "./service.js";
+
+interface Recorder extends MessageQueueDeps {
+ readonly calls: { value: number };
+}
+
+function makeDeps(): Recorder {
+ let idCounter = 0;
+ let now = 1_000;
+ const calls = { value: 0 };
+ return {
+ id: () => `q-${++idCounter}`,
+ now: () => (now += 10),
+ notify: () => {
+ calls.value += 1;
+ },
+ calls,
+ };
+}
+
+function queueMessages(spec: SurfaceSpec): readonly QueuedMessage[] {
+ const field = spec.fields[0];
+ if (field === undefined || field.kind !== "custom") {
+ throw new Error("expected a custom field");
+ }
+ return (field.payload as QueuePayload).messages;
+}
+
+describe("message-queue service", () => {
+ it("enqueue pushes a surface update (snapshot grew)", () => {
+ const deps = makeDeps();
+ const svc = createMessageQueueService(deps);
+ expect(deps.calls.value).toBe(0);
+
+ const snapshot = svc.enqueue("c1", "hello");
+ expect(deps.calls.value).toBe(1); // surface update pushed
+ expect(snapshot).toHaveLength(1);
+ const first = snapshot[0];
+ if (first === undefined) throw new Error("expected a message");
+ expect(first.text).toBe("hello");
+ expect(typeof first.id).toBe("string");
+ expect(first.queuedAt).toBe(1_010);
+
+ // the surface spec reflects the grown snapshot
+ expect(queueMessages(buildQueueSpec(svc.getQueue("c1")))).toHaveLength(1);
+ });
+
+ it("drain pushes a surface update (empty)", () => {
+ const deps = makeDeps();
+ const svc = createMessageQueueService(deps);
+ svc.enqueue("c1", "a");
+ svc.enqueue("c1", "b");
+ expect(deps.calls.value).toBe(2); // two enqueues notified
+
+ const drained = svc.drain("c1");
+ expect(deps.calls.value).toBe(3); // drain pushed a surface update
+ expect(drained).toHaveLength(2);
+ expect(drained.map((m) => m.text)).toEqual(["a", "b"]);
+
+ // cleared + surface now shows empty
+ expect(svc.getQueue("c1")).toEqual([]);
+ expect(queueMessages(buildQueueSpec(svc.getQueue("c1")))).toEqual([]);
+ });
+
+ it("drain on empty does NOT push a surface update (already empty)", () => {
+ const deps = makeDeps();
+ const svc = createMessageQueueService(deps);
+
+ expect(svc.drain("c1")).toEqual([]);
+ expect(deps.calls.value).toBe(0); // no notify — no change
+
+ // after a real drain empties it, a further drain is a no-op (no notify)
+ svc.enqueue("c1", "x");
+ expect(deps.calls.value).toBe(1);
+ svc.drain("c1");
+ expect(deps.calls.value).toBe(2);
+ svc.drain("c1");
+ expect(deps.calls.value).toBe(2); // unchanged — queue was already empty
+ });
+
+ it("getQueue returns current snapshot; empty for unknown conversation", () => {
+ const deps = makeDeps();
+ const svc = createMessageQueueService(deps);
+ expect(svc.getQueue("nope")).toEqual([]);
+ svc.enqueue("c1", "hi");
+ expect(svc.getQueue("c1")).toHaveLength(1);
+ });
+
+ it("drain returns the raw QueuedMessage[] the orchestrator combines", () => {
+ const deps = makeDeps();
+ const svc = createMessageQueueService(deps);
+ svc.enqueue("c1", "alpha");
+ svc.enqueue("c1", "beta");
+ const drained = svc.drain("c1");
+ expect(combine(drained)).toBe("alpha\n\nbeta");
+ });
+});
diff --git a/packages/message-queue/src/service.ts b/packages/message-queue/src/service.ts
new file mode 100644
index 0000000..91d0fc5
--- /dev/null
+++ b/packages/message-queue/src/service.ts
@@ -0,0 +1,88 @@
+/**
+ * Message-queue service — the typed handle the session-orchestrator obtains
+ * (lazily) to enqueue / read / drain a conversation's steering queue.
+ *
+ * The queue is transient + per-conversation (in-memory, only meaningful during
+ * generation). `drain` returns + clears AND pushes a surface update (empty).
+ * The orchestrator owns the delivery (drain → combine → inject → carry-to-new-
+ * turn); this service owns only the queue state, the surface, and the
+ * drain-and-clear.
+ */
+import { defineService, type Logger, type ServiceHandle } from "@dispatch/kernel";
+import type { QueuedMessage } from "@dispatch/wire";
+import type { MessageQueueState, QueueDeps } from "./pure.js";
+import { drain as drainQueue, enqueue as enqueueMessage, getQueue as readQueue } from "./pure.js";
+
+/**
+ * The message-queue service interface. Obtained via
+ * `host.getService(messageQueueHandle)`.
+ */
+export interface MessageQueueService {
+ /** Append a message; return the CURRENT queue snapshot (post-append). */
+ enqueue(conversationId: string, text: string): QueuedMessage[];
+ /** Current queue snapshot (empty array if none / unknown conversation). */
+ getQueue(conversationId: string): QueuedMessage[];
+ /**
+ * Drain: return all queued messages, CLEAR the queue, and push a surface
+ * update (empty). Returns the raw `QueuedMessage[]` (NOT a ChatMessage —
+ * the caller combines + builds the ChatMessage). Empty array if the queue
+ * was empty (and then NO surface update is pushed — no change).
+ */
+ drain(conversationId: string): QueuedMessage[];
+}
+
+/**
+ * Typed handle anchoring the message-queue service. The single symbol the
+ * session-orchestrator imports to reach the queue — no string-keyed lookup.
+ */
+export const messageQueueHandle: ServiceHandle<MessageQueueService> =
+ defineService<MessageQueueService>("message-queue");
+
+/** Deps for the service shell — the pure core's deps plus the surface notifier. */
+export interface MessageQueueDeps extends QueueDeps {
+ /**
+ * Notify the surface-registry that the queue changed, so the transport
+ * re-fetches + pushes a full new SurfaceUpdate. Called ONLY on a real
+ * change (enqueue → grew; drain-non-empty → emptied); a no-op drain of an
+ * already-empty queue does NOT notify (no change → no surface push).
+ */
+ readonly notify: () => void;
+ /** Injected host logger (optional in tests). Logs counts/lengths at debug, never bodies. */
+ readonly logger?: Logger;
+}
+
+/**
+ * Create a MessageQueueService backed by an in-memory per-conversation Map.
+ * Pure decision logic (enqueue/getQueue/drain) lives in ./pure.js; this shell
+ * owns the state + the notify edge. State is owned (not ambient): the Map lives
+ * in this closure, reachable only through the returned service.
+ */
+export function createMessageQueueService(deps: MessageQueueDeps): MessageQueueService {
+ const state: MessageQueueState = new Map();
+
+ return {
+ enqueue(conversationId, text) {
+ const snapshot = enqueueMessage(state, conversationId, text, deps);
+ deps.logger?.debug("message-queue: enqueued", {
+ conversationId,
+ queueLen: snapshot.length,
+ textLen: text.length,
+ });
+ deps.notify();
+ return snapshot;
+ },
+ getQueue(conversationId) {
+ return readQueue(state, conversationId);
+ },
+ drain(conversationId) {
+ const drained = drainQueue(state, conversationId);
+ if (drained.length === 0) return drained;
+ deps.logger?.debug("message-queue: drained", {
+ conversationId,
+ count: drained.length,
+ });
+ deps.notify();
+ return drained;
+ },
+ };
+}
diff --git a/packages/message-queue/tsconfig.json b/packages/message-queue/tsconfig.json
new file mode 100644
index 0000000..da5dfb7
--- /dev/null
+++ b/packages/message-queue/tsconfig.json
@@ -0,0 +1,11 @@
+{
+ "extends": "../../tsconfig.base.json",
+ "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true },
+ "include": ["src/**/*.ts"],
+ "references": [
+ { "path": "../kernel" },
+ { "path": "../surface-registry" },
+ { "path": "../ui-contract" },
+ { "path": "../wire" }
+ ]
+}
diff --git a/packages/session-orchestrator/package.json b/packages/session-orchestrator/package.json
index 47ab1a2..8aa6c0d 100644
--- a/packages/session-orchestrator/package.json
+++ b/packages/session-orchestrator/package.json
@@ -8,6 +8,7 @@
"dependencies": {
"@dispatch/kernel": "workspace:*",
"@dispatch/conversation-store": "workspace:*",
- "@dispatch/credential-store": "workspace:*"
+ "@dispatch/credential-store": "workspace:*",
+ "@dispatch/message-queue": "workspace:*"
}
}
diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts
index 781164a..6e56c2b 100644
--- a/packages/session-orchestrator/src/extension.ts
+++ b/packages/session-orchestrator/src/extension.ts
@@ -2,6 +2,7 @@ import { conversationStoreHandle } from "@dispatch/conversation-store";
import { credentialStoreHandle } from "@dispatch/credential-store";
import type { Extension, HostAPI, Manifest } from "@dispatch/kernel";
import { runTurn } from "@dispatch/kernel";
+import { messageQueueHandle } from "@dispatch/message-queue";
import {
cacheWarmHandle,
createSessionOrchestrator,
@@ -49,6 +50,15 @@ export function activate(host: HostAPI): void {
logger: host.logger,
now: () => Date.now(),
emit: (hook, payload) => host.emit(hook, payload),
+ resolveQueue: () => {
+ // Lazily resolve the message-queue service. Returns undefined when the
+ // extension isn't loaded (feature degrades off) — checked via the
+ // activated-manifests list so `host.getService` is only called when the
+ // service is registered. Lazy so activation order with message-queue
+ // doesn't matter; called per-turn / per-enqueue, not at activate time.
+ const loaded = host.getExtensions().some((m) => m.id === "message-queue");
+ return loaded ? host.getService(messageQueueHandle) : undefined;
+ },
});
host.provideService(sessionOrchestratorHandle, orchestrator);
diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts
index 711fa5a..afec2b4 100644
--- a/packages/session-orchestrator/src/index.ts
+++ b/packages/session-orchestrator/src/index.ts
@@ -5,6 +5,8 @@ export {
conversationClosed,
createSessionOrchestrator,
createWarmService,
+ type EnqueueInput,
+ type EnqueueResult,
type SessionOrchestrator,
type SessionOrchestratorBundle,
type SessionOrchestratorDeps,
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index 5b2f264..3a74c2d 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -15,6 +15,7 @@ import type {
UsageEvent,
} from "@dispatch/kernel";
import { defineEventHook, defineService, type ServiceHandle } from "@dispatch/kernel";
+import type { MessageQueueService, QueuedMessage } from "@dispatch/message-queue";
import { createMetricsAccumulator } from "./metrics.js";
import {
buildUserMessage,
@@ -38,6 +39,25 @@ export type StartTurnResult =
| { readonly started: true; readonly turnId: string }
| { readonly started: false; readonly reason: "already-active" };
+/** Input to `SessionOrchestrator.enqueue` — the single entry transports call. */
+export interface EnqueueInput {
+ readonly conversationId: string;
+ readonly text: string;
+}
+
+/**
+ * Result of `SessionOrchestrator.enqueue`. When `startedTurn` is true the
+ * conversation was idle and a turn was started (the message is the opening
+ * prompt — nothing queued). When false the conversation was active: the message
+ * was enqueued onto the steering queue and `queue` is the post-enqueue snapshot
+ * (empty when the message-queue extension isn't loaded — degraded: the message
+ * is dropped, see `enqueue` docs).
+ */
+export interface EnqueueResult {
+ readonly startedTurn: boolean;
+ readonly queue: readonly QueuedMessage[];
+}
+
export type TurnEventListener = (event: AgentEvent) => void;
interface ActiveTurn {
@@ -109,6 +129,16 @@ export const cacheWarmHandle: ServiceHandle<WarmService> = defineService<WarmSer
export interface SessionOrchestrator {
startTurn(input: StartTurnInput): StartTurnResult;
+ /**
+ * The single entry transports call to deliver a user message. Owns the
+ * idle→startTurn vs active→queue decision (no separate `isActive` race —
+ * `startTurn`'s single-flight guard is authoritative). When the conversation
+ * is idle, starts a turn (the message is the opening prompt). When active,
+ * enqueues onto the steering queue (if the message-queue extension is
+ * loaded); with no queue extension loaded the message is dropped and the
+ * returned snapshot is empty (degraded — feature off).
+ */
+ enqueue(input: EnqueueInput): EnqueueResult;
subscribe(conversationId: string, listener: TurnEventListener): () => void;
isActive(conversationId: string): boolean;
/**
@@ -143,6 +173,16 @@ export interface SessionOrchestratorDeps {
modelName: string,
) => { provider: ProviderContract; model: string } | undefined;
readonly runTurn: (input: RunTurnInput) => Promise<RunTurnResult>;
+ /**
+ * Lazily resolves the message-queue service (the steering queue), or
+ * `undefined` when the message-queue extension isn't loaded (the feature
+ * degrades off: no `drainSteering`, no post-seal carry, `enqueue` drops
+ * messages when active). host-bin wires this via `host.getService`; the
+ * orchestrator calls it per-turn / per-enqueue so activation order with the
+ * message-queue extension doesn't matter. Injected (not ambient) so a turn
+ * stays reproducible from its inputs and tests use a fake queue.
+ */
+ readonly resolveQueue?: () => MessageQueueService | undefined;
/** Apply the per-turn tools filter chain. Injected for testability. */
readonly applyToolsFilter: (assembly: ToolAssembly) => Promise<ToolAssembly>;
/** Base logger (auto-scoped to this extension); childed per turn for span capture. */
@@ -184,6 +224,25 @@ export function createSessionOrchestrator(
}
}
+ /**
+ * Post-seal carry: if a steering queue is available and non-empty, drain it,
+ * combine, and start a NEW detached turn whose opening `user-message` carries
+ * the combined text (no `steering` event — that's only for mid-turn drain).
+ * Returns true iff a new turn was started. Called from `runTurnDetached`'s
+ * finally AFTER `activeTurns.delete` (so the new turn's single-flight guard
+ * passes) and BEFORE `activeConversations.delete` (skipped when carried, since
+ * the new turn re-adds it). May chain — the new turn's own finally re-checks.
+ */
+ function tryCarryQueue(conversationId: string): boolean {
+ const queue = deps.resolveQueue?.();
+ if (queue === undefined) return false;
+ if (queue.getQueue(conversationId).length === 0) return false;
+ const drained = queue.drain(conversationId);
+ const combined = drained.map((q) => q.text).join("\n\n");
+ const result = orchestrator.startTurn({ conversationId, text: combined });
+ return result.started;
+ }
+
function runTurnDetached(
conversationId: string,
text: string,
@@ -218,6 +277,7 @@ export function createSessionOrchestrator(
});
void (async () => {
+ let sealed = false;
try {
const [effectiveCwd, storedEffort] = await Promise.all([
effectiveCwdPromise,
@@ -273,6 +333,30 @@ export function createSessionOrchestrator(
...(modelOverride !== undefined ? { model: modelOverride } : {}),
};
+ // Resolve the steering queue once for this turn. When present, wire
+ // `drainSteering`: the kernel calls it at the tool-result boundary and
+ // appends whatever it returns as user-role messages alongside the tool
+ // results (mid-turn steering). The wrapper emits a `steering` AgentEvent
+ // into the hub (buffered for late-join like `user-message`) so a
+ // frontend can place a user bubble in the transcript live; the kernel
+ // only appends the returned messages — it does NOT emit the event.
+ const queue = deps.resolveQueue?.();
+ const drainSteering =
+ queue === undefined
+ ? undefined
+ : (): readonly ChatMessage[] => {
+ const queued = queue.drain(conversationId);
+ if (queued.length === 0) return [];
+ const steerText = queued.map((q) => q.text).join("\n\n");
+ emitToHub(conversationId, {
+ type: "steering",
+ conversationId,
+ turnId,
+ text: steerText,
+ });
+ return [{ role: "user", chunks: [{ type: "text", text: steerText }] }];
+ };
+
const opts: RunTurnInput = {
provider,
messages: [...history, userMsg],
@@ -286,6 +370,7 @@ export function createSessionOrchestrator(
...(turnLogger !== undefined ? { logger: turnLogger } : {}),
...(effectiveCwd !== undefined ? { cwd: effectiveCwd } : {}),
...(deps.now !== undefined ? { now: deps.now } : {}),
+ ...(drainSteering !== undefined ? { drainSteering } : {}),
};
const result = await deps.runTurn(opts);
@@ -297,6 +382,7 @@ export function createSessionOrchestrator(
await deps.conversationStore.appendMetrics(conversationId, turnMetrics);
emitToHub(conversationId, { type: "turn-sealed", conversationId, turnId });
+ sealed = true;
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
emitToHub(conversationId, {
@@ -307,7 +393,16 @@ export function createSessionOrchestrator(
});
} finally {
activeTurns.delete(conversationId);
- activeConversations.delete(conversationId);
+ // Post-seal carry: if the turn sealed with a non-empty steering queue
+ // (no tool call fired → drainSteering never drained it), start a NEW
+ // detached turn whose opening user-message carries the combined text.
+ // The new turn re-adds to activeTurns + activeConversations, so skip
+ // the activeConversations.delete when carried. May chain (user keeps
+ // steering) — each carried turn's own finally re-checks the queue.
+ const carried = sealed && tryCarryQueue(conversationId);
+ if (!carried) {
+ activeConversations.delete(conversationId);
+ }
void payloadPromise.then((payload) => {
deps.emit?.(turnSettled, payload);
});
@@ -326,6 +421,19 @@ export function createSessionOrchestrator(
return { started: true, turnId };
},
+ enqueue({ conversationId, text }) {
+ const result = orchestrator.startTurn({ conversationId, text });
+ if (result.started) {
+ return { startedTurn: true, queue: [] };
+ }
+ // Already active → enqueue onto the steering queue. When the
+ // message-queue extension isn't loaded this degrades: the message is
+ // dropped and the snapshot is empty (feature off).
+ const queue = deps.resolveQueue?.();
+ const snapshot = queue !== undefined ? queue.enqueue(conversationId, text) : [];
+ return { startedTurn: false, queue: snapshot };
+ },
+
subscribe(conversationId, listener) {
let listeners = subscribers.get(conversationId);
if (listeners === undefined) {
diff --git a/packages/session-orchestrator/src/queue.test.ts b/packages/session-orchestrator/src/queue.test.ts
new file mode 100644
index 0000000..c1f12da
--- /dev/null
+++ b/packages/session-orchestrator/src/queue.test.ts
@@ -0,0 +1,497 @@
+import type { ConversationStore } from "@dispatch/conversation-store";
+import type {
+ AgentEvent,
+ ChatMessage,
+ ProviderContract,
+ ProviderEvent,
+ ReasoningEffort,
+ RunTurnInput,
+ RunTurnResult,
+ StoredChunk,
+ ToolContract,
+ TurnMetrics,
+} from "@dispatch/kernel";
+import { runTurn } from "@dispatch/kernel";
+import { createMessageQueueService } from "@dispatch/message-queue";
+import { describe, expect, it } from "vitest";
+import { createSessionOrchestrator } from "./orchestrator.js";
+import type { ToolAssembly } from "./tools-filter.js";
+
+// --- Shared test helpers (duplicated from orchestrator.test.ts per isolation-over-dRY;
+// a shared test-helper module wired between test files is a coupling smell) ---
+
+function createInMemoryStore(): ConversationStore & {
+ readonly data: Map<string, ChatMessage[]>;
+ readonly metricsData: Map<string, TurnMetrics[]>;
+ readonly cwdData: Map<string, string>;
+ readonly effortData: Map<string, ReasoningEffort>;
+} {
+ const data = new Map<string, ChatMessage[]>();
+ const metricsData = new Map<string, TurnMetrics[]>();
+ const cwdData = new Map<string, string>();
+ const effortData = new Map<string, ReasoningEffort>();
+ return {
+ data,
+ metricsData,
+ cwdData,
+ effortData,
+ async append(conversationId, messages) {
+ const existing = data.get(conversationId) ?? [];
+ data.set(conversationId, [...existing, ...messages]);
+ },
+ async load(conversationId) {
+ return [...(data.get(conversationId) ?? [])];
+ },
+ async loadSince(conversationId, sinceSeq) {
+ const messages = data.get(conversationId) ?? [];
+ const result: StoredChunk[] = [];
+ let seq = 1;
+ for (const msg of messages) {
+ for (const chunk of msg.chunks) {
+ if (sinceSeq === undefined || seq > sinceSeq) {
+ result.push({ seq, role: msg.role, chunk });
+ }
+ seq++;
+ }
+ }
+ return result;
+ },
+ async appendMetrics(conversationId, metrics) {
+ const existing = metricsData.get(conversationId) ?? [];
+ metricsData.set(conversationId, [...existing, metrics]);
+ },
+ async loadMetrics(conversationId) {
+ return [...(metricsData.get(conversationId) ?? [])];
+ },
+ async getCwd(conversationId) {
+ return cwdData.get(conversationId) ?? null;
+ },
+ async setCwd(conversationId, cwd) {
+ cwdData.set(conversationId, cwd);
+ },
+ async getReasoningEffort(conversationId) {
+ return effortData.get(conversationId) ?? null;
+ },
+ async setReasoningEffort(conversationId, effort) {
+ effortData.set(conversationId, effort);
+ },
+ };
+}
+
+function identityApplyToolsFilter(assembly: ToolAssembly): Promise<ToolAssembly> {
+ return Promise.resolve(assembly);
+}
+
+function noTools(): readonly ToolContract[] {
+ return [];
+}
+
+function simpleProvider(): ProviderContract {
+ return {
+ id: "fake",
+ stream: async function* () {
+ yield { type: "text-delta", delta: "ok" } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ },
+ };
+}
+
+/**
+ * A capturing runTurn that simulates the kernel calling `drainSteering` at the
+ * tool-result boundary. It records the RunTurnInput (so the test can assert
+ * drainSteering was wired) and collects what drainSteering returned. NOT a mock
+ * of @dispatch/* — it's a plain fake of the outermost runTurn edge.
+ */
+function createDrainingCaptureRunTurn(): {
+ captured: RunTurnInput[];
+ drainedMessages: ChatMessage[];
+ wasDrainCalled: () => boolean;
+ runTurn: (input: RunTurnInput) => Promise<RunTurnResult>;
+} {
+ const captured: RunTurnInput[] = [];
+ const drainedMessages: ChatMessage[] = [];
+ let drainCalled = false;
+ return {
+ captured,
+ drainedMessages,
+ wasDrainCalled: () => drainCalled,
+ runTurn: async (input) => {
+ captured.push(input);
+ if (input.drainSteering !== undefined) {
+ drainCalled = true;
+ const drained = input.drainSteering();
+ drainedMessages.push(...drained);
+ }
+ return {
+ messages: [{ role: "assistant", chunks: [{ type: "text", text: "ok" }] }],
+ usage: { inputTokens: 1, outputTokens: 1 },
+ finishReason: "stop",
+ };
+ },
+ };
+}
+
+function waitForSealed(
+ orchestrator: ReturnType<typeof createSessionOrchestrator>["orchestrator"],
+ conversationId: string,
+): Promise<void> {
+ return new Promise((resolve) => {
+ const unsub = orchestrator.subscribe(conversationId, (e) => {
+ if (e.type === "turn-sealed") {
+ unsub();
+ resolve();
+ }
+ });
+ });
+}
+
+function waitForSealedCount(
+ orchestrator: ReturnType<typeof createSessionOrchestrator>["orchestrator"],
+ conversationId: string,
+ count: number,
+): Promise<void> {
+ return new Promise((resolve) => {
+ let seen = 0;
+ const unsub = orchestrator.subscribe(conversationId, (e) => {
+ if (e.type === "turn-sealed") {
+ seen++;
+ if (seen >= count) {
+ unsub();
+ resolve();
+ }
+ }
+ });
+ });
+}
+
+function isSteering(e: AgentEvent): e is Extract<AgentEvent, { type: "steering" }> {
+ return e.type === "steering";
+}
+
+function isUserMessage(e: AgentEvent): e is Extract<AgentEvent, { type: "user-message" }> {
+ return e.type === "user-message";
+}
+
+function createTestQueue() {
+ return createMessageQueueService({
+ id: () => `q-${Math.random().toString(36).slice(2, 8)}`,
+ now: () => 1000,
+ notify: () => {},
+ });
+}
+
+// --- drainSteering (mid-turn, at the tool-result boundary) ---
+
+describe("drainSteering", () => {
+ it("drainSteering drains the queue + emits a steering event + returns one combined user message", async () => {
+ const store = createInMemoryStore();
+ const queue = createTestQueue();
+ queue.enqueue("conv-drain", "first");
+ queue.enqueue("conv-drain", "second");
+
+ const { captured, drainedMessages, runTurn: captureRunTurn } = createDrainingCaptureRunTurn();
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => ({ id: "p", stream: async function* () {} }),
+ resolveTools: noTools,
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn: captureRunTurn,
+ resolveQueue: () => queue,
+ });
+
+ const events: AgentEvent[] = [];
+ const unsub = orchestrator.subscribe("conv-drain", (e) => events.push(e));
+
+ orchestrator.startTurn({ conversationId: "conv-drain", text: "go" });
+ await waitForSealed(orchestrator, "conv-drain");
+ unsub();
+
+ // drainSteering was wired on the RunTurnInput
+ expect(captured).toHaveLength(1);
+ expect(captured[0]?.drainSteering).toBeDefined();
+ expect(typeof captured[0]?.drainSteering).toBe("function");
+
+ // The fake runTurn called drainSteering → returned one combined user message
+ expect(drainedMessages).toHaveLength(1);
+ const steerMsg = drainedMessages[0];
+ if (steerMsg === undefined) throw new Error("expected drained message");
+ expect(steerMsg.role).toBe("user");
+ expect(steerMsg.chunks).toHaveLength(1);
+ const chunk = steerMsg.chunks[0];
+ if (chunk === undefined) throw new Error("expected chunk");
+ expect(chunk.type).toBe("text");
+ if (chunk.type === "text") {
+ expect(chunk.text).toBe("first\n\nsecond");
+ }
+
+ // The queue was drained (cleared)
+ expect(queue.getQueue("conv-drain")).toHaveLength(0);
+
+ // A steering event was emitted into the hub with the combined text
+ const steering = events.find(isSteering);
+ expect(steering).toBeDefined();
+ expect(steering?.conversationId).toBe("conv-drain");
+ expect(steering?.text).toBe("first\n\nsecond");
+ expect(steering?.turnId).toMatch(/^turn-/);
+ });
+
+ it("drainSteering on an empty queue returns [] and emits nothing", async () => {
+ const store = createInMemoryStore();
+ const queue = createTestQueue();
+
+ const {
+ drainedMessages,
+ wasDrainCalled,
+ runTurn: captureRunTurn,
+ } = createDrainingCaptureRunTurn();
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => ({ id: "p", stream: async function* () {} }),
+ resolveTools: noTools,
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn: captureRunTurn,
+ resolveQueue: () => queue,
+ });
+
+ const events: AgentEvent[] = [];
+ const unsub = orchestrator.subscribe("conv-empty", (e) => events.push(e));
+
+ orchestrator.startTurn({ conversationId: "conv-empty", text: "go" });
+ await waitForSealed(orchestrator, "conv-empty");
+ unsub();
+
+ // drainSteering was wired and called, but returned []
+ expect(wasDrainCalled()).toBe(true);
+ expect(drainedMessages).toHaveLength(0);
+
+ // No steering event was emitted
+ expect(events.filter(isSteering)).toHaveLength(0);
+ });
+
+ it("no queue ext (resolveQueue undefined) → drainSteering omitted; turn unchanged", async () => {
+ const store = createInMemoryStore();
+
+ const { captured, wasDrainCalled, runTurn: captureRunTurn } = createDrainingCaptureRunTurn();
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => ({ id: "p", stream: async function* () {} }),
+ resolveTools: noTools,
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn: captureRunTurn,
+ // resolveQueue intentionally omitted — feature degrades off
+ });
+
+ const events: AgentEvent[] = [];
+ const unsub = orchestrator.subscribe("conv-noqueue", (e) => events.push(e));
+
+ orchestrator.startTurn({ conversationId: "conv-noqueue", text: "go" });
+ await waitForSealed(orchestrator, "conv-noqueue");
+ unsub();
+
+ // drainSteering is absent from the RunTurnInput (not undefined — omitted)
+ expect(captured).toHaveLength(1);
+ expect(captured[0]?.drainSteering).toBeUndefined();
+ expect(wasDrainCalled()).toBe(false);
+
+ // No steering event; turn sealed normally
+ expect(events.filter(isSteering)).toHaveLength(0);
+ expect(events.filter((e) => e.type === "turn-sealed")).toHaveLength(1);
+ });
+});
+
+// --- Post-seal carry (turn ended with a non-empty queue → new turn) ---
+
+describe("post-seal carry", () => {
+ it("post-seal: non-empty queue → a new turn starts with the combined message", async () => {
+ const store = createInMemoryStore();
+ const queue = createTestQueue();
+ queue.enqueue("conv-carry", "queued-a");
+ queue.enqueue("conv-carry", "queued-b");
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => simpleProvider(),
+ resolveTools: noTools,
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn,
+ resolveQueue: () => queue,
+ });
+
+ const events: AgentEvent[] = [];
+ const unsub = orchestrator.subscribe("conv-carry", (e) => events.push(e));
+
+ orchestrator.startTurn({ conversationId: "conv-carry", text: "original" });
+ // Wait for the original turn + the carried turn to both seal.
+ await waitForSealedCount(orchestrator, "conv-carry", 2);
+ unsub();
+
+ // Two user-message events: the original prompt + the carried combined text.
+ const userMessages = events.filter(isUserMessage);
+ expect(userMessages).toHaveLength(2);
+ expect(userMessages[0]?.text).toBe("original");
+ expect(userMessages[1]?.text).toBe("queued-a\n\nqueued-b");
+
+ // No steering event — the carry case emits user-message, not steering.
+ expect(events.filter(isSteering)).toHaveLength(0);
+
+ // The queue was drained by the carry.
+ expect(queue.getQueue("conv-carry")).toHaveLength(0);
+
+ // Both turns persisted (original + carry).
+ expect(store.data.get("conv-carry")?.length).toBeGreaterThanOrEqual(4);
+ });
+
+ it("post-seal: empty queue → no new turn", async () => {
+ const store = createInMemoryStore();
+ const queue = createTestQueue();
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => simpleProvider(),
+ resolveTools: noTools,
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn,
+ resolveQueue: () => queue,
+ });
+
+ const events: AgentEvent[] = [];
+ const unsub = orchestrator.subscribe("conv-no-carry", (e) => events.push(e));
+
+ orchestrator.startTurn({ conversationId: "conv-no-carry", text: "original" });
+ await waitForSealed(orchestrator, "conv-no-carry");
+ // Give the carry check a chance to run (it's in the finally, synchronous
+ // after turn-sealed, but await yields first).
+ await new Promise<void>((resolve) => setTimeout(resolve, 10));
+ unsub();
+
+ // Only one user-message (the original) — no carry turn.
+ expect(events.filter(isUserMessage)).toHaveLength(1);
+ expect(events.filter((e) => e.type === "turn-sealed")).toHaveLength(1);
+ });
+});
+
+// --- enqueue facade (the single entry transports call) ---
+
+describe("enqueue", () => {
+ it("enqueue when idle → starts a turn (startedTurn:true)", async () => {
+ const store = createInMemoryStore();
+ const queue = createTestQueue();
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => simpleProvider(),
+ resolveTools: noTools,
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn,
+ resolveQueue: () => queue,
+ });
+
+ const result = orchestrator.enqueue({ conversationId: "conv-idle", text: "hello" });
+ expect(result.startedTurn).toBe(true);
+ expect(result.queue).toHaveLength(0);
+
+ await waitForSealed(orchestrator, "conv-idle");
+
+ // The turn ran and persisted.
+ expect(store.data.get("conv-idle")).toBeDefined();
+ expect(store.data.get("conv-idle")?.length).toBeGreaterThanOrEqual(2);
+ });
+
+ it("enqueue when active → queues (startedTurn:false, snapshot with the message)", async () => {
+ const store = createInMemoryStore();
+ const queue = createTestQueue();
+
+ let resolveFirst: (() => void) | undefined;
+ const firstBlocker = new Promise<void>((resolve) => {
+ resolveFirst = resolve;
+ });
+ let callCount = 0;
+ const blockingFirstRunTurn = async (_input: RunTurnInput): Promise<RunTurnResult> => {
+ callCount++;
+ if (callCount === 1) {
+ await firstBlocker;
+ }
+ return {
+ messages: [{ role: "assistant", chunks: [{ type: "text", text: "done" }] }],
+ usage: { inputTokens: 1, outputTokens: 1 },
+ finishReason: "stop",
+ };
+ };
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => simpleProvider(),
+ resolveTools: noTools,
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn: blockingFirstRunTurn,
+ resolveQueue: () => queue,
+ });
+
+ // Start the original turn (it blocks in runTurn).
+ orchestrator.startTurn({ conversationId: "conv-active", text: "first" });
+ // Let the turn reach the blocked runTurn call.
+ await new Promise<void>((resolve) => setTimeout(resolve, 10));
+
+ // Enqueue while active.
+ const result = orchestrator.enqueue({ conversationId: "conv-active", text: "second" });
+ expect(result.startedTurn).toBe(false);
+ expect(result.queue).toHaveLength(1);
+ expect(result.queue[0]?.text).toBe("second");
+
+ // The queue holds the enqueued message.
+ expect(queue.getQueue("conv-active")).toHaveLength(1);
+
+ // Release the original turn → it seals → post-seal carry starts a new
+ // turn with the enqueued message. Subscribe before releasing to catch
+ // both turn-sealed events.
+ const sealed = waitForSealedCount(orchestrator, "conv-active", 2);
+ resolveFirst?.();
+ await sealed;
+ });
+
+ it("enqueue when active + no queue ext → startedTurn:false, empty queue (degraded)", async () => {
+ const store = createInMemoryStore();
+
+ let resolveFirst: (() => void) | undefined;
+ const firstBlocker = new Promise<void>((resolve) => {
+ resolveFirst = resolve;
+ });
+ let callCount = 0;
+ const blockingFirstRunTurn = async (_input: RunTurnInput): Promise<RunTurnResult> => {
+ callCount++;
+ if (callCount === 1) {
+ await firstBlocker;
+ }
+ return {
+ messages: [{ role: "assistant", chunks: [{ type: "text", text: "done" }] }],
+ usage: { inputTokens: 1, outputTokens: 1 },
+ finishReason: "stop",
+ };
+ };
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => simpleProvider(),
+ resolveTools: noTools,
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn: blockingFirstRunTurn,
+ // resolveQueue omitted — no queue extension loaded (degraded)
+ });
+
+ orchestrator.startTurn({ conversationId: "conv-degraded", text: "first" });
+ await new Promise<void>((resolve) => setTimeout(resolve, 10));
+
+ // Enqueue while active, but no queue ext → message dropped, empty snapshot.
+ const result = orchestrator.enqueue({ conversationId: "conv-degraded", text: "second" });
+ expect(result.startedTurn).toBe(false);
+ expect(result.queue).toHaveLength(0);
+
+ // Release the original turn; no carry (no queue ext).
+ const sealed = waitForSealed(orchestrator, "conv-degraded");
+ resolveFirst?.();
+ await sealed;
+ });
+});
diff --git a/packages/session-orchestrator/tsconfig.json b/packages/session-orchestrator/tsconfig.json
index 4401407..782c1c8 100644
--- a/packages/session-orchestrator/tsconfig.json
+++ b/packages/session-orchestrator/tsconfig.json
@@ -5,6 +5,7 @@
"references": [
{ "path": "../kernel" },
{ "path": "../conversation-store" },
- { "path": "../credential-store" }
+ { "path": "../credential-store" },
+ { "path": "../message-queue" }
]
}
diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json
index 3a0a983..ec7ccd1 100644
--- a/packages/transport-contract/package.json
+++ b/packages/transport-contract/package.json
@@ -1,6 +1,6 @@
{
"name": "@dispatch/transport-contract",
- "version": "0.11.0",
+ "version": "0.12.0",
"type": "module",
"private": true,
"main": "dist/index.js",
diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts
index e992f8b..e71feb7 100644
--- a/packages/transport-contract/src/index.ts
+++ b/packages/transport-contract/src/index.ts
@@ -20,10 +20,17 @@
*/
import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract";
-import type { AgentEvent, ReasoningEffort, StoredChunk, TurnMetrics } from "@dispatch/wire";
+import type {
+ AgentEvent,
+ QueuedMessage,
+ ReasoningEffort,
+ StoredChunk,
+ TurnMetrics,
+} from "@dispatch/wire";
export type {
AgentEvent,
+ QueuedMessage,
ReasoningEffort,
StepMetrics,
StoredChunk,
@@ -249,6 +256,41 @@ export interface CloseConversationResponse {
readonly abortedTurn: boolean;
}
+// ─── Message queue (steering) ─────────────────────────────────────────────────
+
+/**
+ * Request body for `POST /conversations/:id/queue` — enqueue a user message
+ * onto a conversation's message queue for mid-turn steering delivery.
+ *
+ * When a turn is ACTIVE for the conversation, the message is appended to the
+ * queue (the message-queue extension's per-conversation SURFACE updates) and
+ * delivered at the next tool-result boundary as a steering message the model
+ * sees alongside the tool results (a `steering` `AgentEvent` is emitted). When
+ * NO turn is active, enqueuing instead STARTS a new turn with the message as its
+ * opening prompt (equivalent to `POST /chat`) — so a fire-and-forget enqueue
+ * works regardless of generation state. The resolved queue + whether a turn was
+ * started are returned in `QueueResponse`.
+ *
+ * `text` must be non-empty (after trim) → HTTP 400 `{ error }` otherwise.
+ */
+export interface QueueRequest {
+ readonly text: string;
+}
+
+/**
+ * Response body for `POST /conversations/:id/queue` — the conversation's queue
+ * snapshot AFTER the enqueue, so a client renders the queue from this alone.
+ * `conversationId` echoes the path. `startedTurn` is true when no turn was
+ * active and the enqueue started a new turn (the message is now the turn's
+ * opening prompt, not a queued steering message); the turn's events stream on
+ * the chat channel as usual.
+ */
+export interface QueueResponse {
+ readonly conversationId: string;
+ readonly startedTurn: boolean;
+ readonly queue: readonly QueuedMessage[];
+}
+
// ─── Per-conversation LSP status ──────────────────────────────────────────────
/** The connection state of a single language server for a workspace. */
@@ -404,6 +446,23 @@ export interface ChatUnsubscribeMessage {
}
/**
+ * Client → server: enqueue a message onto a conversation's message queue while
+ * a turn is generating (steering). The WebSocket counterpart of the HTTP
+ * `POST /conversations/:id/queue` (`QueueRequest`). Fire-and-forget: success is
+ * confirmed by the message-queue SURFACE updating (the FE renders the queue
+ * from the surface, not from a reply here); a failure (malformed/empty text,
+ * unknown conversation) arrives as a `chat.error`. When no turn is active, the
+ * enqueue starts a new turn (the turn's events stream as `chat.delta`s), so a
+ * client reuses this op for both "queue while generating" and "send" (the
+ * latter being equivalent to `chat.send`).
+ */
+export interface ChatQueueMessage {
+ readonly type: "chat.queue";
+ readonly conversationId: string;
+ readonly text: string;
+}
+
+/**
* Every client → server WS message: surface ops (`@dispatch/ui-contract`) + chat
* ops. A server discriminates on `type`.
*/
@@ -411,7 +470,8 @@ export type WsClientMessage =
| SurfaceClientMessage
| ChatSendMessage
| ChatSubscribeMessage
- | ChatUnsubscribeMessage;
+ | ChatUnsubscribeMessage
+ | ChatQueueMessage;
/**
* Every server → client WS message: surface ops (`@dispatch/ui-contract`) + chat
diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts
index 1f95dd8..49e240f 100644
--- a/packages/transport-http/src/app.test.ts
+++ b/packages/transport-http/src/app.test.ts
@@ -8,7 +8,11 @@ import type {
TurnMetrics,
} from "@dispatch/kernel";
import { createThroughputStore, dayKeyOf } from "@dispatch/throughput-store";
-import type { ThroughputResponse } from "@dispatch/transport-contract";
+import type {
+ QueuedMessage,
+ QueueResponse,
+ ThroughputResponse,
+} from "@dispatch/transport-contract";
import { describe, expect, it } from "vitest";
import { createApp } from "./app.js";
import type {
@@ -132,6 +136,9 @@ function createFakeOrchestrator(events: AgentEvent[]): SessionOrchestrator {
isActive() {
return false;
},
+ enqueue() {
+ return { startedTurn: false, queue: [] };
+ },
closeConversation() {
return { abortedTurn: false };
},
@@ -162,6 +169,9 @@ function createCapturingOrchestrator(): SessionOrchestrator & {
isActive() {
return false;
},
+ enqueue() {
+ return { startedTurn: false, queue: [] };
+ },
closeConversation() {
return { abortedTurn: false };
},
@@ -182,6 +192,9 @@ function createThrowingOrchestrator(error: Error): SessionOrchestrator {
isActive() {
return false;
},
+ enqueue() {
+ return { startedTurn: false, queue: [] };
+ },
closeConversation() {
return { abortedTurn: false };
},
@@ -1319,6 +1332,269 @@ describe("POST /conversations/:id/close", () => {
});
});
+describe("POST /conversations/:id/queue", () => {
+ it("with valid text → 200 + QueueResponse (startedTurn + queue)", async () => {
+ const queue: readonly QueuedMessage[] = [
+ { id: "q1", text: "queued-msg", queuedAt: 1700000000000 },
+ ];
+ const orchestrator: SessionOrchestrator = {
+ ...createFakeOrchestrator([]),
+ enqueue() {
+ return { startedTurn: false, queue };
+ },
+ };
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator,
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+
+ const res = await app.request("/conversations/conv1/queue", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({ text: "hello" }),
+ });
+
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as QueueResponse;
+ expect(body.conversationId).toBe("conv1");
+ expect(body.startedTurn).toBe(false);
+ expect(body.queue).toEqual(queue);
+ });
+
+ it("with empty/whitespace text → 400 { error } and enqueue is never called", async () => {
+ let enqueueCalled = false;
+ const orchestrator: SessionOrchestrator = {
+ ...createFakeOrchestrator([]),
+ enqueue() {
+ enqueueCalled = true;
+ return { startedTurn: false, queue: [] };
+ },
+ };
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator,
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+
+ const res = await app.request("/conversations/conv1/queue", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({ text: " " }),
+ });
+
+ expect(res.status).toBe(400);
+ const body = (await res.json()) as { error: string };
+ expect(body.error).toContain("text");
+ expect(enqueueCalled).toBe(false);
+ });
+
+ it("with missing text field → 400 { error } and enqueue is never called", async () => {
+ let enqueueCalled = false;
+ const orchestrator: SessionOrchestrator = {
+ ...createFakeOrchestrator([]),
+ enqueue() {
+ enqueueCalled = true;
+ return { startedTurn: false, queue: [] };
+ },
+ };
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator,
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+
+ const res = await app.request("/conversations/conv1/queue", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({}),
+ });
+
+ expect(res.status).toBe(400);
+ const body = (await res.json()) as { error: string };
+ expect(body.error).toContain("text");
+ expect(enqueueCalled).toBe(false);
+ });
+
+ it("enqueue returns startedTurn:true (was idle) → response echoes it", async () => {
+ const orchestrator: SessionOrchestrator = {
+ ...createFakeOrchestrator([]),
+ enqueue() {
+ return { startedTurn: true, queue: [] };
+ },
+ };
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator,
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+
+ const res = await app.request("/conversations/conv-idle/queue", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({ text: "go" }),
+ });
+
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as QueueResponse;
+ expect(body.conversationId).toBe("conv-idle");
+ expect(body.startedTurn).toBe(true);
+ expect(body.queue).toEqual([]);
+ });
+
+ it("enqueue returns startedTurn:false (was active) → response carries the queue snapshot", async () => {
+ const queue: readonly QueuedMessage[] = [
+ { id: "q1", text: "second", queuedAt: 1700000000000 },
+ { id: "q2", text: "third", queuedAt: 1700000001000 },
+ ];
+ const orchestrator: SessionOrchestrator = {
+ ...createFakeOrchestrator([]),
+ enqueue() {
+ return { startedTurn: false, queue };
+ },
+ };
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator,
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+
+ const res = await app.request("/conversations/conv-active/queue", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({ text: "steer" }),
+ });
+
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as QueueResponse;
+ expect(body.conversationId).toBe("conv-active");
+ expect(body.startedTurn).toBe(false);
+ expect(body.queue).toEqual(queue);
+ });
+
+ it("forwards the path conversationId and trimmed text to enqueue", async () => {
+ const calls: { conversationId: string; text: string }[] = [];
+ const orchestrator: SessionOrchestrator = {
+ ...createFakeOrchestrator([]),
+ enqueue(input) {
+ calls.push(input);
+ return { startedTurn: false, queue: [] };
+ },
+ };
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator,
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+
+ const res = await app.request("/conversations/conv-1/queue", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({ text: " hello world " }),
+ });
+
+ expect(res.status).toBe(200);
+ expect(calls).toHaveLength(1);
+ expect(calls[0]?.conversationId).toBe("conv-1");
+ expect(calls[0]?.text).toBe("hello world");
+ });
+
+ it("returns 400 for invalid JSON body", async () => {
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+
+ const res = await app.request("/conversations/conv1/queue", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: "not json",
+ });
+
+ expect(res.status).toBe(400);
+ const body = (await res.json()) as { error: string };
+ expect(body.error).toContain("JSON");
+ });
+
+ it("returns 400 for a non-string text", async () => {
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+
+ const res = await app.request("/conversations/conv1/queue", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({ text: 42 }),
+ });
+
+ expect(res.status).toBe(400);
+ const body = (await res.json()) as { error: string };
+ expect(body.error).toContain("text");
+ });
+
+ it("logs an info line on success and never logs the enqueued text", async () => {
+ const logger = createFakeLogger();
+ const orchestrator: SessionOrchestrator = {
+ ...createFakeOrchestrator([]),
+ enqueue() {
+ return { startedTurn: true, queue: [] };
+ },
+ };
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator,
+ credentialStore: createFakeCredentialStore([]),
+ logger,
+ });
+
+ await app.request("/conversations/conv1/queue", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({ text: "secret-ish user message" }),
+ });
+
+ const infoLogs = logger.records.filter((r) => r.level === "info");
+ expect(infoLogs).toHaveLength(1);
+ expect(infoLogs[0]?.msg).toBe("conversations: enqueued");
+ expect(infoLogs[0]?.attrs?.conversationId).toBe("conv1");
+ expect(infoLogs[0]?.attrs?.startedTurn).toBe(true);
+ expect(infoLogs[0]?.attrs?.queueLength).toBe(0);
+ // Restraint: the user's message text is never logged (mirrors POST /chat).
+ expect(JSON.stringify(logger.records)).not.toContain("secret-ish user message");
+ });
+
+ it("logs a warn on a malformed body (400)", async () => {
+ const logger = createFakeLogger();
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ logger,
+ });
+
+ await app.request("/conversations/conv1/queue", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({ text: "" }),
+ });
+
+ const warnLogs = logger.records.filter((r) => r.level === "warn");
+ expect(warnLogs.length).toBeGreaterThanOrEqual(1);
+ expect(warnLogs[0]?.msg).toBe("conversations/queue: validation failed");
+ });
+});
+
describe("GET /conversations/:id/cwd", () => {
it("returns null when unset", async () => {
const app = createApp({
diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts
index 2788bf9..8cb85c9 100644
--- a/packages/transport-http/src/app.ts
+++ b/packages/transport-http/src/app.ts
@@ -7,6 +7,7 @@ import type {
LspServerInfo,
LspStatusResponse,
ModelsResponse,
+ QueueResponse,
ReasoningEffortResponse,
ThroughputResponse,
WarmResponse,
@@ -21,6 +22,7 @@ import {
isSinceSeqError,
isWindowParamError,
parseChatBody,
+ parseQueueBody,
parseReasoningEffortBody,
parseSinceSeq,
parseWarmBody,
@@ -370,6 +372,40 @@ export function createApp(opts: CreateServerOptions): Hono {
return c.json(body, 200);
});
+ app.post("/conversations/:id/queue", async (c) => {
+ const conversationId = c.req.param("id");
+
+ let body: unknown;
+ try {
+ body = await c.req.json();
+ } catch {
+ log.warn("conversations/queue: invalid JSON body");
+ return c.json({ error: "Invalid JSON body" }, 400);
+ }
+
+ const parsed = parseQueueBody(body);
+ if (isParseError(parsed)) {
+ log.warn("conversations/queue: validation failed", { reason: parsed.error });
+ return c.json({ error: parsed.error }, 400);
+ }
+
+ // `enqueue` is synchronous and owns the idle→startTurn vs active→queue
+ // decision (no separate `isActive` race) — it does not throw for an
+ // unknown/idle conversation, which instead starts a turn. Mirrors the
+ // direct sync call used by `POST /conversations/:id/close`.
+ const { startedTurn, queue } = opts.orchestrator.enqueue({
+ conversationId,
+ text: parsed.text,
+ });
+ log.info("conversations: enqueued", {
+ conversationId,
+ startedTurn,
+ queueLength: queue.length,
+ });
+ const response: QueueResponse = { conversationId, startedTurn, queue };
+ return c.json(response, 200);
+ });
+
app.get("/conversations/:id/cwd", async (c) => {
const conversationId = c.req.param("id");
try {
diff --git a/packages/transport-http/src/extension.ts b/packages/transport-http/src/extension.ts
index ab23b65..3fcc473 100644
--- a/packages/transport-http/src/extension.ts
+++ b/packages/transport-http/src/extension.ts
@@ -31,6 +31,7 @@ export const manifest: Manifest = {
"/conversations/:id/close",
"/conversations/:id/cwd",
"/conversations/:id/lsp",
+ "/conversations/:id/queue",
"/conversations/:id/reasoning-effort",
"/health",
"/models",
diff --git a/packages/transport-http/src/index.ts b/packages/transport-http/src/index.ts
index b231b7e..735dc38 100644
--- a/packages/transport-http/src/index.ts
+++ b/packages/transport-http/src/index.ts
@@ -5,6 +5,7 @@ export type {
ChatCommand,
ParseError,
ParseResult,
+ QueueBodyParsed,
SinceSeqResult,
WarmBodyParsed,
WindowParamResult,
@@ -17,6 +18,7 @@ export {
isValidReasoningEffort,
isWindowParamError,
parseChatBody,
+ parseQueueBody,
parseReasoningEffortBody,
parseSinceSeq,
parseWindowParam,
diff --git a/packages/transport-http/src/logic.test.ts b/packages/transport-http/src/logic.test.ts
index f91b38c..40a82fd 100644
--- a/packages/transport-http/src/logic.test.ts
+++ b/packages/transport-http/src/logic.test.ts
@@ -8,6 +8,7 @@ import {
isValidReasoningEffort,
isWindowParamError,
parseChatBody,
+ parseQueueBody,
parseReasoningEffortBody,
parseSinceSeq,
parseWindowParam,
@@ -368,3 +369,60 @@ describe("parseReasoningEffortBody", () => {
expect(isReasoningEffortParseError(parseReasoningEffortBody("string"))).toBe(true);
});
});
+
+describe("parseQueueBody", () => {
+ it("returns error for null body", () => {
+ const result = parseQueueBody(null);
+ expect(isParseError(result)).toBe(true);
+ if (isParseError(result)) {
+ expect(result.error).toContain("JSON object");
+ }
+ });
+
+ it("returns error for non-object body", () => {
+ const result = parseQueueBody("hello");
+ expect(isParseError(result)).toBe(true);
+ });
+
+ it("returns error when text is missing", () => {
+ const result = parseQueueBody({});
+ expect(isParseError(result)).toBe(true);
+ if (isParseError(result)) {
+ expect(result.error).toContain("text");
+ }
+ });
+
+ it("returns error when text is empty string", () => {
+ const result = parseQueueBody({ text: "" });
+ expect(isParseError(result)).toBe(true);
+ });
+
+ it("returns error when text is whitespace only", () => {
+ const result = parseQueueBody({ text: " " });
+ expect(isParseError(result)).toBe(true);
+ });
+
+ it("returns error when text is not a string", () => {
+ const result = parseQueueBody({ text: 42 });
+ expect(isParseError(result)).toBe(true);
+ if (isParseError(result)) {
+ expect(result.error).toContain("text");
+ }
+ });
+
+ it("returns the trimmed text for a valid body", () => {
+ const result = parseQueueBody({ text: "hello" });
+ expect(isParseError(result)).toBe(false);
+ if (!isParseError(result)) {
+ expect(result.text).toBe("hello");
+ }
+ });
+
+ it("trims text whitespace", () => {
+ const result = parseQueueBody({ text: " hello world " });
+ expect(isParseError(result)).toBe(false);
+ if (!isParseError(result)) {
+ expect(result.text).toBe("hello world");
+ }
+ });
+});
diff --git a/packages/transport-http/src/logic.ts b/packages/transport-http/src/logic.ts
index e0adfeb..aa5394c 100644
--- a/packages/transport-http/src/logic.ts
+++ b/packages/transport-http/src/logic.ts
@@ -73,8 +73,8 @@ export function parseChatBody(body: unknown, generateId: () => string): ParseRes
return result;
}
-export function isParseError(result: ParseResult): result is ParseError {
- return "error" in result;
+export function isParseError<T>(result: T | ParseError): result is ParseError {
+ return typeof result === "object" && result !== null && "error" in result;
}
export function serializeEventLine(event: AgentEvent): string {
@@ -172,6 +172,37 @@ export function computeExpectedCacheRate(
return Math.round((cacheReadTokens / denom) * 100);
}
+/**
+ * Parsed body for `POST /conversations/:id/queue` (`QueueRequest`). Only the
+ * `text` field — `conversationId` comes from the path param, not the body, so it
+ * is deliberately NOT part of this parse result.
+ */
+export interface QueueBodyParsed {
+ readonly text: string;
+}
+
+/**
+ * Parse + validate a `POST /conversations/:id/queue` body (`QueueRequest`).
+ * `text` must be a non-empty string after trim — invalid/missing →
+ * {@link ParseError}. The TRIMMED text is returned (forwarded to
+ * `orchestrator.enqueue`), mirroring how `parseChatBody` forwards a trimmed
+ * `message`.
+ */
+export function parseQueueBody(body: unknown): QueueBodyParsed | ParseError {
+ if (body === null || typeof body !== "object") {
+ return { error: "Request body must be a JSON object" };
+ }
+
+ const obj = body as Record<string, unknown>;
+
+ const text = obj.text;
+ if (typeof text !== "string" || text.trim().length === 0) {
+ return { error: "Field 'text' is required and must be a non-empty string" };
+ }
+
+ return { text: text.trim() };
+}
+
export function parseReasoningEffortBody(body: unknown): ReasoningEffort | ParseError {
if (body === null || typeof body !== "object") {
return { error: "Request body must be a JSON object" };
diff --git a/packages/transport-http/src/server.bun.test.ts b/packages/transport-http/src/server.bun.test.ts
index 36b05a5..151ad24 100644
--- a/packages/transport-http/src/server.bun.test.ts
+++ b/packages/transport-http/src/server.bun.test.ts
@@ -68,6 +68,9 @@ function fakeOrchestrator(): SessionOrchestrator {
isActive() {
return false;
},
+ enqueue() {
+ return { startedTurn: false, queue: [] };
+ },
closeConversation() {
return { abortedTurn: false };
},
diff --git a/packages/transport-ws/src/extension.ts b/packages/transport-ws/src/extension.ts
index 332ebe0..7a4e707 100644
--- a/packages/transport-ws/src/extension.ts
+++ b/packages/transport-ws/src/extension.ts
@@ -255,6 +255,28 @@ export function createTransportWsExtension(): Extension {
break;
}
+ case "chat-queue": {
+ // Fire-and-forget: success is confirmed by the message-queue
+ // SURFACE updating (startedTurn:false) or by streaming
+ // chat.deltas (startedTurn:true), NOT by a reply here. On
+ // startedTurn:true the sender is auto-subscribed so the new
+ // turn's events stream to it (same as chat.send); on
+ // startedTurn:false (queued for steering) we emit NOTHING
+ // back and do not auto-subscribe.
+ const enqueueResult = orchestrator.enqueue({
+ conversationId: result.conversationId,
+ text: result.text,
+ });
+ if (enqueueResult.startedTurn) {
+ ensureChatSubscribed(ws, state, result.conversationId);
+ }
+ logger.info?.("transport-ws: chat.queue accepted", {
+ conversationId: result.conversationId,
+ startedTurn: enqueueResult.startedTurn,
+ });
+ break;
+ }
+
case "chat-error": {
logger.warn?.("transport-ws: malformed chat.send", {
reason: result.errorMessage,
diff --git a/packages/transport-ws/src/index.ts b/packages/transport-ws/src/index.ts
index baa6b0f..e0cc66b 100644
--- a/packages/transport-ws/src/index.ts
+++ b/packages/transport-ws/src/index.ts
@@ -1,6 +1,7 @@
export { createTransportWsExtension } from "./extension.js";
export { manifest } from "./manifest.js";
export type {
+ ChatQueueRouteResult,
ChatRouteError,
ChatRouteResult,
ChatSubscribeRouteResult,
diff --git a/packages/transport-ws/src/router.test.ts b/packages/transport-ws/src/router.test.ts
index a0ad104..66e2611 100644
--- a/packages/transport-ws/src/router.test.ts
+++ b/packages/transport-ws/src/router.test.ts
@@ -1,7 +1,8 @@
import type { SurfaceContext, SurfaceProvider, SurfaceRegistry } from "@dispatch/surface-registry";
+import type { WsClientMessage } from "@dispatch/transport-contract";
import type { SurfaceCatalogEntry, SurfaceSpec } from "@dispatch/ui-contract";
import { describe, expect, it } from "vitest";
-import { catalogMessage, routeClientMessage, subKey } from "./router.js";
+import { catalogMessage, type RouteResult, routeClientMessage, subKey } from "./router.js";
// ── Fake in-memory registry (no mocks — just a plain implementation) ────────
@@ -450,6 +451,114 @@ describe("routeClientMessage", () => {
expect(result).toEqual({ kind: "chat-unsubscribe", conversationId: "conv-abc" });
});
});
+
+ describe("chat.queue", () => {
+ it("routes a valid chat.queue → { kind: 'chat-queue', conversationId, text } (what the shell passes to orchestrator.enqueue)", () => {
+ const registry = fakeRegistry([]);
+ const connSubs = new Set<string>();
+
+ const result = routeClientMessage(registry, connSubs, {
+ type: "chat.queue",
+ conversationId: "conv-1",
+ text: "steer here",
+ });
+
+ expect(result).toEqual({
+ kind: "chat-queue",
+ conversationId: "conv-1",
+ text: "steer here",
+ });
+ });
+
+ it("rejects empty/whitespace text → chat-error (no enqueue signal)", () => {
+ const registry = fakeRegistry([]);
+ const connSubs = new Set<string>();
+
+ for (const text of ["", " ", "\t\n"]) {
+ const result = routeClientMessage(registry, connSubs, {
+ type: "chat.queue",
+ conversationId: "conv-1",
+ text,
+ });
+
+ expect(result.kind).toBe("chat-error");
+ if (result.kind !== "chat-error") throw new Error("expected chat-error");
+ expect(result.conversationId).toBe("conv-1");
+ expect(result.errorMessage).toContain("non-empty string");
+ expect(result.errorMessage).toContain("text");
+ }
+ });
+
+ it("rejects missing text → chat-error (no enqueue signal)", () => {
+ const registry = fakeRegistry([]);
+ const connSubs = new Set<string>();
+
+ const result = routeClientMessage(registry, connSubs, {
+ type: "chat.queue",
+ conversationId: "conv-1",
+ text: undefined as unknown as string,
+ });
+
+ expect(result.kind).toBe("chat-error");
+ if (result.kind !== "chat-error") throw new Error("expected chat-error");
+ expect(result.errorMessage).toContain("non-empty string");
+ });
+
+ it("does not trim the stored text — passes the original through to the shell", () => {
+ const registry = fakeRegistry([]);
+ const connSubs = new Set<string>();
+
+ // Non-empty after trim (so valid), but the value carries surrounding
+ // whitespace: the router passes it through unchanged (validation uses
+ // trim; the orchestrator receives the original text).
+ const result = routeClientMessage(registry, connSubs, {
+ type: "chat.queue",
+ conversationId: "conv-1",
+ text: " steer ",
+ });
+
+ expect(result.kind).toBe("chat-queue");
+ if (result.kind !== "chat-queue") throw new Error("expected chat-queue");
+ expect(result.text).toBe(" steer ");
+ });
+ });
+
+ describe("exhaustive switch (regression guard for Wave-0 fan-out)", () => {
+ // Every WsClientMessage variant must route to a defined result with a
+ // known kind — no fall-through / undefined return. If the union is
+ // widened again, `tsc` catches the missing case (the switch is
+ // exhaustive); this test guards the runtime side of that contract.
+ it("routes every WsClientMessage variant to a defined RouteResult", () => {
+ const provider = fakeProvider("a", "Surface A", ["toggle"]);
+ const registry = fakeRegistry([provider]);
+ const connSubs = new Set<string>();
+
+ const samples: WsClientMessage[] = [
+ { type: "subscribe", surfaceId: "a" },
+ { type: "unsubscribe", surfaceId: "a" },
+ { type: "invoke", surfaceId: "a", actionId: "toggle", payload: true },
+ { type: "chat.send", message: "hi" },
+ { type: "chat.subscribe", conversationId: "c1" },
+ { type: "chat.unsubscribe", conversationId: "c1" },
+ { type: "chat.queue", conversationId: "c1", text: "steer" },
+ ];
+
+ const validKinds = new Set<RouteResult["kind"]>([
+ "surface",
+ "chat",
+ "chat-error",
+ "chat-subscribe",
+ "chat-unsubscribe",
+ "chat-queue",
+ ]);
+
+ for (const msg of samples) {
+ const result = routeClientMessage(registry, connSubs, msg);
+ expect(result).toBeDefined();
+ expect(validKinds.has(result.kind)).toBe(true);
+ }
+ });
+ });
});
describe("catalogMessage", () => {
diff --git a/packages/transport-ws/src/router.ts b/packages/transport-ws/src/router.ts
index 93f97f8..e1f53c5 100644
--- a/packages/transport-ws/src/router.ts
+++ b/packages/transport-ws/src/router.ts
@@ -9,6 +9,7 @@
import type { SurfaceContext, SurfaceRegistry } from "@dispatch/surface-registry";
import type {
+ ChatQueueMessage,
ChatSendMessage,
ChatSubscribeMessage,
ChatUnsubscribeMessage,
@@ -68,13 +69,27 @@ export interface ChatUnsubscribeRouteResult {
readonly conversationId: string;
}
+/**
+ * The effect a validated chat.queue should produce. The shell calls
+ * `orchestrator.enqueue({ conversationId, text })` and emits NOTHING back on
+ * either path (fire-and-forget): success is confirmed by the message-queue
+ * SURFACE updating (startedTurn:false) or by streaming chat.deltas
+ * (startedTurn:true — the shell auto-subscribes the sender, same as chat.send).
+ */
+export interface ChatQueueRouteResult {
+ readonly kind: "chat-queue";
+ readonly conversationId: string;
+ readonly text: string;
+}
+
/** The effect any client WS message should produce. */
export type RouteResult =
| SurfaceRouteResult
| ChatRouteResult
| ChatRouteError
| ChatSubscribeRouteResult
- | ChatUnsubscribeRouteResult;
+ | ChatUnsubscribeRouteResult
+ | ChatQueueRouteResult;
// ── Helpers ─────────────────────────────────────────────────────────────────
@@ -118,6 +133,8 @@ export function routeClientMessage(
return handleChatSubscribe(msg);
case "chat.unsubscribe":
return handleChatUnsubscribe(msg);
+ case "chat.queue":
+ return handleChatQueue(msg);
}
}
@@ -164,6 +181,27 @@ function handleChatUnsubscribe(msg: ChatUnsubscribeMessage): ChatUnsubscribeRout
return { kind: "chat-unsubscribe", conversationId: msg.conversationId };
}
+/**
+ * Validate a chat.queue: `text` must be a non-empty string AFTER TRIM (matches
+ * the HTTP `QueueRequest` rule). Invalid → `chat-error` (the shell replies with
+ * `chat.error`, same style as a malformed `chat.send`; the orchestrator is never
+ * called). Valid → `chat-queue` (the shell calls `orchestrator.enqueue`).
+ */
+function handleChatQueue(msg: ChatQueueMessage): ChatQueueRouteResult | ChatRouteError {
+ if (typeof msg.text !== "string" || msg.text.trim().length === 0) {
+ return {
+ kind: "chat-error",
+ conversationId: msg.conversationId,
+ errorMessage: "chat.queue requires a non-empty string `text`",
+ };
+ }
+ return {
+ kind: "chat-queue",
+ conversationId: msg.conversationId,
+ text: msg.text,
+ };
+}
+
// ── Per-message handlers ────────────────────────────────────────────────────
function handleSubscribe(
diff --git a/packages/transport-ws/src/server.bun.test.ts b/packages/transport-ws/src/server.bun.test.ts
index 43d008a..da3be5e 100644
--- a/packages/transport-ws/src/server.bun.test.ts
+++ b/packages/transport-ws/src/server.bun.test.ts
@@ -96,16 +96,22 @@ interface FakeOrchestratorOpts {
readonly startTurn?: SessionOrchestrator["startTurn"];
/** If true, startTurn always returns already-active. */
readonly alreadyActive?: boolean;
+ /** Custom enqueue impl. */
+ readonly enqueue?: SessionOrchestrator["enqueue"];
+ /** If true, enqueue reports the conversation was active (startedTurn:false). */
+ readonly queueActive?: boolean;
}
function fakeOrchestrator(opts?: FakeOrchestratorOpts): SessionOrchestrator & {
readonly listeners: Map<string, Set<TurnEventListener>>;
readonly startCalls: readonly { conversationId: string; text: string }[];
+ readonly enqueueCalls: readonly { conversationId: string; text: string }[];
readonly aborted: boolean;
} {
const listeners = opts?.listeners ?? new Map<string, Set<TurnEventListener>>();
const bufferedEvents = opts?.bufferedEvents ?? new Map<string, readonly AgentEvent[]>();
const startCalls: { conversationId: string; text: string }[] = [];
+ const enqueueCalls: { conversationId: string; text: string }[] = [];
const aborted = false;
return {
@@ -113,6 +119,9 @@ function fakeOrchestrator(opts?: FakeOrchestratorOpts): SessionOrchestrator & {
get startCalls() {
return startCalls;
},
+ get enqueueCalls() {
+ return enqueueCalls;
+ },
get aborted() {
return aborted;
},
@@ -126,6 +135,16 @@ function fakeOrchestrator(opts?: FakeOrchestratorOpts): SessionOrchestrator & {
}
return { started: true, turnId: "fake-turn-id" };
},
+ enqueue(input) {
+ enqueueCalls.push({ conversationId: input.conversationId, text: input.text });
+ if (opts?.enqueue) {
+ return opts.enqueue(input);
+ }
+ if (opts?.queueActive) {
+ return { startedTurn: false, queue: [] };
+ }
+ return { startedTurn: true, queue: [] };
+ },
subscribe(conversationId, listener) {
let set = listeners.get(conversationId);
if (!set) {
@@ -159,12 +178,15 @@ function fakeOrchestrator(opts?: FakeOrchestratorOpts): SessionOrchestrator & {
/** Create a fake orchestrator that broadcasts events when `broadcast` is called. */
function fakeOrchestratorWithBroadcast(): SessionOrchestrator & {
readonly listeners: Map<string, Set<TurnEventListener>>;
+ readonly enqueueCalls: readonly { conversationId: string; text: string }[];
broadcast(conversationId: string, event: AgentEvent): void;
} {
const listeners = new Map<string, Set<TurnEventListener>>();
+ const enqueueCalls: { conversationId: string; text: string }[] = [];
return {
listeners,
+ enqueueCalls,
broadcast(conversationId, event) {
const set = listeners.get(conversationId);
if (set) {
@@ -176,6 +198,10 @@ function fakeOrchestratorWithBroadcast(): SessionOrchestrator & {
startTurn(_input) {
return { started: true, turnId: "fake-turn-id" };
},
+ enqueue(input) {
+ enqueueCalls.push({ conversationId: input.conversationId, text: input.text });
+ return { startedTurn: true, queue: [] };
+ },
subscribe(conversationId, listener) {
let set = listeners.get(conversationId);
if (!set) {
@@ -323,6 +349,29 @@ function startServer(
break;
}
+ case "chat-queue": {
+ // Mirror extension.ts: fire-and-forget. On startedTurn:true
+ // auto-subscribe the sender (deltas stream); on false emit
+ // nothing back.
+ const enqueueResult = orchestrator.enqueue({
+ conversationId: result.conversationId,
+ text: result.text,
+ });
+ if (enqueueResult.startedTurn) {
+ if (!state.chatSubscriptions.has(result.conversationId)) {
+ const unsubscribe = orchestrator.subscribe(result.conversationId, (event) => {
+ ws.send(JSON.stringify({ type: "chat.delta", event }));
+ });
+ state.chatSubscriptions.set(result.conversationId, unsubscribe);
+ }
+ }
+ log.info?.("transport-ws: chat.queue accepted", {
+ conversationId: result.conversationId,
+ startedTurn: enqueueResult.startedTurn,
+ });
+ break;
+ }
+
case "chat-error": {
log.warn?.("transport-ws: malformed chat.send", {
reason: result.errorMessage,
@@ -649,6 +698,136 @@ describe("chat ops (new orchestrator API)", () => {
});
});
+describe("chat.queue (steering enqueue)", () => {
+ let server: ReturnType<typeof Bun.serve>;
+ let port: number;
+
+ afterEach(() => {
+ server.stop();
+ });
+
+ test("chat.queue with valid text → orchestrator.enqueue called with {conversationId, text}, no reply sent", async () => {
+ const orch = fakeOrchestrator(); // idle → startedTurn:true
+ const registry = fakeRegistry([]);
+ server = startServer(registry, orch);
+ port = server.port as number;
+
+ const ws = new WebSocket(`ws://localhost:${port}`);
+ await waitForMessage(ws); // drain catalog
+
+ ws.send(JSON.stringify({ type: "chat.queue", conversationId: "c1", text: "steer please" }));
+ // Allow the message handler to run.
+ await new Promise((r) => setTimeout(r, 50));
+
+ expect(orch.enqueueCalls).toEqual([{ conversationId: "c1", text: "steer please" }]);
+ // chat.send-path equivalence: enqueue NOT called via startTurn.
+ expect(orch.startCalls).toHaveLength(0);
+ // Fire-and-forget: no chat.error, no ack — only the catalog was sent.
+ // (startedTurn:true path auto-subscribes but emits nothing itself.)
+
+ ws.close();
+ });
+
+ test("chat.queue on startedTurn:true auto-subscribes the sender (deltas stream as chat.delta)", async () => {
+ const orch = fakeOrchestratorWithBroadcast(); // enqueue → startedTurn:true
+ const registry = fakeRegistry([]);
+ server = startServer(registry, orch);
+ port = server.port as number;
+
+ const ws = new WebSocket(`ws://localhost:${port}`);
+ await waitForMessage(ws); // drain catalog
+
+ ws.send(JSON.stringify({ type: "chat.queue", conversationId: "c1", text: "go" }));
+ await new Promise((r) => setTimeout(r, 50));
+
+ // The sender was auto-subscribed — a broadcast reaches it as a chat.delta.
+ const event = {
+ type: "text-delta",
+ conversationId: "c1",
+ turnId: "t1",
+ delta: "Hi",
+ } as AgentEvent;
+ orch.broadcast("c1", event);
+
+ const msg = await waitForMessage(ws);
+ expect(msg.type).toBe("chat.delta");
+ if (msg.type === "chat.delta") {
+ expect(msg.event).toEqual(event);
+ }
+
+ ws.close();
+ });
+
+ test("chat.queue on startedTurn:false (queued for steering) emits NOTHING back", async () => {
+ const orch = fakeOrchestrator({ queueActive: true }); // enqueue → startedTurn:false
+ const registry = fakeRegistry([]);
+ server = startServer(registry, orch);
+ port = server.port as number;
+
+ const ws = new WebSocket(`ws://localhost:${port}`);
+ await waitForMessage(ws); // drain catalog
+
+ ws.send(JSON.stringify({ type: "chat.queue", conversationId: "c1", text: "steer" }));
+ await new Promise((r) => setTimeout(r, 50));
+
+ expect(orch.enqueueCalls).toEqual([{ conversationId: "c1", text: "steer" }]);
+ // No further message should arrive within a quiet window: success is
+ // confirmed by the message-queue SURFACE, not a reply here. We assert
+ // by NOT receiving anything (a silent socket).
+ await expect(
+ Promise.race([
+ waitForMessage(ws).then((m) => new Error(`unexpected reply: ${JSON.stringify(m)}`)),
+ new Promise((resolve) => setTimeout(() => resolve("silent"), 150)),
+ ]),
+ ).resolves.toBe("silent");
+
+ ws.close();
+ });
+
+ test("chat.queue with empty text → chat.error to client, no enqueue", async () => {
+ const orch = fakeOrchestrator();
+ const registry = fakeRegistry([]);
+ server = startServer(registry, orch);
+ port = server.port as number;
+
+ const ws = new WebSocket(`ws://localhost:${port}`);
+ await waitForMessage(ws); // drain catalog
+
+ ws.send(JSON.stringify({ type: "chat.queue", conversationId: "c1", text: " " }));
+ const errMsg = await waitForMessage(ws);
+
+ expect(errMsg.type).toBe("chat.error");
+ if (errMsg.type === "chat.error") {
+ expect(errMsg.conversationId).toBe("c1");
+ expect(errMsg.message).toContain("non-empty string");
+ }
+ expect(orch.enqueueCalls).toHaveLength(0);
+
+ ws.close();
+ });
+
+ test("chat.queue with missing text → chat.error to client, no enqueue", async () => {
+ const orch = fakeOrchestrator();
+ const registry = fakeRegistry([]);
+ server = startServer(registry, orch);
+ port = server.port as number;
+
+ const ws = new WebSocket(`ws://localhost:${port}`);
+ await waitForMessage(ws); // drain catalog
+
+ ws.send(JSON.stringify({ type: "chat.queue", conversationId: "c1" }));
+ const errMsg = await waitForMessage(ws);
+
+ expect(errMsg.type).toBe("chat.error");
+ if (errMsg.type === "chat.error") {
+ expect(errMsg.message).toContain("non-empty string");
+ }
+ expect(orch.enqueueCalls).toHaveLength(0);
+
+ ws.close();
+ });
+});
+
describe("logging", () => {
let server: ReturnType<typeof Bun.serve>;
let port: number;
diff --git a/packages/wire/package.json b/packages/wire/package.json
index 07c20b7..b83f127 100644
--- a/packages/wire/package.json
+++ b/packages/wire/package.json
@@ -1,6 +1,6 @@
{
"name": "@dispatch/wire",
- "version": "0.7.0",
+ "version": "0.8.0",
"type": "module",
"private": true,
"main": "dist/index.js",
diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts
index 6a6de7d..3edeabf 100644
--- a/packages/wire/src/index.ts
+++ b/packages/wire/src/index.ts
@@ -224,6 +224,37 @@ export interface TurnMetrics {
readonly contextSize?: number;
}
+// ─── Message queue + steering ───────────────────────────────────────────────
+
+/**
+ * A user message held in a conversation's message queue, awaiting mid-turn
+ * steering delivery. The message-queue extension owns the queue and exposes it
+ * as a per-conversation `custom` surface field; this type is the shared shape
+ * the surface payload, the enqueue response, and the extension's service all
+ * use (so a separate frontend repo can depend on the wire alone to render it).
+ */
+export interface QueuedMessage {
+ /** Stable id (client-visible) for UI keying + dedup. */
+ readonly id: string;
+ /** The message text the client enqueued. */
+ readonly text: string;
+ /** When the message was enqueued (epoch-ms). */
+ readonly queuedAt: number;
+}
+
+/**
+ * The payload of the message-queue extension's per-conversation `custom`
+ * surface field (`rendererId: "message-queue"`): the current queue snapshot a
+ * frontend renders. Carried on the SURFACE channel (NOT the chat stream) — the
+ * queue is control/state, distinct from turn content. An empty `messages`
+ * array means the queue is empty (no pending steering). The frontend moves a
+ * message from this queue surface into the transcript when it is drained (the
+ * surface clears) and/or when the matching `TurnSteeringEvent` arrives.
+ */
+export interface QueuePayload {
+ readonly messages: readonly QueuedMessage[];
+}
+
// ─── Outward events ─────────────────────────────────────────────────────────
/**
@@ -243,7 +274,8 @@ export type AgentEvent =
| TurnStepCompleteEvent
| TurnErrorEvent
| TurnDoneEvent
- | TurnSealedEvent;
+ | TurnSealedEvent
+ | TurnSteeringEvent;
/** Status change for a conversation (e.g. idle → running). */
export interface StatusEvent {
@@ -440,3 +472,27 @@ export interface TurnSealedEvent {
readonly conversationId: string;
readonly turnId: string;
}
+
+/**
+ * A steering message was injected into an in-flight turn at the tool-result
+ * boundary (the model sees it alongside the tool results and may adjust
+ * course). Drawn from the conversation's message queue (which the drain
+ * clears); the cleared queue arrives as a message-queue SURFACE update, while
+ * THIS event carries the injected `text` so a frontend can place a user bubble
+ * in the transcript live — and so a late-joining watcher sees it before seal
+ * (mirroring `TurnInputEvent` for the opening prompt; emitted into the
+ * in-flight buffer by the session-orchestrator).
+ *
+ * Emitted by the session-orchestrator (in its `drainSteering` wrapper) only
+ * when the kernel drained a non-empty queue at a tool-result boundary. If the
+ * turn instead ENDS with a non-empty queue (no tool call fired), the queue is
+ * carried into a NEW turn whose opening `user-message` event covers the
+ * transcript — so no `steering` event is emitted in that case. One `steering`
+ * event per drain; the combined text of all drained messages.
+ */
+export interface TurnSteeringEvent {
+ readonly type: "steering";
+ readonly conversationId: string;
+ readonly turnId: string;
+ readonly text: string;
+}
diff --git a/tasks.md b/tasks.md
index 189980f..7f1a793 100644
--- a/tasks.md
+++ b/tasks.md
@@ -5,7 +5,7 @@
> Keep this lean and current; do not let it re-accrete a step-by-step changelog.
## Status (current)
-`tsc -b` EXIT 0 · biome clean · **894 vitest + transport bun green**.
+`tsc -b` EXIT 0 · biome clean · **1043 vitest + transport bun green**.
Built and verified live (full-fidelity: every feature is a manifest-loaded
extension through the host):
@@ -384,6 +384,46 @@ budget_tokens; `../claude` orchestrated DIRECTLY (mode A); CLI `--effort` now.
`../dispatch-web`): ChatRequest/chat.send field + GET/PUT endpoints + ladder + default-`high`
semantics + cache note.
+## Message queue + steering injection (DONE)
+Design: this file's roadmap item 3 (now implemented). User-gated calls: a **separate
+`message-queue` standard extension** (dependsOn `surface-registry`) owns the queue STATE +
+a per-conversation `custom` surface; the **session-orchestrator** owns delivery (drain →
+inject → carry) + emits the `steering` event (it owns the chat hub — no `chatEmit` service
+needed); the **kernel** gets a generic `drainSteering` callback. Glossary: added
+**message queue**, **steering**, **queued message**. Enqueue when idle **starts a turn**
+(user choice; `chat.queue` degrades to `chat.send`). Steering text rendered live via a new
+additive `steering` `AgentEvent`; queue state via the surface (NOT the chat stream).
+- **Wave 0 (orchestrator, contracts):** `RunTurnInput.drainSteering?: () => readonly
+ ChatMessage[]` (kernel contract — generic, kernel stays pure); `QueuedMessage` +
+ `QueuePayload` + `TurnSteeringEvent` (type `"steering"`, additive to `AgentEvent`) in
+ `@dispatch/wire` (`0.7.0→0.8.0`); `POST /conversations/:id/queue` + WS `chat.queue` op +
+ `QueueRequest`/`QueueResponse` in `@dispatch/transport-contract` (`0.11.0→0.12.0`). typecheck
+ clean except the expected transport-ws exhaustive-switch fan-out (fixed in Wave 3).
+- **Wave 1 (parallel ×2, disjoint):** `kernel` runtime — calls `drainSteering` at the
+ tool-result boundary only when continuing to a next step (gated; no drain on max-steps),
+ +6 pure tests (65 total); `message-queue` (NEW ext) — pure queue core (enqueue/getQueue/
+ drain/combine) + `MessageQueueService`/`messageQueueHandle` + per-conversation `custom`
+ surface (`rendererId:"message-queue"`, `QueuePayload`), 12 tests. (The message-queue agent
+ DIED mid-task after writing all src+tests but before verifying/reporting; orchestrator
+ recovered by running `bun install` + root tsconfig ref + verifying directly — tsc/vitest/
+ biome clean, 12 tests pass; no hand-fixing of impl.)
+- **Wave 2:** `session-orchestrator` — added `enqueue` facade (idle→`startTurn`,
+ active→queue.enqueue) + `resolveQueue?` dep (self-wired lazily in `activate` via
+ `host.getService(messageQueueHandle)` — host-bin does NOT wire it) + `drainSteering` wrapper
+ (drain → emit `steering` → return one combined user `ChatMessage`) + post-seal carry
+ (non-empty queue → new turn), +8 tests (85 total). `message-queue` is an OPTIONAL dep
+ (feature degrades off if absent).
+- **Wave 3 (parallel ×3):** `host-bin` — registered `message-queue` in `CORE_EXTENSIONS`
+ (+dep+ref), 28 tests; `transport-http` — `POST /conversations/:id/queue` route + validation,
+ 145 tests; `transport-ws` — `chat.queue` op + fixed the Wave-0 exhaustive-switch fan-out,
+ 29 vitest + 20 bun.
+- Verified: `tsc -b` EXIT 0, biome clean (280 files), **1043 vitest + 199 transport bun** pass;
+ all agents in-lane. **Boot smoke:** private instance boots clean with `message-queue`
+ registered (no activation crash).
+- [x] FE courier handoff written: `frontend-message-queue-handoff.md` (user couriers to
+ `../dispatch-web`): surface (`rendererId:"message-queue"`), `chat.queue` WS op, `steering`
+ event, HTTP `POST /queue`, auto-start-when-idle, carry semantics, version bumps.
+
## Open items
- **Context window LIMIT (deferred, sibling of context size):** expose the selected model's max
context-window token limit so the FE can render `contextSize / limit` (e.g. `1286 / 200000`).
@@ -420,19 +460,12 @@ budget_tokens; `../claude` orchestrated DIRECTLY (mode A); CLI `--effort` now.
whole conversation).
- **send, no `--queue` flag (default):** BLOCKING — sends, waits for the
turn to settle, returns the AI's last message (same shape as the read).
- - **send with `--queue`:** enqueues the message into the conversation's
- message queue (roadmap item 3) and exits immediately.
-3. **Message queue + steering injection (backend core; prerequisite for the
- `--queue` flag in item 2):** a per-conversation queue a client (FE or CLI)
- can push a message onto while a turn is GENERATING. Delivery semantics:
- - On the turn's next TOOL CALL, queued messages are injected as "steering"
- messages returned alongside the tool result (the model sees them
- mid-turn and can adjust course).
- - If the turn ends before any tool call fires, the queued messages are
- COMBINED and sent as a fresh user message starting a NEW turn.
- - FE queueing UX (queue while generating) couriered to `../dispatch-web`.
- Touches the kernel turn loop (injection at the tool-result boundary) — a
- contract-first design pass needed before summoning.
+ - **send with `--queue`:** enqueues the message into the conversation's
+ message queue (DONE — see "Message queue + steering injection" above; the
+ `POST /conversations/:id/queue` endpoint + `chat.queue` WS op ship it) and
+ exits immediately.
+ 3. **Message queue + steering injection — DONE** (see the milestone section above;
+ prerequisite for item 2's `--queue` flag met).
4. **CLI flag to open/activate an FE tab:** optional CLI flag (new or existing
conversation) that makes an already-open frontend open the conversation as a
tab and mark it active. Does NOT exist today — no backend→FE "open/focus
@@ -442,7 +475,18 @@ budget_tokens; `../claude` orchestrated DIRECTLY (mode A); CLI `--effort` now.
5. **`todo` tool** — a per-conversation task-list tool the model maintains
(like opencode's todowrite/todoread), as a standard tool extension; likely a
surface so the FE can render the live list.
-6. **`web_search` tool** — a web search tool (like old dispatch's;
- reference-only source at `../dispatch-source`), as a standard tool extension.
+ 6. **`web_search` tool** — a web search tool (like old dispatch's;
+ reference-only source at `../dispatch-source`), as a standard tool extension.
+ 7. **Message queue — close-with-queued-messages (deferred product decision):**
+ if a client closes a conversation (`POST /conversations/:id/close`) while the
+ queue is non-empty, the carry currently still fires (starts a new turn on the
+ closed conversation). Decide: does closing discard pending steering, or honor
+ it? If "discard," gate the carry on `finishReason !== "aborted"` in
+ session-orchestrator (one-line). No FE action either way.
+ 8. **Live-verify the steering flow (once the frontend is complete):** run a live
+ `chat.queue` → tool-call → `steering` event flow against a real tool-calling
+ model, end-to-end. The logic is unit/integration tested + boot-smoke-clean;
+ this is the live end-to-end smoke. Blocked on the frontend wiring the queue
+ surface + `chat.queue` op (or run it backend-only with a probe client).
-(Done and dropped from the list: CLI; dedup / storage growth.)
+(Done and dropped from the list: CLI; dedup / storage growth; message queue + steering injection.)
diff --git a/tsconfig.json b/tsconfig.json
index 13a4735..66982e4 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -22,6 +22,7 @@
{ "path": "./packages/tool-write-file" },
{ "path": "./packages/skills" },
{ "path": "./packages/cache-warming" },
+ { "path": "./packages/message-queue" },
{ "path": "./packages/lsp" },
{ "path": "./packages/cli" },
{ "path": "./packages/journal-sink" },