43 files changed, 2636 insertions, 34 deletions
diff --git a/GLOSSARY.md b/GLOSSARY.md index 61a555d..a1ef33c 100644 --- a/GLOSSARY.md +++ b/GLOSSARY.md @@ -34,6 +34,9 @@ | **hook** | A typed extension point. **event** = fire-and-forget, N listeners, error-isolated. **filter** = ordered value-in→value-out chain, in-band. | callback (when meaning a hook), listener | | **service** | A single-responder request/response capability fetched via a typed handle. NOT a hook. | — | | **dispatch policy** | `{ maxConcurrent, eager }` controlling how the turn loop runs a step's tool calls. | — | +| **message queue** | The per-conversation buffer of user messages a client (FE or CLI) enqueues while a turn is GENERATING, awaiting mid-turn steering delivery. Owned by the `message-queue` extension; exposed to the frontend as a per-conversation `custom` surface (`rendererId: "message-queue"`, `QueuePayload`). Enqueuing when no turn is active starts a new turn instead. | steering queue, pending queue | +| **steering** | A user message injected into an in-flight turn at the tool-result boundary (drawn from the message queue), so the model sees it alongside the tool results and can adjust course mid-turn. Emitted on the chat stream as a `steering` `AgentEvent`. If the turn ENDS with a non-empty queue (no tool call fired), the queue is instead carried into a NEW turn as its opening prompt. | interjection, mid-turn input | +| **queued message** | An item in the message queue: `{ id, text, queuedAt }`. The unit the frontend renders in the queue surface. | pending message | | **reconcile** | The pure function run on load that repairs a partial/interrupted turn into a valid history. | recover, repair | | **session-orchestrator** | The core extension that drives a turn: load history → resolve provider/tools → call `runTurn` → persist. | — | | **conversation-store** | The core extension persisting the append-only turn/chunk log. 
| message store | @@ -97,6 +97,16 @@ "@dispatch/kernel": "workspace:*", }, }, + "packages/message-queue": { + "name": "@dispatch/message-queue", + "version": "0.0.0", + "dependencies": { + "@dispatch/kernel": "workspace:*", + "@dispatch/surface-registry": "workspace:*", + "@dispatch/ui-contract": "workspace:*", + "@dispatch/wire": "workspace:*", + }, + }, "packages/observability-collector": { "name": "@dispatch/observability-collector", "version": "0.0.0", @@ -120,6 +130,7 @@ "@dispatch/conversation-store": "workspace:*", "@dispatch/credential-store": "workspace:*", "@dispatch/kernel": "workspace:*", + "@dispatch/message-queue": "workspace:*", }, }, "packages/skills": { @@ -182,6 +193,13 @@ "@dispatch/kernel": "workspace:*", }, }, + "packages/tool-web-search": { + "name": "@dispatch/tool-web-search", + "version": "0.0.0", + "dependencies": { + "@dispatch/kernel": "workspace:*", + }, + }, "packages/tool-write-file": { "name": "@dispatch/tool-write-file", "version": "0.0.0", @@ -202,7 +220,7 @@ }, "packages/transport-contract": { "name": "@dispatch/transport-contract", - "version": "0.5.0", + "version": "0.12.0", "dependencies": { "@dispatch/ui-contract": "workspace:*", "@dispatch/wire": "workspace:*", @@ -235,11 +253,11 @@ }, "packages/ui-contract": { "name": "@dispatch/ui-contract", - "version": "0.1.0", + "version": "0.2.0", }, "packages/wire": { "name": "@dispatch/wire", - "version": "0.4.0", + "version": "0.8.0", }, }, "packages": { @@ -279,6 +297,8 @@ "@dispatch/lsp": ["@dispatch/lsp@workspace:packages/lsp"], + "@dispatch/message-queue": ["@dispatch/message-queue@workspace:packages/message-queue"], + "@dispatch/observability-collector": ["@dispatch/observability-collector@workspace:packages/observability-collector"], "@dispatch/provider-openai-compat": ["@dispatch/provider-openai-compat@workspace:packages/provider-openai-compat"], @@ -301,6 +321,8 @@ "@dispatch/tool-shell": ["@dispatch/tool-shell@workspace:packages/tool-shell"], + "@dispatch/tool-web-search": 
["@dispatch/tool-web-search@workspace:packages/tool-web-search"], + "@dispatch/tool-write-file": ["@dispatch/tool-write-file@workspace:packages/tool-write-file"], "@dispatch/trace-replay": ["@dispatch/trace-replay@workspace:packages/trace-replay"], diff --git a/frontend-message-queue-handoff.md b/frontend-message-queue-handoff.md new file mode 100644 index 0000000..2d55220 --- /dev/null +++ b/frontend-message-queue-handoff.md @@ -0,0 +1,189 @@ +# FE handoff — message queue + steering injection + +Courier this to `../dispatch-web` (cross-repo contract change; `lsp references` does +not span repos — ORCHESTRATOR §7). All changes are ADDITIVE — nothing existing breaks. + +## What shipped (backend) + +A per-conversation **message queue** + **steering** feature. While a turn is +GENERATING, a client can enqueue a user message onto the conversation's queue; +it is delivered mid-turn as **steering** — injected at the next tool-result +boundary so the model sees it alongside the tool results and can adjust course. +If the turn ends with a non-empty queue (no tool call fired), the queue is +carried into a NEW turn as its opening prompt. + +- **`message queue`** — the per-conversation buffer (owned by a new + `@dispatch/message-queue` extension). Transient + in-memory; the queue is + NOT on the chat stream — it is exposed to the frontend as a per-conversation + SURFACE (see below). +- **`steering`** — a user message injected into an in-flight turn at the + tool-result boundary (drawn from the queue). Emitted on the chat stream as a + new `steering` `AgentEvent` so it appears in the transcript live. + +Versions: `@dispatch/wire` `0.7.0 → 0.8.0`, `@dispatch/transport-contract` +`0.11.0 → 0.12.0`. Bump the pinned `file:` deps. (`@dispatch/ui-contract` is +unchanged — the queue uses the existing `custom` surface field kind.) 
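A minimal sketch of the delivery rules described in "What shipped (backend)", written as three pure functions. It is an illustration only, not the backend's implementation: the `Outcome` type and the names `onEnqueue`, `onToolResultBoundary`, `onTurnEnd`, and `combineTexts` are hypothetical. The blank-line join mirrors what the handoff says about combined text.

```typescript
// Hypothetical model of the queue/steering rules; names are illustrative.
interface QueuedMessage {
  readonly id: string;
  readonly text: string;
  readonly queuedAt: number; // epoch-ms
}

type Outcome =
  | { readonly kind: "start-turn"; readonly prompt: string } // idle: message opens a new turn
  | { readonly kind: "queued"; readonly queue: readonly QueuedMessage[] } // generating: message waits
  | { readonly kind: "steering"; readonly text: string } // drained at a tool-result boundary
  | { readonly kind: "carry"; readonly prompt: string } // turn ended with messages pending
  | { readonly kind: "none" }; // nothing to deliver

// Combined text of all drained messages, joined by a blank line.
function combineTexts(messages: readonly QueuedMessage[]): string {
  return messages.map((m) => m.text).join("\n\n");
}

// Enqueue: queue while a turn is active, otherwise start a new turn.
function onEnqueue(
  turnActive: boolean,
  queue: readonly QueuedMessage[],
  message: QueuedMessage,
): Outcome {
  if (!turnActive) return { kind: "start-turn", prompt: message.text };
  return { kind: "queued", queue: [...queue, message] };
}

// Tool-result boundary: a non-empty queue becomes one steering message.
function onToolResultBoundary(queue: readonly QueuedMessage[]): Outcome {
  if (queue.length === 0) return { kind: "none" };
  return { kind: "steering", text: combineTexts(queue) };
}

// Turn end: a non-empty queue is carried into a new turn as its opening prompt.
function onTurnEnd(queue: readonly QueuedMessage[]): Outcome {
  if (queue.length === 0) return { kind: "none" };
  return { kind: "carry", prompt: combineTexts(queue) };
}
```

The point of the split is that a client never chooses between the outcomes: it only enqueues, and the server-side state (turn active or not, tool call fired or not) decides which path the message takes.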
+ +## Wire types (in `@dispatch/wire`, re-exported by `@dispatch/transport-contract`) + +```ts +/** A message held in the conversation's queue, awaiting steering delivery. */ +interface QueuedMessage { + readonly id: string; // stable, client-visible (UI key + dedup) + readonly text: string; + readonly queuedAt: number; // epoch-ms +} + +/** Payload of the message-queue surface's `custom` field (see below). */ +interface QueuePayload { + readonly messages: readonly QueuedMessage[]; +} + +/** New `AgentEvent` variant (additive to the union). */ +interface TurnSteeringEvent { + readonly type: "steering"; + readonly conversationId: string; + readonly turnId: string; + readonly text: string; // the combined text of all drained messages +} +``` + +## How the frontend reads queue STATE: a surface (NOT the chat stream) + +The queue is control/state, so it rides the **surface** channel (like +cache-warming), not the chat event stream. The `message-queue` extension +contributes a per-conversation surface: + +- **Surface id:** `"message-queue"`; **scope:** `"conversation"` (subscribe with + the `conversationId`). +- **One `custom` field**, `rendererId: "message-queue"`, `payload: QueuePayload` + (`{ messages: QueuedMessage[] }` — the current queue snapshot). +- The surface updates (full new spec) on every change: enqueue (queue grew) and + drain (queue emptied). An idle conversation's queue is empty → the field's + `messages` is `[]`. + +So: **subscribe** to the `message-queue` surface per conversation and render +the queue list from `payload.messages`. You need a bespoke renderer for +`rendererId: "message-queue"` (the `custom` escape hatch — see the loaded- +extensions `table` renderer precedent). The surface is **read-only** (no +`invoke` actions); enqueuing is a chat op (below). 
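Because the `custom` field's `payload` arrives untyped on the frontend, a renderer for `rendererId: "message-queue"` may want to validate it before use. The sketch below assumes only the `QueuedMessage` and `QueuePayload` shapes from the wire types; `isQueuedMessage`, `isQueuePayload`, and `queueLines` are hypothetical helper names, and how the payload is obtained from the surface subscription is left out.

```typescript
// Shapes copied from the wire types; the helpers below are illustrative.
interface QueuedMessage {
  readonly id: string;
  readonly text: string;
  readonly queuedAt: number; // epoch-ms
}

interface QueuePayload {
  readonly messages: readonly QueuedMessage[];
}

// Runtime check for a single queued message.
function isQueuedMessage(value: unknown): value is QueuedMessage {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v["id"] === "string" &&
    typeof v["text"] === "string" &&
    typeof v["queuedAt"] === "number"
  );
}

// Runtime check for the surface's `custom` field payload.
function isQueuePayload(value: unknown): value is QueuePayload {
  if (typeof value !== "object" || value === null) return false;
  const messages = (value as Record<string, unknown>)["messages"];
  return Array.isArray(messages) && messages.every((m) => isQueuedMessage(m));
}

// Turns a payload into display lines; an invalid or empty payload yields []
// so the caller can hide the queue panel.
function queueLines(payload: unknown): string[] {
  if (!isQueuePayload(payload)) return [];
  return payload.messages.map(
    (m) => `${new Date(m.queuedAt).toISOString()} ${m.text}`,
  );
}
```

Since every surface update carries a full snapshot, re-running `queueLines` on each update and replacing the rendered list is enough; no diffing against the previous snapshot is required.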
+ +## How the frontend ENQUEUES: the `chat.queue` WS op + +```ts +interface ChatQueueMessage { + readonly type: "chat.queue"; + readonly conversationId: string; + readonly text: string; +} +``` +(additive to `WsClientMessage`.) + +- **Fire-and-forget.** On success the server emits NOTHING back — the + `message-queue` SURFACE updates (the new message appears in the snapshot). + On failure (empty/missing `text`, unknown conversation) the server replies + `chat.error` (`{ type: "chat.error"; conversationId?; message }`). +- **`text` must be non-empty** after trim (the server 400/errors otherwise). +- **Auto-start when idle (server-owned decision):** if NO turn is active for the + conversation, `chat.queue` does NOT queue — it STARTS A NEW TURN with the + message as its opening prompt (equivalent to `chat.send`). The sender is + auto-subscribed and the turn's events stream as `chat.delta`s (the opening + `user-message` carries the text). So a single `chat.queue` op works for both + "steer during generation" and "send" — you don't need to pick. When a turn IS + active, the message is appended to the queue (surface updates) and delivered + at the next tool-result boundary. + +## How the frontend shows steering in the TRANSCRIPT: the `steering` event + +When the kernel drains a non-empty queue at a tool-result boundary, the +session-orchestrator emits a **`steering`** `AgentEvent` on the chat stream +(arrives inside a `chat.delta` `{ event }`, like every other `AgentEvent`): + +```ts +{ type: "chat.delta", event: { type: "steering", conversationId, turnId, text } } +``` + +- Render `text` as a **user bubble in the transcript**, positioned after the + tool-call/tool-result it followed (it is a user message the model saw mid-turn, + alongside the tool results). One `steering` event per drain; `text` is the + combined text of all messages drained at that boundary (joined by a blank + line). 
+- **Move, don't duplicate:** the drained messages were already shown in the + queue surface; when the surface then updates to empty (the drain cleared the + queue), they should leave the queue UI (they now live in the transcript as the + `steering` bubble). A simple rule: on `steering`, append the bubble to the + transcript; the surface's subsequent empty snapshot clears the queue UI. +- **Late-join safe:** like `user-message`, `steering` is buffered into the + in-flight turn's event buffer, so a client that subscribes mid-turn (or a + second device) sees it before seal (mirrors the CR-3 `user-message` fix). + (Carry-to-new-turn, below, does NOT emit `steering` — the new turn's + `user-message` covers it.) + +## Carry to a new turn (no `steering` event) + +If a turn ENDS with a non-empty queue (the model finished without making a tool +call, so no tool-result boundary was hit), the orchestrator drains the queue, +combines the messages, and **starts a NEW turn** whose opening prompt is the +combined text. You will see: the old turn's `done` + `turn-sealed`, then a new +`turn-start` + `user-message` carrying the combined text (rendered as the new +turn's normal user bubble). The queue surface also clears (empty snapshot). No +`steering` event in this case — handle the carried text as an ordinary new-turn +user message. + +## HTTP path (for the CLI / non-WS clients; the FE uses the WS op above) + +`POST /conversations/:id/queue` with body `QueueRequest { text }` → `QueueResponse`: + +```ts +interface QueueResponse { + readonly conversationId: string; + readonly startedTurn: boolean; // true = was idle, a new turn started + readonly queue: readonly QueuedMessage[]; // snapshot after the enqueue +} +``` +- Empty/whitespace `text` → HTTP 400 `{ error }`. +- `startedTurn: true` means no turn was active and the enqueue started one (the + message is the turn's opening prompt, NOT a queued steering message). 
+- `startedTurn: false` means a turn was active and the message was queued (the + `queue` snapshot includes it). + +## What we need the FE to do + +1. **Bump pinned deps:** `@dispatch/wire` → `0.8.0`, `@dispatch/transport-contract` + → `0.12.0`. +2. **Queue UI (per conversation):** subscribe to the `message-queue` surface + (scope `conversation`) and render `payload.messages` (`QueuedMessage[]`) with a + `rendererId: "message-queue"` custom renderer — a list of pending messages + with their text (and maybe `queuedAt` as a timestamp). Empty `messages` = + nothing to show (hide the panel). +3. **Enqueue affordance:** while a turn is generating, show an input that sends + `chat.queue { conversationId, text }` (NOT `chat.send` — `chat.queue` is the + steering entry; it auto-starts a turn if idle, so it's safe to offer it + whenever the user wants to add input). Trim/validate non-empty client-side + too; expect a `chat.error` on failure. +4. **Steering bubble:** handle the new `steering` `AgentEvent` (type `"steering"`) + on the `chat.delta` stream → render `event.text` as a user bubble in the + transcript after the tool calls; clear the queue UI when the surface updates + to empty. +5. **Carry:** no special handling — a carried queue surfaces as a normal new + turn (`turn-start` + `user-message`); just let the existing new-turn flow + render it. The queue surface clears automatically. + +## Notes / known gaps + +- **Live end-to-end (a real steering turn via a tool-calling model) is not yet + exercised** — the logic is unit/integration tested and the app boots clean with + the `message-queue` extension registered, but a live `chat.queue` → tool-call + → `steering` event flow against a real model has not been run. Worth a live + smoke once the FE wires it (or ask the backend to run one). 
+- **Close-with-queued-messages (open product question):** if a client + `POST /conversations/:id/close` (explicit tab close) while the queue is + non-empty, the in-flight turn aborts and the carry currently STILL fires + (starting a new turn on the closed conversation). This may or may not be + desired (does closing discard pending steering, or honor it?). Backend flag + for a decision; if "discard on close" is wanted, the backend will gate the + carry on `finishReason !== "aborted"`. No FE action either way — just be aware + a closed conversation might briefly start a turn from a queued message. +- **`steering` is additive** to the `AgentEvent` union — no exhaustive switches + broke on the backend (verified: `tsc -b` EXIT 0). If the FE has an exhaustive + switch on `AgentEvent`, add a `steering` case. diff --git a/packages/host-bin/package.json b/packages/host-bin/package.json index 27a4a1b..d55b369 100644 --- a/packages/host-bin/package.json +++ b/packages/host-bin/package.json @@ -11,6 +11,7 @@ "@dispatch/cache-warming": "workspace:*", "@dispatch/credential-store": "workspace:*", "@dispatch/provider-openai-compat": "workspace:*", + "@dispatch/message-queue": "workspace:*", "@dispatch/session-orchestrator": "workspace:*", "@dispatch/skills": "workspace:*", "@dispatch/throughput-store": "workspace:*", diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts index 420f12e..59ce47d 100644 --- a/packages/host-bin/src/main.ts +++ b/packages/host-bin/src/main.ts @@ -20,6 +20,7 @@ import { type StorageNamespace, } from "@dispatch/kernel"; import { extension as lspExt } from "@dispatch/lsp"; +import { extension as messageQueueExt } from "@dispatch/message-queue"; import { extension as providerOpenaiCompatExt } from "@dispatch/provider-openai-compat"; import { extension as sessionOrchestratorExt } from "@dispatch/session-orchestrator"; import { extension as skillsExt } from "@dispatch/skills"; @@ -73,6 +74,7 @@ const CORE_EXTENSIONS: readonly 
Extension[] = [ toolShellExt, toolWriteFileExt, throughputStoreExt, + messageQueueExt, sessionOrchestratorExt, skillsExt, cacheWarmingExt, diff --git a/packages/host-bin/tsconfig.json b/packages/host-bin/tsconfig.json index efe3815..110f931 100644 --- a/packages/host-bin/tsconfig.json +++ b/packages/host-bin/tsconfig.json @@ -5,6 +5,7 @@ "references": [ { "path": "../cache-warming" }, { "path": "../kernel" }, + { "path": "../message-queue" }, { "path": "../storage-sqlite" }, { "path": "../surface-loaded-extensions" }, { "path": "../surface-registry" }, diff --git a/packages/kernel/src/contracts/events.ts b/packages/kernel/src/contracts/events.ts index b1385a2..6c9652d 100644 --- a/packages/kernel/src/contracts/events.ts +++ b/packages/kernel/src/contracts/events.ts @@ -14,6 +14,7 @@ export type { TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, + TurnSteeringEvent, TurnStepCompleteEvent, TurnTextDeltaEvent, TurnToolCallEvent, diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts index ffcbe76..4b1350b 100644 --- a/packages/kernel/src/contracts/index.ts +++ b/packages/kernel/src/contracts/index.ts @@ -38,6 +38,7 @@ export type { TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, + TurnSteeringEvent, TurnStepCompleteEvent, TurnTextDeltaEvent, TurnToolCallEvent, diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts index b7fe23c..c449a68 100644 --- a/packages/kernel/src/contracts/runtime.ts +++ b/packages/kernel/src/contracts/runtime.ts @@ -100,6 +100,23 @@ export interface RunTurnInput { * absent) — backward-compatible with callers that don't provide a clock. */ readonly now?: () => number; + + /** + * Optional. Called by the runtime at the tool-result boundary — after a + * step whose tool calls have all executed, before the next step begins — + * to drain messages to inject alongside the tool results. 
Whatever it + * returns is appended as user-role messages to the next step's input, so + * a caller can inject mid-turn guidance the model sees with the tool + * results. When omitted or returning an empty array, no injection happens + * (the runtime is unchanged). + * + * Injected (not ambient) so the kernel stays pure: it owns no queue and + * names no feature — it just calls the callback and appends what it gets. + * Only invoked when a step PRODUCED tool calls (the tool-result boundary); + * a step that ends without tool calls does not drain (the caller decides + * what to do with any pending messages after the turn ends). + */ + readonly drainSteering?: () => readonly ChatMessage[]; } /** diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts index dcaea7f..0d4c59d 100644 --- a/packages/kernel/src/runtime/run-turn.test.ts +++ b/packages/kernel/src/runtime/run-turn.test.ts @@ -5,7 +5,7 @@ import type { LogDeps, Logger, LogRecord, LogSink } from "../contracts/logging.j import type { ProviderContract, ProviderEvent } from "../contracts/provider.js"; import type { ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js"; import { createLogger } from "../logging/logger.js"; -import { runTurn } from "./run-turn.js"; +import { MAX_STEPS, runTurn } from "./run-turn.js"; function delay(ms: number): Promise<void> { return new Promise((resolve) => { @@ -29,6 +29,28 @@ function createFakeProvider(script: ProviderEvent[][]): ProviderContract { }; } +function createCapturingProvider(script: ProviderEvent[][]): { + provider: ProviderContract; + capturedMessages: ChatMessage[][]; +} { + const capturedMessages: ChatMessage[][] = []; + let callIndex = 0; + const provider: ProviderContract = { + id: "fake", + stream(messages, _tools) { + capturedMessages.push([...messages]); + const events = script[callIndex] ?? 
[]; + callIndex++; + return (async function* () { + for (const event of events) { + yield event; + } + })(); + }, + }; + return { provider, capturedMessages }; +} + function createFakeTool( name: string, handler?: (input: unknown, ctx: ToolExecuteContext) => Promise<ToolResult>, @@ -2577,4 +2599,226 @@ describe("runTurn", () => { } }); }); + + describe("drainSteering", () => { + it("drainSteering called once at the tool-result boundary; returned messages appended to the next step's provider input (after tool results)", async () => { + let drainCallCount = 0; + const steeringMessage: ChatMessage = { + role: "user", + chunks: [{ type: "text", text: "steer!" }], + }; + + const { provider, capturedMessages } = createCapturingProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + drainSteering: () => { + drainCallCount++; + return [steeringMessage]; + }, + }); + + expect(drainCallCount).toBe(1); + // The provider was called twice (tool-call step, then text step). + expect(capturedMessages).toHaveLength(2); + const secondStepMessages = capturedMessages[1] ?? []; + // user, assistant(tool-call), tool-result, steering(user) — in order, + // steering appended AFTER the tool results, before the next call. 
+ expect(secondStepMessages).toHaveLength(4); + expect(secondStepMessages[0]?.role).toBe("user"); + expect(secondStepMessages[1]?.role).toBe("assistant"); + expect(secondStepMessages[2]?.role).toBe("tool"); + expect(secondStepMessages[3]).toEqual(steeringMessage); + expect(secondStepMessages[3]?.role).toBe("user"); + // Steering is fed to the next provider call, NOT surfaced in the + // turn result — the caller owns the steering messages' lifecycle. + expect(result.messages).toHaveLength(3); + }); + + it("drainSteering omitted → no injection; turn byte-identical to before", async () => { + const { provider, capturedMessages } = createCapturingProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + // drainSteering omitted — must be a strict no-op. + }); + + expect(capturedMessages).toHaveLength(2); + const secondStepMessages = capturedMessages[1] ?? []; + // user, assistant(tool-call), tool-result — NO steering injected. 
+ expect(secondStepMessages).toHaveLength(3); + expect(secondStepMessages[0]?.role).toBe("user"); + expect(secondStepMessages[1]?.role).toBe("assistant"); + expect(secondStepMessages[2]?.role).toBe("tool"); + expect(result.messages).toHaveLength(3); + }); + + it("drainSteering returns [] → no injection", async () => { + let drainCallCount = 0; + const { provider, capturedMessages } = createCapturingProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + drainSteering: () => { + drainCallCount++; + return []; + }, + }); + + // Called at the boundary, but returned nothing → no injection. + expect(drainCallCount).toBe(1); + expect(capturedMessages).toHaveLength(2); + const secondStepMessages = capturedMessages[1] ?? 
[]; + expect(secondStepMessages).toHaveLength(3); + expect(secondStepMessages[2]?.role).toBe("tool"); + }); + + it("drainSteering NOT called when a step has no tool calls (text-only turn)", async () => { + let drainCallCount = 0; + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hello" }, + { type: "finish", reason: "stop" }, + ], + ]); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + drainSteering: () => { + drainCallCount++; + return []; + }, + }); + + expect(drainCallCount).toBe(0); + }); + + it("multiple tool-call steps → drainSteering called once per tool-call step", async () => { + let drainCallCount = 0; + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "tool-call", toolCallId: "tc2", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + drainSteering: () => { + drainCallCount++; + return []; + }, + }); + + // Steps 0 and 1 each produced tool calls → drained once each. + // Step 2 (text-only) → no boundary → no drain. Total = 2. + expect(drainCallCount).toBe(2); + }); + + it("drainSteering NOT called when max-steps ends the turn after a tool-call step (no next step → no drain)", async () => { + let drainCallCount = 0; + // Every step produces a tool call → the turn runs to MAX_STEPS. 
+ const script: ProviderEvent[][] = Array.from({ length: MAX_STEPS }, () => [ + { type: "tool-call", toolCallId: "tc", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ]); + const provider = createFakeProvider(script); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + drainSteering: () => { + drainCallCount++; + return []; + }, + }); + + expect(result.finishReason).toBe("max-steps"); + // MAX_STEPS tool-call steps (indices 0..MAX_STEPS-1). Drained on every + // step that is followed by a next step (0..MAX_STEPS-2 = MAX_STEPS-1 + // calls); the final step is the max-steps boundary → no next step → + // no drain (queue left intact for the caller). + expect(drainCallCount).toBe(MAX_STEPS - 1); + }); + }); }); diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index bf57854..228ef8a 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -538,6 +538,19 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { if (step === MAX_STEPS - 1) { finishReason = "max-steps"; + // No next step → no tool-result boundary. Leave any pending + // steering messages for the caller (it owns the queue). + } else { + // Tool-result boundary: this step produced tool calls and we are + // about to call provider.stream again. Drain steering messages + // and append them after the tool results, before the next call. + // The kernel owns no queue and names no feature — it just calls + // the callback and appends. Emits nothing (caller emits the + // `steering` AgentEvent in its own wrapper). + const steering = input.drainSteering?.() ?? 
[]; + for (const msg of steering) { + messages.push(msg); + } } } } finally { diff --git a/packages/message-queue/package.json b/packages/message-queue/package.json new file mode 100644 index 0000000..a8fcc4b --- /dev/null +++ b/packages/message-queue/package.json @@ -0,0 +1,14 @@ +{ + "name": "@dispatch/message-queue", + "version": "0.0.0", + "type": "module", + "private": true, + "main": "dist/index.js", + "types": "dist/index.d.ts", + "dependencies": { + "@dispatch/kernel": "workspace:*", + "@dispatch/surface-registry": "workspace:*", + "@dispatch/ui-contract": "workspace:*", + "@dispatch/wire": "workspace:*" + } +} diff --git a/packages/message-queue/src/extension.ts b/packages/message-queue/src/extension.ts new file mode 100644 index 0000000..26d19ca --- /dev/null +++ b/packages/message-queue/src/extension.ts @@ -0,0 +1,82 @@ +/** + * message-queue extension — owns the per-conversation steering message queue + * (state + surface + drain-and-clear). Plugs into the kernel host via a + * manifest + activate(host); the session-orchestrator drains it via the + * `messageQueueHandle` service. Does NOT touch the turn loop. 
+ */ +import type { Extension, HostAPI, Manifest } from "@dispatch/kernel"; +import type { SurfaceContext, SurfaceProvider } from "@dispatch/surface-registry"; +import { surfaceRegistryHandle } from "@dispatch/surface-registry"; +import type { SurfaceSpec } from "@dispatch/ui-contract"; +import { buildQueueSpec, MESSAGE_QUEUE_SURFACE_ID } from "./pure.js"; +import { createMessageQueueService, messageQueueHandle } from "./service.js"; + +export const manifest: Manifest = { + id: "message-queue", + name: "Message Queue", + version: "0.0.0", + apiVersion: "^0.1.0", + trust: "bundled", + activation: "eager", + dependsOn: ["surface-registry"], + capabilities: {}, + contributes: { + services: ["message-queue"], + }, +}; + +export function activate(host: HostAPI): void { + const registry = host.getService(surfaceRegistryHandle); + + const subscribers = new Set<() => void>(); + + const service = createMessageQueueService({ + id: () => crypto.randomUUID(), + now: () => Date.now(), + notify: () => { + for (const sub of subscribers) { + sub(); + } + }, + logger: host.logger, + }); + + host.provideService(messageQueueHandle, service); + + function getSpec(context?: SurfaceContext): SurfaceSpec { + const convId = context?.conversationId; + const messages = convId === undefined ? [] : service.getQueue(convId); + return buildQueueSpec(messages); + } + + function invoke(_actionId: string, _payload?: unknown, _context?: SurfaceContext): void { + // The message-queue surface is read-only: a client renders the queue and + // the session-orchestrator drains it. No client-facing actions. 
+ } + + const provider: SurfaceProvider = { + catalogEntry: { + id: MESSAGE_QUEUE_SURFACE_ID, + region: "side", + title: "Message Queue", + scope: "conversation", + }, + getSpec, + invoke, + subscribe(onChange) { + subscribers.add(onChange); + return () => { + subscribers.delete(onChange); + }; + }, + }; + + registry.register(provider); + + host.logger.info("message-queue: registered"); +} + +export const extension: Extension = { + manifest, + activate, +}; diff --git a/packages/message-queue/src/index.ts b/packages/message-queue/src/index.ts new file mode 100644 index 0000000..79bbb25 --- /dev/null +++ b/packages/message-queue/src/index.ts @@ -0,0 +1,27 @@ +/** + * @dispatch/message-queue — the per-conversation message queue for mid-turn + * steering. Owns the queue state + a per-conversation `custom` surface; the + * session-orchestrator drains it via `messageQueueHandle`. The queue is + * transient (in-memory, per-conversation, only meaningful during generation); + * the surface is the ONLY way the frontend reads queue state. 
+ */ + +export type { QueuedMessage, QueuePayload } from "@dispatch/wire"; +export { extension, manifest } from "./extension.js"; +export { + buildQueueSpec, + combine, + drain, + enqueue, + getQueue, + MESSAGE_QUEUE_RENDERER_ID, + MESSAGE_QUEUE_SURFACE_ID, + type MessageQueueState, + type QueueDeps, +} from "./pure.js"; +export { + createMessageQueueService, + type MessageQueueDeps, + type MessageQueueService, + messageQueueHandle, +} from "./service.js"; diff --git a/packages/message-queue/src/pure.test.ts b/packages/message-queue/src/pure.test.ts new file mode 100644 index 0000000..ff1a08f --- /dev/null +++ b/packages/message-queue/src/pure.test.ts @@ -0,0 +1,144 @@ +import type { QueuedMessage } from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { + buildQueueSpec, + combine, + drain, + enqueue, + getQueue, + MESSAGE_QUEUE_RENDERER_ID, + MESSAGE_QUEUE_SURFACE_ID, + type MessageQueueState, + type QueueDeps, +} from "./pure.js"; + +function makeDeps(): QueueDeps { + let idCounter = 0; + let now = 1_000; + return { + id: () => `id-${++idCounter}`, + now: () => (now += 10), + }; +} + +describe("enqueue", () => { + it("enqueue appends + returns snapshot with the new message (id unique, queuedAt set)", () => { + const state: MessageQueueState = new Map(); + const deps = makeDeps(); + + const first = enqueue(state, "c1", "first", deps); + expect(first).toHaveLength(1); + const firstMsg = first[0]; + if (firstMsg === undefined) throw new Error("expected a message"); + expect(firstMsg.id).toBe("id-1"); + expect(firstMsg.text).toBe("first"); + expect(firstMsg.queuedAt).toBe(1_010); + + const second = enqueue(state, "c1", "second", deps); + expect(second).toHaveLength(2); + const secondMsg = second[1]; + if (secondMsg === undefined) throw new Error("expected a message"); + expect(secondMsg.id).toBe("id-2"); // unique per message + expect(secondMsg.text).toBe("second"); + expect(secondMsg.queuedAt).toBe(1_020); + + // a separate conversation 
does not share state + const other = enqueue(state, "c2", "other", deps); + expect(other).toHaveLength(1); + expect(getQueue(state, "c2")).toHaveLength(1); + }); +}); + +describe("getQueue", () => { + it("getQueue returns current snapshot; empty array when none", () => { + const state: MessageQueueState = new Map(); + const deps = makeDeps(); + + expect(getQueue(state, "unknown")).toEqual([]); + + enqueue(state, "c1", "x", deps); + enqueue(state, "c1", "y", deps); + const snap = getQueue(state, "c1"); + expect(snap.map((m) => m.text)).toEqual(["x", "y"]); + + // returns a COPY — mutating it does not affect live state + snap.push({ id: "evil", text: "mutate", queuedAt: 0 }); + expect(getQueue(state, "c1")).toHaveLength(2); + }); +}); + +describe("drain", () => { + it("drain returns all + clears; second drain returns []", () => { + const state: MessageQueueState = new Map(); + const deps = makeDeps(); + enqueue(state, "c1", "a", deps); + enqueue(state, "c1", "b", deps); + + const drained = drain(state, "c1"); + expect(drained.map((m) => m.text)).toEqual(["a", "b"]); + + // cleared + expect(getQueue(state, "c1")).toEqual([]); + expect(drain(state, "c1")).toEqual([]); + }); + + it("drain on an empty queue returns [] and is a no-op", () => { + const state: MessageQueueState = new Map(); + + // never had a queue + expect(drain(state, "c1")).toEqual([]); + expect(getQueue(state, "c1")).toEqual([]); + + // and after a drain that emptied it, draining again is a no-op + const deps = makeDeps(); + enqueue(state, "c1", "once", deps); + drain(state, "c1"); + expect(drain(state, "c1")).toEqual([]); + expect(getQueue(state, "c1")).toEqual([]); + }); +}); + +describe("combine", () => { + it("combine joins texts with blank-line separator", () => { + const msgs: QueuedMessage[] = [ + { id: "1", text: "alpha", queuedAt: 1 }, + { id: "2", text: "beta", queuedAt: 2 }, + ]; + expect(combine(msgs)).toBe("alpha\n\nbeta"); + expect(combine([])).toBe(""); + expect(combine([{ id: "1", text: 
"solo", queuedAt: 1 }])).toBe("solo"); + }); +}); + +describe("buildQueueSpec", () => { + it("renders a custom field with the queue snapshot", () => { + const msgs: QueuedMessage[] = [ + { id: "1", text: "a", queuedAt: 1 }, + { id: "2", text: "b", queuedAt: 2 }, + ]; + const spec = buildQueueSpec(msgs); + expect(spec.id).toBe(MESSAGE_QUEUE_SURFACE_ID); + expect(spec.region).toBe("side"); + expect(spec.title).toBe("Message Queue"); + expect(spec.fields).toHaveLength(1); + + const field = spec.fields[0]; + if (field === undefined || field.kind !== "custom") { + throw new Error("expected a custom field"); + } + expect(field.rendererId).toBe(MESSAGE_QUEUE_RENDERER_ID); + const payload = field.payload as { messages: readonly QueuedMessage[] }; + expect(payload.messages).toHaveLength(2); + expect(payload.messages.map((m) => m.text)).toEqual(["a", "b"]); + }); + + it("renders an empty list for an empty queue", () => { + const spec = buildQueueSpec([]); + const field = spec.fields[0]; + if (field === undefined || field.kind !== "custom") { + throw new Error("expected a custom field"); + } + const payload = field.payload as { messages: readonly QueuedMessage[] }; + expect(payload.messages).toEqual([]); + }); +}); diff --git a/packages/message-queue/src/pure.ts b/packages/message-queue/src/pure.ts new file mode 100644 index 0000000..834018b --- /dev/null +++ b/packages/message-queue/src/pure.ts @@ -0,0 +1,105 @@ +/** + * Pure core for message-queue — zero I/O, zero ambient state. + * + * Every function is input → output; testable without mocks. State is a plain + * `Map<conversationId, QueuedMessage[]>` OWNED by the caller (the service + * shell); the pure functions mutate it in place and return snapshots (fresh + * array copies), so a caller can never reach into or mutate live state through + * a returned value. The id factory + clock are injected (`QueueDeps`) so tests + * are deterministic. 
+ */ + +import type { CustomField, SurfaceSpec } from "@dispatch/ui-contract"; +import type { QueuedMessage, QueuePayload } from "@dispatch/wire"; + +/** The queue state: a per-conversation map of queued messages. */ +export type MessageQueueState = Map<string, QueuedMessage[]>; + +/** Injected effectful factories kept out of the pure core. */ +export interface QueueDeps { + /** Stable (client-visible) id factory for UI keying + dedup. */ + readonly id: () => string; + /** Clock returning epoch-ms for `queuedAt`. */ + readonly now: () => number; +} + +/** Surface id this extension contributes (also the manifest + catalog id). */ +export const MESSAGE_QUEUE_SURFACE_ID = "message-queue"; +/** The custom renderer id a frontend switches on to render the queue. */ +export const MESSAGE_QUEUE_RENDERER_ID = "message-queue"; + +/** + * Append a message to a conversation's queue. Mutates `state` and returns the + * CURRENT snapshot (post-append) — a fresh array copy, so callers cannot mutate + * live state through the returned value. + */ +export function enqueue( + state: MessageQueueState, + conversationId: string, + text: string, + deps: QueueDeps, +): QueuedMessage[] { + const message: QueuedMessage = { id: deps.id(), text, queuedAt: deps.now() }; + const existing = state.get(conversationId); + if (existing === undefined) { + state.set(conversationId, [message]); + } else { + existing.push(message); + } + return getQueue(state, conversationId); +} + +/** + * Current queue snapshot for a conversation — a fresh array copy. Empty array + * if the conversation has no queue / is unknown. + */ +export function getQueue(state: MessageQueueState, conversationId: string): QueuedMessage[] { + const existing = state.get(conversationId); + if (existing === undefined) return []; + return [...existing]; +} + +/** + * Drain: return all queued messages for a conversation and CLEAR its queue. 
+ * Returns a fresh array copy of the drained messages (empty array if the queue + * was empty or unknown). The caller (session-orchestrator) combines these into + * a steering ChatMessage; this returns the raw `QueuedMessage[]`, NOT a + * ChatMessage. + */ +export function drain(state: MessageQueueState, conversationId: string): QueuedMessage[] { + const existing = state.get(conversationId); + if (existing === undefined || existing.length === 0) return []; + const drained = [...existing]; + state.delete(conversationId); + return drained; +} + +/** + * Combine drained messages' texts into a single steering string, joined by a + * blank line (`\n\n`). Pure — the session-orchestrator builds the final + * ChatMessage from this. + */ +export function combine(messages: readonly QueuedMessage[]): string { + return messages.map((m) => m.text).join("\n\n"); +} + +/** + * Build the per-conversation surface spec: a single `custom` field whose + * payload is the current queue snapshot (`QueuePayload`). An empty `messages` + * array (idle conversation / post-drain) renders as an empty list. Pure — no + * I/O; the surface-registry re-fetches this on every notify. 
+ */ +export function buildQueueSpec(messages: readonly QueuedMessage[]): SurfaceSpec { + const payload: QueuePayload = { messages }; + const field: CustomField = { + kind: "custom", + rendererId: MESSAGE_QUEUE_RENDERER_ID, + payload, + }; + return { + id: MESSAGE_QUEUE_SURFACE_ID, + region: "side", + title: "Message Queue", + fields: [field], + }; +} diff --git a/packages/message-queue/src/service.test.ts b/packages/message-queue/src/service.test.ts new file mode 100644 index 0000000..04cde91 --- /dev/null +++ b/packages/message-queue/src/service.test.ts @@ -0,0 +1,101 @@ +import type { SurfaceSpec } from "@dispatch/ui-contract"; +import type { QueuedMessage, QueuePayload } from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { buildQueueSpec, combine } from "./pure.js"; +import { createMessageQueueService, type MessageQueueDeps } from "./service.js"; + +interface Recorder extends MessageQueueDeps { + readonly calls: { value: number }; +} + +function makeDeps(): Recorder { + let idCounter = 0; + let now = 1_000; + const calls = { value: 0 }; + return { + id: () => `q-${++idCounter}`, + now: () => (now += 10), + notify: () => { + calls.value += 1; + }, + calls, + }; +} + +function queueMessages(spec: SurfaceSpec): readonly QueuedMessage[] { + const field = spec.fields[0]; + if (field === undefined || field.kind !== "custom") { + throw new Error("expected a custom field"); + } + return (field.payload as QueuePayload).messages; +} + +describe("message-queue service", () => { + it("enqueue pushes a surface update (snapshot grew)", () => { + const deps = makeDeps(); + const svc = createMessageQueueService(deps); + expect(deps.calls.value).toBe(0); + + const snapshot = svc.enqueue("c1", "hello"); + expect(deps.calls.value).toBe(1); // surface update pushed + expect(snapshot).toHaveLength(1); + const first = snapshot[0]; + if (first === undefined) throw new Error("expected a message"); + expect(first.text).toBe("hello"); + expect(typeof 
first.id).toBe("string"); + expect(first.queuedAt).toBe(1_010); + + // the surface spec reflects the grown snapshot + expect(queueMessages(buildQueueSpec(svc.getQueue("c1")))).toHaveLength(1); + }); + + it("drain pushes a surface update (empty)", () => { + const deps = makeDeps(); + const svc = createMessageQueueService(deps); + svc.enqueue("c1", "a"); + svc.enqueue("c1", "b"); + expect(deps.calls.value).toBe(2); // two enqueues notified + + const drained = svc.drain("c1"); + expect(deps.calls.value).toBe(3); // drain pushed a surface update + expect(drained).toHaveLength(2); + expect(drained.map((m) => m.text)).toEqual(["a", "b"]); + + // cleared + surface now shows empty + expect(svc.getQueue("c1")).toEqual([]); + expect(queueMessages(buildQueueSpec(svc.getQueue("c1")))).toEqual([]); + }); + + it("drain on empty does NOT push a surface update (already empty)", () => { + const deps = makeDeps(); + const svc = createMessageQueueService(deps); + + expect(svc.drain("c1")).toEqual([]); + expect(deps.calls.value).toBe(0); // no notify — no change + + // after a real drain empties it, a further drain is a no-op (no notify) + svc.enqueue("c1", "x"); + expect(deps.calls.value).toBe(1); + svc.drain("c1"); + expect(deps.calls.value).toBe(2); + svc.drain("c1"); + expect(deps.calls.value).toBe(2); // unchanged — queue was already empty + }); + + it("getQueue returns current snapshot; empty for unknown conversation", () => { + const deps = makeDeps(); + const svc = createMessageQueueService(deps); + expect(svc.getQueue("nope")).toEqual([]); + svc.enqueue("c1", "hi"); + expect(svc.getQueue("c1")).toHaveLength(1); + }); + + it("drain returns the raw QueuedMessage[] the orchestrator combines", () => { + const deps = makeDeps(); + const svc = createMessageQueueService(deps); + svc.enqueue("c1", "alpha"); + svc.enqueue("c1", "beta"); + const drained = svc.drain("c1"); + expect(combine(drained)).toBe("alpha\n\nbeta"); + }); +}); diff --git a/packages/message-queue/src/service.ts 
b/packages/message-queue/src/service.ts new file mode 100644 index 0000000..91d0fc5 --- /dev/null +++ b/packages/message-queue/src/service.ts @@ -0,0 +1,88 @@ +/** + * Message-queue service — the typed handle the session-orchestrator obtains + * (lazily) to enqueue / read / drain a conversation's steering queue. + * + * The queue is transient + per-conversation (in-memory, only meaningful during + * generation). `drain` returns + clears AND pushes a surface update (empty). + * The orchestrator owns the delivery (drain → combine → inject → carry-to-new- + * turn); this service owns only the queue state, the surface, and the + * drain-and-clear. + */ +import { defineService, type Logger, type ServiceHandle } from "@dispatch/kernel"; +import type { QueuedMessage } from "@dispatch/wire"; +import type { MessageQueueState, QueueDeps } from "./pure.js"; +import { drain as drainQueue, enqueue as enqueueMessage, getQueue as readQueue } from "./pure.js"; + +/** + * The message-queue service interface. Obtained via + * `host.getService(messageQueueHandle)`. + */ +export interface MessageQueueService { + /** Append a message; return the CURRENT queue snapshot (post-append). */ + enqueue(conversationId: string, text: string): QueuedMessage[]; + /** Current queue snapshot (empty array if none / unknown conversation). */ + getQueue(conversationId: string): QueuedMessage[]; + /** + * Drain: return all queued messages, CLEAR the queue, and push a surface + * update (empty). Returns the raw `QueuedMessage[]` (NOT a ChatMessage — + * the caller combines + builds the ChatMessage). Empty array if the queue + * was empty (and then NO surface update is pushed — no change). + */ + drain(conversationId: string): QueuedMessage[]; +} + +/** + * Typed handle anchoring the message-queue service. The single symbol the + * session-orchestrator imports to reach the queue — no string-keyed lookup. 
+ */ +export const messageQueueHandle: ServiceHandle<MessageQueueService> = + defineService<MessageQueueService>("message-queue"); + +/** Deps for the service shell — the pure core's deps plus the surface notifier. */ +export interface MessageQueueDeps extends QueueDeps { + /** + * Notify the surface-registry that the queue changed, so the transport + * re-fetches + pushes a full new SurfaceUpdate. Called ONLY on a real + * change (enqueue → grew; drain-non-empty → emptied); a no-op drain of an + * already-empty queue does NOT notify (no change → no surface push). + */ + readonly notify: () => void; + /** Injected host logger (optional in tests). Logs counts/lengths at debug, never bodies. */ + readonly logger?: Logger; +} + +/** + * Create a MessageQueueService backed by an in-memory per-conversation Map. + * Pure decision logic (enqueue/getQueue/drain) lives in ./pure.js; this shell + * owns the state + the notify edge. State is owned (not ambient): the Map lives + * in this closure, reachable only through the returned service. 
+ */ +export function createMessageQueueService(deps: MessageQueueDeps): MessageQueueService { + const state: MessageQueueState = new Map(); + + return { + enqueue(conversationId, text) { + const snapshot = enqueueMessage(state, conversationId, text, deps); + deps.logger?.debug("message-queue: enqueued", { + conversationId, + queueLen: snapshot.length, + textLen: text.length, + }); + deps.notify(); + return snapshot; + }, + getQueue(conversationId) { + return readQueue(state, conversationId); + }, + drain(conversationId) { + const drained = drainQueue(state, conversationId); + if (drained.length === 0) return drained; + deps.logger?.debug("message-queue: drained", { + conversationId, + count: drained.length, + }); + deps.notify(); + return drained; + }, + }; +} diff --git a/packages/message-queue/tsconfig.json b/packages/message-queue/tsconfig.json new file mode 100644 index 0000000..da5dfb7 --- /dev/null +++ b/packages/message-queue/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true }, + "include": ["src/**/*.ts"], + "references": [ + { "path": "../kernel" }, + { "path": "../surface-registry" }, + { "path": "../ui-contract" }, + { "path": "../wire" } + ] +} diff --git a/packages/session-orchestrator/package.json b/packages/session-orchestrator/package.json index 47ab1a2..8aa6c0d 100644 --- a/packages/session-orchestrator/package.json +++ b/packages/session-orchestrator/package.json @@ -8,6 +8,7 @@ "dependencies": { "@dispatch/kernel": "workspace:*", "@dispatch/conversation-store": "workspace:*", - "@dispatch/credential-store": "workspace:*" + "@dispatch/credential-store": "workspace:*", + "@dispatch/message-queue": "workspace:*" } } diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts index 781164a..6e56c2b 100644 --- a/packages/session-orchestrator/src/extension.ts +++ 
b/packages/session-orchestrator/src/extension.ts @@ -2,6 +2,7 @@ import { conversationStoreHandle } from "@dispatch/conversation-store"; import { credentialStoreHandle } from "@dispatch/credential-store"; import type { Extension, HostAPI, Manifest } from "@dispatch/kernel"; import { runTurn } from "@dispatch/kernel"; +import { messageQueueHandle } from "@dispatch/message-queue"; import { cacheWarmHandle, createSessionOrchestrator, @@ -49,6 +50,15 @@ export function activate(host: HostAPI): void { logger: host.logger, now: () => Date.now(), emit: (hook, payload) => host.emit(hook, payload), + resolveQueue: () => { + // Lazily resolve the message-queue service. Returns undefined when the + // extension isn't loaded (feature degrades off) — checked via the + // activated-manifests list so `host.getService` is only called when the + // service is registered. Lazy so activation order with message-queue + // doesn't matter; called per-turn / per-enqueue, not at activate time. + const loaded = host.getExtensions().some((m) => m.id === "message-queue"); + return loaded ? 
host.getService(messageQueueHandle) : undefined; + }, }); host.provideService(sessionOrchestratorHandle, orchestrator); diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts index 711fa5a..afec2b4 100644 --- a/packages/session-orchestrator/src/index.ts +++ b/packages/session-orchestrator/src/index.ts @@ -5,6 +5,8 @@ export { conversationClosed, createSessionOrchestrator, createWarmService, + type EnqueueInput, + type EnqueueResult, type SessionOrchestrator, type SessionOrchestratorBundle, type SessionOrchestratorDeps, diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index 5b2f264..3a74c2d 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -15,6 +15,7 @@ import type { UsageEvent, } from "@dispatch/kernel"; import { defineEventHook, defineService, type ServiceHandle } from "@dispatch/kernel"; +import type { MessageQueueService, QueuedMessage } from "@dispatch/message-queue"; import { createMetricsAccumulator } from "./metrics.js"; import { buildUserMessage, @@ -38,6 +39,25 @@ export type StartTurnResult = | { readonly started: true; readonly turnId: string } | { readonly started: false; readonly reason: "already-active" }; +/** Input to `SessionOrchestrator.enqueue` — the single entry transports call. */ +export interface EnqueueInput { + readonly conversationId: string; + readonly text: string; +} + +/** + * Result of `SessionOrchestrator.enqueue`. When `startedTurn` is true the + * conversation was idle and a turn was started (the message is the opening + * prompt — nothing queued). When false the conversation was active: the message + * was enqueued onto the steering queue and `queue` is the post-enqueue snapshot + * (empty when the message-queue extension isn't loaded — degraded: the message + * is dropped, see `enqueue` docs). 
+ */ +export interface EnqueueResult { + readonly startedTurn: boolean; + readonly queue: readonly QueuedMessage[]; +} + export type TurnEventListener = (event: AgentEvent) => void; interface ActiveTurn { @@ -109,6 +129,16 @@ export const cacheWarmHandle: ServiceHandle<WarmService> = defineService<WarmSer export interface SessionOrchestrator { startTurn(input: StartTurnInput): StartTurnResult; + /** + * The single entry transports call to deliver a user message. Owns the + * idle→startTurn vs active→queue decision (no separate `isActive` race — + * `startTurn`'s single-flight guard is authoritative). When the conversation + * is idle, starts a turn (the message is the opening prompt). When active, + * enqueues onto the steering queue (if the message-queue extension is + * loaded); with no queue extension loaded the message is dropped and the + * returned snapshot is empty (degraded — feature off). + */ + enqueue(input: EnqueueInput): EnqueueResult; subscribe(conversationId: string, listener: TurnEventListener): () => void; isActive(conversationId: string): boolean; /** @@ -143,6 +173,16 @@ export interface SessionOrchestratorDeps { modelName: string, ) => { provider: ProviderContract; model: string } | undefined; readonly runTurn: (input: RunTurnInput) => Promise<RunTurnResult>; + /** + * Lazily resolves the message-queue service (the steering queue), or + * `undefined` when the message-queue extension isn't loaded (the feature + * degrades off: no `drainSteering`, no post-seal carry, `enqueue` drops + * messages when active). host-bin wires this via `host.getService`; the + * orchestrator calls it per-turn / per-enqueue so activation order with the + * message-queue extension doesn't matter. Injected (not ambient) so a turn + * stays reproducible from its inputs and tests use a fake queue. + */ + readonly resolveQueue?: () => MessageQueueService | undefined; /** Apply the per-turn tools filter chain. Injected for testability. 
*/ readonly applyToolsFilter: (assembly: ToolAssembly) => Promise<ToolAssembly>; /** Base logger (auto-scoped to this extension); childed per turn for span capture. */ @@ -184,6 +224,25 @@ export function createSessionOrchestrator( } } + /** + * Post-seal carry: if a steering queue is available and non-empty, drain it, + * combine, and start a NEW detached turn whose opening `user-message` carries + * the combined text (no `steering` event — that's only for mid-turn drain). + * Returns true iff a new turn was started. Called from `runTurnDetached`'s + * finally AFTER `activeTurns.delete` (so the new turn's single-flight guard + * passes) and BEFORE `activeConversations.delete` (skipped when carried, since + * the new turn re-adds it). May chain — the new turn's own finally re-checks. + */ + function tryCarryQueue(conversationId: string): boolean { + const queue = deps.resolveQueue?.(); + if (queue === undefined) return false; + if (queue.getQueue(conversationId).length === 0) return false; + const drained = queue.drain(conversationId); + const combined = drained.map((q) => q.text).join("\n\n"); + const result = orchestrator.startTurn({ conversationId, text: combined }); + return result.started; + } + function runTurnDetached( conversationId: string, text: string, @@ -218,6 +277,7 @@ export function createSessionOrchestrator( }); void (async () => { + let sealed = false; try { const [effectiveCwd, storedEffort] = await Promise.all([ effectiveCwdPromise, @@ -273,6 +333,30 @@ export function createSessionOrchestrator( ...(modelOverride !== undefined ? { model: modelOverride } : {}), }; + // Resolve the steering queue once for this turn. When present, wire + // `drainSteering`: the kernel calls it at the tool-result boundary and + // appends whatever it returns as user-role messages alongside the tool + // results (mid-turn steering). 
The wrapper emits a `steering` AgentEvent + // into the hub (buffered for late-join like `user-message`) so a + // frontend can place a user bubble in the transcript live; the kernel + // only appends the returned messages — it does NOT emit the event. + const queue = deps.resolveQueue?.(); + const drainSteering = + queue === undefined + ? undefined + : (): readonly ChatMessage[] => { + const queued = queue.drain(conversationId); + if (queued.length === 0) return []; + const steerText = queued.map((q) => q.text).join("\n\n"); + emitToHub(conversationId, { + type: "steering", + conversationId, + turnId, + text: steerText, + }); + return [{ role: "user", chunks: [{ type: "text", text: steerText }] }]; + }; + const opts: RunTurnInput = { provider, messages: [...history, userMsg], @@ -286,6 +370,7 @@ export function createSessionOrchestrator( ...(turnLogger !== undefined ? { logger: turnLogger } : {}), ...(effectiveCwd !== undefined ? { cwd: effectiveCwd } : {}), ...(deps.now !== undefined ? { now: deps.now } : {}), + ...(drainSteering !== undefined ? { drainSteering } : {}), }; const result = await deps.runTurn(opts); @@ -297,6 +382,7 @@ export function createSessionOrchestrator( await deps.conversationStore.appendMetrics(conversationId, turnMetrics); emitToHub(conversationId, { type: "turn-sealed", conversationId, turnId }); + sealed = true; } catch (err) { const message = err instanceof Error ? err.message : String(err); emitToHub(conversationId, { @@ -307,7 +393,16 @@ export function createSessionOrchestrator( }); } finally { activeTurns.delete(conversationId); - activeConversations.delete(conversationId); + // Post-seal carry: if the turn sealed with a non-empty steering queue + // (no tool call fired → drainSteering never drained it), start a NEW + // detached turn whose opening user-message carries the combined text. + // The new turn re-adds to activeTurns + activeConversations, so skip + // the activeConversations.delete when carried. 
May chain (user keeps + // steering) — each carried turn's own finally re-checks the queue. + const carried = sealed && tryCarryQueue(conversationId); + if (!carried) { + activeConversations.delete(conversationId); + } void payloadPromise.then((payload) => { deps.emit?.(turnSettled, payload); }); @@ -326,6 +421,19 @@ export function createSessionOrchestrator( return { started: true, turnId }; }, + enqueue({ conversationId, text }) { + const result = orchestrator.startTurn({ conversationId, text }); + if (result.started) { + return { startedTurn: true, queue: [] }; + } + // Already active → enqueue onto the steering queue. When the + // message-queue extension isn't loaded this degrades: the message is + // dropped and the snapshot is empty (feature off). + const queue = deps.resolveQueue?.(); + const snapshot = queue !== undefined ? queue.enqueue(conversationId, text) : []; + return { startedTurn: false, queue: snapshot }; + }, + subscribe(conversationId, listener) { let listeners = subscribers.get(conversationId); if (listeners === undefined) { diff --git a/packages/session-orchestrator/src/queue.test.ts b/packages/session-orchestrator/src/queue.test.ts new file mode 100644 index 0000000..c1f12da --- /dev/null +++ b/packages/session-orchestrator/src/queue.test.ts @@ -0,0 +1,497 @@ +import type { ConversationStore } from "@dispatch/conversation-store"; +import type { + AgentEvent, + ChatMessage, + ProviderContract, + ProviderEvent, + ReasoningEffort, + RunTurnInput, + RunTurnResult, + StoredChunk, + ToolContract, + TurnMetrics, +} from "@dispatch/kernel"; +import { runTurn } from "@dispatch/kernel"; +import { createMessageQueueService } from "@dispatch/message-queue"; +import { describe, expect, it } from "vitest"; +import { createSessionOrchestrator } from "./orchestrator.js"; +import type { ToolAssembly } from "./tools-filter.js"; + +// --- Shared test helpers (duplicated from orchestrator.test.ts per isolation-over-DRY; +// a shared test-helper module wired
between test files is a coupling smell) --- + +function createInMemoryStore(): ConversationStore & { + readonly data: Map<string, ChatMessage[]>; + readonly metricsData: Map<string, TurnMetrics[]>; + readonly cwdData: Map<string, string>; + readonly effortData: Map<string, ReasoningEffort>; +} { + const data = new Map<string, ChatMessage[]>(); + const metricsData = new Map<string, TurnMetrics[]>(); + const cwdData = new Map<string, string>(); + const effortData = new Map<string, ReasoningEffort>(); + return { + data, + metricsData, + cwdData, + effortData, + async append(conversationId, messages) { + const existing = data.get(conversationId) ?? []; + data.set(conversationId, [...existing, ...messages]); + }, + async load(conversationId) { + return [...(data.get(conversationId) ?? [])]; + }, + async loadSince(conversationId, sinceSeq) { + const messages = data.get(conversationId) ?? []; + const result: StoredChunk[] = []; + let seq = 1; + for (const msg of messages) { + for (const chunk of msg.chunks) { + if (sinceSeq === undefined || seq > sinceSeq) { + result.push({ seq, role: msg.role, chunk }); + } + seq++; + } + } + return result; + }, + async appendMetrics(conversationId, metrics) { + const existing = metricsData.get(conversationId) ?? []; + metricsData.set(conversationId, [...existing, metrics]); + }, + async loadMetrics(conversationId) { + return [...(metricsData.get(conversationId) ?? [])]; + }, + async getCwd(conversationId) { + return cwdData.get(conversationId) ?? null; + }, + async setCwd(conversationId, cwd) { + cwdData.set(conversationId, cwd); + }, + async getReasoningEffort(conversationId) { + return effortData.get(conversationId) ?? 
null; + }, + async setReasoningEffort(conversationId, effort) { + effortData.set(conversationId, effort); + }, + }; +} + +function identityApplyToolsFilter(assembly: ToolAssembly): Promise<ToolAssembly> { + return Promise.resolve(assembly); +} + +function noTools(): readonly ToolContract[] { + return []; +} + +function simpleProvider(): ProviderContract { + return { + id: "fake", + stream: async function* () { + yield { type: "text-delta", delta: "ok" } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + }, + }; +} + +/** + * A capturing runTurn that simulates the kernel calling `drainSteering` at the + * tool-result boundary. It records the RunTurnInput (so the test can assert + * drainSteering was wired) and collects what drainSteering returned. NOT a mock + * of @dispatch/* — it's a plain fake of the outermost runTurn edge. + */ +function createDrainingCaptureRunTurn(): { + captured: RunTurnInput[]; + drainedMessages: ChatMessage[]; + wasDrainCalled: () => boolean; + runTurn: (input: RunTurnInput) => Promise<RunTurnResult>; +} { + const captured: RunTurnInput[] = []; + const drainedMessages: ChatMessage[] = []; + let drainCalled = false; + return { + captured, + drainedMessages, + wasDrainCalled: () => drainCalled, + runTurn: async (input) => { + captured.push(input); + if (input.drainSteering !== undefined) { + drainCalled = true; + const drained = input.drainSteering(); + drainedMessages.push(...drained); + } + return { + messages: [{ role: "assistant", chunks: [{ type: "text", text: "ok" }] }], + usage: { inputTokens: 1, outputTokens: 1 }, + finishReason: "stop", + }; + }, + }; +} + +function waitForSealed( + orchestrator: ReturnType<typeof createSessionOrchestrator>["orchestrator"], + conversationId: string, +): Promise<void> { + return new Promise((resolve) => { + const unsub = orchestrator.subscribe(conversationId, (e) => { + if (e.type === "turn-sealed") { + unsub(); + resolve(); + } + }); + }); +} + +function 
waitForSealedCount( + orchestrator: ReturnType<typeof createSessionOrchestrator>["orchestrator"], + conversationId: string, + count: number, +): Promise<void> { + return new Promise((resolve) => { + let seen = 0; + const unsub = orchestrator.subscribe(conversationId, (e) => { + if (e.type === "turn-sealed") { + seen++; + if (seen >= count) { + unsub(); + resolve(); + } + } + }); + }); +} + +function isSteering(e: AgentEvent): e is Extract<AgentEvent, { type: "steering" }> { + return e.type === "steering"; +} + +function isUserMessage(e: AgentEvent): e is Extract<AgentEvent, { type: "user-message" }> { + return e.type === "user-message"; +} + +function createTestQueue() { + return createMessageQueueService({ + id: () => `q-${Math.random().toString(36).slice(2, 8)}`, + now: () => 1000, + notify: () => {}, + }); +} + +// --- drainSteering (mid-turn, at the tool-result boundary) --- + +describe("drainSteering", () => { + it("drainSteering drains the queue + emits a steering event + returns one combined user message", async () => { + const store = createInMemoryStore(); + const queue = createTestQueue(); + queue.enqueue("conv-drain", "first"); + queue.enqueue("conv-drain", "second"); + + const { captured, drainedMessages, runTurn: captureRunTurn } = createDrainingCaptureRunTurn(); + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => ({ id: "p", stream: async function* () {} }), + resolveTools: noTools, + applyToolsFilter: identityApplyToolsFilter, + runTurn: captureRunTurn, + resolveQueue: () => queue, + }); + + const events: AgentEvent[] = []; + const unsub = orchestrator.subscribe("conv-drain", (e) => events.push(e)); + + orchestrator.startTurn({ conversationId: "conv-drain", text: "go" }); + await waitForSealed(orchestrator, "conv-drain"); + unsub(); + + // drainSteering was wired on the RunTurnInput + expect(captured).toHaveLength(1); + expect(captured[0]?.drainSteering).toBeDefined(); + expect(typeof 
captured[0]?.drainSteering).toBe("function"); + + // The fake runTurn called drainSteering → returned one combined user message + expect(drainedMessages).toHaveLength(1); + const steerMsg = drainedMessages[0]; + if (steerMsg === undefined) throw new Error("expected drained message"); + expect(steerMsg.role).toBe("user"); + expect(steerMsg.chunks).toHaveLength(1); + const chunk = steerMsg.chunks[0]; + if (chunk === undefined) throw new Error("expected chunk"); + expect(chunk.type).toBe("text"); + if (chunk.type === "text") { + expect(chunk.text).toBe("first\n\nsecond"); + } + + // The queue was drained (cleared) + expect(queue.getQueue("conv-drain")).toHaveLength(0); + + // A steering event was emitted into the hub with the combined text + const steering = events.find(isSteering); + expect(steering).toBeDefined(); + expect(steering?.conversationId).toBe("conv-drain"); + expect(steering?.text).toBe("first\n\nsecond"); + expect(steering?.turnId).toMatch(/^turn-/); + }); + + it("drainSteering on an empty queue returns [] and emits nothing", async () => { + const store = createInMemoryStore(); + const queue = createTestQueue(); + + const { + drainedMessages, + wasDrainCalled, + runTurn: captureRunTurn, + } = createDrainingCaptureRunTurn(); + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => ({ id: "p", stream: async function* () {} }), + resolveTools: noTools, + applyToolsFilter: identityApplyToolsFilter, + runTurn: captureRunTurn, + resolveQueue: () => queue, + }); + + const events: AgentEvent[] = []; + const unsub = orchestrator.subscribe("conv-empty", (e) => events.push(e)); + + orchestrator.startTurn({ conversationId: "conv-empty", text: "go" }); + await waitForSealed(orchestrator, "conv-empty"); + unsub(); + + // drainSteering was wired and called, but returned [] + expect(wasDrainCalled()).toBe(true); + expect(drainedMessages).toHaveLength(0); + + // No steering event was emitted + 
expect(events.filter(isSteering)).toHaveLength(0); + }); + + it("no queue ext (resolveQueue undefined) → drainSteering omitted; turn unchanged", async () => { + const store = createInMemoryStore(); + + const { captured, wasDrainCalled, runTurn: captureRunTurn } = createDrainingCaptureRunTurn(); + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => ({ id: "p", stream: async function* () {} }), + resolveTools: noTools, + applyToolsFilter: identityApplyToolsFilter, + runTurn: captureRunTurn, + // resolveQueue intentionally omitted — feature degrades off + }); + + const events: AgentEvent[] = []; + const unsub = orchestrator.subscribe("conv-noqueue", (e) => events.push(e)); + + orchestrator.startTurn({ conversationId: "conv-noqueue", text: "go" }); + await waitForSealed(orchestrator, "conv-noqueue"); + unsub(); + + // drainSteering is absent from the RunTurnInput (not undefined — omitted) + expect(captured).toHaveLength(1); + expect(captured[0]?.drainSteering).toBeUndefined(); + expect(wasDrainCalled()).toBe(false); + + // No steering event; turn sealed normally + expect(events.filter(isSteering)).toHaveLength(0); + expect(events.filter((e) => e.type === "turn-sealed")).toHaveLength(1); + }); +}); + +// --- Post-seal carry (turn ended with a non-empty queue → new turn) --- + +describe("post-seal carry", () => { + it("post-seal: non-empty queue → a new turn starts with the combined message", async () => { + const store = createInMemoryStore(); + const queue = createTestQueue(); + queue.enqueue("conv-carry", "queued-a"); + queue.enqueue("conv-carry", "queued-b"); + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => simpleProvider(), + resolveTools: noTools, + applyToolsFilter: identityApplyToolsFilter, + runTurn, + resolveQueue: () => queue, + }); + + const events: AgentEvent[] = []; + const unsub = orchestrator.subscribe("conv-carry", (e) => events.push(e)); 
+ + orchestrator.startTurn({ conversationId: "conv-carry", text: "original" }); + // Wait for the original turn + the carried turn to both seal. + await waitForSealedCount(orchestrator, "conv-carry", 2); + unsub(); + + // Two user-message events: the original prompt + the carried combined text. + const userMessages = events.filter(isUserMessage); + expect(userMessages).toHaveLength(2); + expect(userMessages[0]?.text).toBe("original"); + expect(userMessages[1]?.text).toBe("queued-a\n\nqueued-b"); + + // No steering event — the carry case emits user-message, not steering. + expect(events.filter(isSteering)).toHaveLength(0); + + // The queue was drained by the carry. + expect(queue.getQueue("conv-carry")).toHaveLength(0); + + // Both turns persisted (original + carry). + expect(store.data.get("conv-carry")?.length).toBeGreaterThanOrEqual(4); + }); + + it("post-seal: empty queue → no new turn", async () => { + const store = createInMemoryStore(); + const queue = createTestQueue(); + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => simpleProvider(), + resolveTools: noTools, + applyToolsFilter: identityApplyToolsFilter, + runTurn, + resolveQueue: () => queue, + }); + + const events: AgentEvent[] = []; + const unsub = orchestrator.subscribe("conv-no-carry", (e) => events.push(e)); + + orchestrator.startTurn({ conversationId: "conv-no-carry", text: "original" }); + await waitForSealed(orchestrator, "conv-no-carry"); + // Give the carry check a chance to run (it's in the finally, synchronous + // after turn-sealed, but await yields first). + await new Promise<void>((resolve) => setTimeout(resolve, 10)); + unsub(); + + // Only one user-message (the original) — no carry turn. 
+ expect(events.filter(isUserMessage)).toHaveLength(1); + expect(events.filter((e) => e.type === "turn-sealed")).toHaveLength(1); + }); +}); + +// --- enqueue facade (the single entry transports call) --- + +describe("enqueue", () => { + it("enqueue when idle → starts a turn (startedTurn:true)", async () => { + const store = createInMemoryStore(); + const queue = createTestQueue(); + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => simpleProvider(), + resolveTools: noTools, + applyToolsFilter: identityApplyToolsFilter, + runTurn, + resolveQueue: () => queue, + }); + + const result = orchestrator.enqueue({ conversationId: "conv-idle", text: "hello" }); + expect(result.startedTurn).toBe(true); + expect(result.queue).toHaveLength(0); + + await waitForSealed(orchestrator, "conv-idle"); + + // The turn ran and persisted. + expect(store.data.get("conv-idle")).toBeDefined(); + expect(store.data.get("conv-idle")?.length).toBeGreaterThanOrEqual(2); + }); + + it("enqueue when active → queues (startedTurn:false, snapshot with the message)", async () => { + const store = createInMemoryStore(); + const queue = createTestQueue(); + + let resolveFirst: (() => void) | undefined; + const firstBlocker = new Promise<void>((resolve) => { + resolveFirst = resolve; + }); + let callCount = 0; + const blockingFirstRunTurn = async (_input: RunTurnInput): Promise<RunTurnResult> => { + callCount++; + if (callCount === 1) { + await firstBlocker; + } + return { + messages: [{ role: "assistant", chunks: [{ type: "text", text: "done" }] }], + usage: { inputTokens: 1, outputTokens: 1 }, + finishReason: "stop", + }; + }; + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => simpleProvider(), + resolveTools: noTools, + applyToolsFilter: identityApplyToolsFilter, + runTurn: blockingFirstRunTurn, + resolveQueue: () => queue, + }); + + // Start the original turn (it blocks in runTurn). 
+ orchestrator.startTurn({ conversationId: "conv-active", text: "first" }); + // Let the turn reach the blocked runTurn call. + await new Promise<void>((resolve) => setTimeout(resolve, 10)); + + // Enqueue while active. + const result = orchestrator.enqueue({ conversationId: "conv-active", text: "second" }); + expect(result.startedTurn).toBe(false); + expect(result.queue).toHaveLength(1); + expect(result.queue[0]?.text).toBe("second"); + + // The queue holds the enqueued message. + expect(queue.getQueue("conv-active")).toHaveLength(1); + + // Release the original turn → it seals → post-seal carry starts a new + // turn with the enqueued message. Subscribe before releasing to catch + // both turn-sealed events. + const sealed = waitForSealedCount(orchestrator, "conv-active", 2); + resolveFirst?.(); + await sealed; + }); + + it("enqueue when active + no queue ext → startedTurn:false, empty queue (degraded)", async () => { + const store = createInMemoryStore(); + + let resolveFirst: (() => void) | undefined; + const firstBlocker = new Promise<void>((resolve) => { + resolveFirst = resolve; + }); + let callCount = 0; + const blockingFirstRunTurn = async (_input: RunTurnInput): Promise<RunTurnResult> => { + callCount++; + if (callCount === 1) { + await firstBlocker; + } + return { + messages: [{ role: "assistant", chunks: [{ type: "text", text: "done" }] }], + usage: { inputTokens: 1, outputTokens: 1 }, + finishReason: "stop", + }; + }; + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => simpleProvider(), + resolveTools: noTools, + applyToolsFilter: identityApplyToolsFilter, + runTurn: blockingFirstRunTurn, + // resolveQueue omitted — no queue extension loaded (degraded) + }); + + orchestrator.startTurn({ conversationId: "conv-degraded", text: "first" }); + await new Promise<void>((resolve) => setTimeout(resolve, 10)); + + // Enqueue while active, but no queue ext → message dropped, empty snapshot. 
+ const result = orchestrator.enqueue({ conversationId: "conv-degraded", text: "second" }); + expect(result.startedTurn).toBe(false); + expect(result.queue).toHaveLength(0); + + // Release the original turn; no carry (no queue ext). + const sealed = waitForSealed(orchestrator, "conv-degraded"); + resolveFirst?.(); + await sealed; + }); +}); diff --git a/packages/session-orchestrator/tsconfig.json b/packages/session-orchestrator/tsconfig.json index 4401407..782c1c8 100644 --- a/packages/session-orchestrator/tsconfig.json +++ b/packages/session-orchestrator/tsconfig.json @@ -5,6 +5,7 @@ "references": [ { "path": "../kernel" }, { "path": "../conversation-store" }, - { "path": "../credential-store" } + { "path": "../credential-store" }, + { "path": "../message-queue" } ] } diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json index 3a0a983..ec7ccd1 100644 --- a/packages/transport-contract/package.json +++ b/packages/transport-contract/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/transport-contract", - "version": "0.11.0", + "version": "0.12.0", "type": "module", "private": true, "main": "dist/index.js", diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts index e992f8b..e71feb7 100644 --- a/packages/transport-contract/src/index.ts +++ b/packages/transport-contract/src/index.ts @@ -20,10 +20,17 @@ */ import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract"; -import type { AgentEvent, ReasoningEffort, StoredChunk, TurnMetrics } from "@dispatch/wire"; +import type { + AgentEvent, + QueuedMessage, + ReasoningEffort, + StoredChunk, + TurnMetrics, +} from "@dispatch/wire"; export type { AgentEvent, + QueuedMessage, ReasoningEffort, StepMetrics, StoredChunk, @@ -249,6 +256,41 @@ export interface CloseConversationResponse { readonly abortedTurn: boolean; } +// ─── Message queue (steering) ───────────────────────────────────────────────── + +/** + * Request 
body for `POST /conversations/:id/queue` — enqueue a user message + * onto a conversation's message queue for mid-turn steering delivery. + * + * When a turn is ACTIVE for the conversation, the message is appended to the + * queue (the message-queue extension's per-conversation SURFACE updates) and + * delivered at the next tool-result boundary as a steering message the model + * sees alongside the tool results (a `steering` `AgentEvent` is emitted). When + * NO turn is active, enqueuing instead STARTS a new turn with the message as its + * opening prompt (equivalent to `POST /chat`) — so a fire-and-forget enqueue + * works regardless of generation state. The resolved queue + whether a turn was + * started are returned in `QueueResponse`. + * + * `text` must be non-empty (after trim) → HTTP 400 `{ error }` otherwise. + */ +export interface QueueRequest { + readonly text: string; +} + +/** + * Response body for `POST /conversations/:id/queue` — the conversation's queue + * snapshot AFTER the enqueue, so a client renders the queue from this alone. + * `conversationId` echoes the path. `startedTurn` is true when no turn was + * active and the enqueue started a new turn (the message is now the turn's + * opening prompt, not a queued steering message); the turn's events stream on + * the chat channel as usual. + */ +export interface QueueResponse { + readonly conversationId: string; + readonly startedTurn: boolean; + readonly queue: readonly QueuedMessage[]; +} + // ─── Per-conversation LSP status ────────────────────────────────────────────── /** The connection state of a single language server for a workspace. */ @@ -404,6 +446,23 @@ export interface ChatUnsubscribeMessage { } /** + * Client → server: enqueue a message onto a conversation's message queue while + * a turn is generating (steering). The WebSocket counterpart of the HTTP + * `POST /conversations/:id/queue` (`QueueRequest`). 
Fire-and-forget: success is + * confirmed by the message-queue SURFACE updating (the FE renders the queue + * from the surface, not from a reply here); a failure (malformed/empty text, + * unknown conversation) arrives as a `chat.error`. When no turn is active, the + * enqueue starts a new turn (the turn's events stream as `chat.delta`s), so a + * client reuses this op for both "queue while generating" and "send" (the + * latter being equivalent to `chat.send`). + */ +export interface ChatQueueMessage { + readonly type: "chat.queue"; + readonly conversationId: string; + readonly text: string; +} + +/** * Every client → server WS message: surface ops (`@dispatch/ui-contract`) + chat * ops. A server discriminates on `type`. */ @@ -411,7 +470,8 @@ export type WsClientMessage = | SurfaceClientMessage | ChatSendMessage | ChatSubscribeMessage - | ChatUnsubscribeMessage; + | ChatUnsubscribeMessage + | ChatQueueMessage; /** * Every server → client WS message: surface ops (`@dispatch/ui-contract`) + chat diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts index 1f95dd8..49e240f 100644 --- a/packages/transport-http/src/app.test.ts +++ b/packages/transport-http/src/app.test.ts @@ -8,7 +8,11 @@ import type { TurnMetrics, } from "@dispatch/kernel"; import { createThroughputStore, dayKeyOf } from "@dispatch/throughput-store"; -import type { ThroughputResponse } from "@dispatch/transport-contract"; +import type { + QueuedMessage, + QueueResponse, + ThroughputResponse, +} from "@dispatch/transport-contract"; import { describe, expect, it } from "vitest"; import { createApp } from "./app.js"; import type { @@ -132,6 +136,9 @@ function createFakeOrchestrator(events: AgentEvent[]): SessionOrchestrator { isActive() { return false; }, + enqueue() { + return { startedTurn: false, queue: [] }; + }, closeConversation() { return { abortedTurn: false }; }, @@ -162,6 +169,9 @@ function createCapturingOrchestrator(): SessionOrchestrator & { 
isActive() { return false; }, + enqueue() { + return { startedTurn: false, queue: [] }; + }, closeConversation() { return { abortedTurn: false }; }, @@ -182,6 +192,9 @@ function createThrowingOrchestrator(error: Error): SessionOrchestrator { isActive() { return false; }, + enqueue() { + return { startedTurn: false, queue: [] }; + }, closeConversation() { return { abortedTurn: false }; }, @@ -1319,6 +1332,269 @@ describe("POST /conversations/:id/close", () => { }); }); +describe("POST /conversations/:id/queue", () => { + it("with valid text → 200 + QueueResponse (startedTurn + queue)", async () => { + const queue: readonly QueuedMessage[] = [ + { id: "q1", text: "queued-msg", queuedAt: 1700000000000 }, + ]; + const orchestrator: SessionOrchestrator = { + ...createFakeOrchestrator([]), + enqueue() { + return { startedTurn: false, queue }; + }, + }; + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator, + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + + const res = await app.request("/conversations/conv1/queue", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ text: "hello" }), + }); + + expect(res.status).toBe(200); + const body = (await res.json()) as QueueResponse; + expect(body.conversationId).toBe("conv1"); + expect(body.startedTurn).toBe(false); + expect(body.queue).toEqual(queue); + }); + + it("with empty/whitespace text → 400 { error } and enqueue is never called", async () => { + let enqueueCalled = false; + const orchestrator: SessionOrchestrator = { + ...createFakeOrchestrator([]), + enqueue() { + enqueueCalled = true; + return { startedTurn: false, queue: [] }; + }, + }; + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator, + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + + const res = await app.request("/conversations/conv1/queue", { + method: "POST", + headers: { 
"Content-Type": "application/json" }, + body: JSON.stringify({ text: " " }), + }); + + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(body.error).toContain("text"); + expect(enqueueCalled).toBe(false); + }); + + it("with missing text field → 400 { error } and enqueue is never called", async () => { + let enqueueCalled = false; + const orchestrator: SessionOrchestrator = { + ...createFakeOrchestrator([]), + enqueue() { + enqueueCalled = true; + return { startedTurn: false, queue: [] }; + }, + }; + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator, + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + + const res = await app.request("/conversations/conv1/queue", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({}), + }); + + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(body.error).toContain("text"); + expect(enqueueCalled).toBe(false); + }); + + it("enqueue returns startedTurn:true (was idle) → response echoes it", async () => { + const orchestrator: SessionOrchestrator = { + ...createFakeOrchestrator([]), + enqueue() { + return { startedTurn: true, queue: [] }; + }, + }; + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator, + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + + const res = await app.request("/conversations/conv-idle/queue", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ text: "go" }), + }); + + expect(res.status).toBe(200); + const body = (await res.json()) as QueueResponse; + expect(body.conversationId).toBe("conv-idle"); + expect(body.startedTurn).toBe(true); + expect(body.queue).toEqual([]); + }); + + it("enqueue returns startedTurn:false (was active) → response carries the queue snapshot", async () => { + const queue: 
readonly QueuedMessage[] = [ + { id: "q1", text: "second", queuedAt: 1700000000000 }, + { id: "q2", text: "third", queuedAt: 1700000001000 }, + ]; + const orchestrator: SessionOrchestrator = { + ...createFakeOrchestrator([]), + enqueue() { + return { startedTurn: false, queue }; + }, + }; + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator, + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + + const res = await app.request("/conversations/conv-active/queue", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ text: "steer" }), + }); + + expect(res.status).toBe(200); + const body = (await res.json()) as QueueResponse; + expect(body.conversationId).toBe("conv-active"); + expect(body.startedTurn).toBe(false); + expect(body.queue).toEqual(queue); + }); + + it("forwards the path conversationId and trimmed text to enqueue", async () => { + const calls: { conversationId: string; text: string }[] = []; + const orchestrator: SessionOrchestrator = { + ...createFakeOrchestrator([]), + enqueue(input) { + calls.push(input); + return { startedTurn: false, queue: [] }; + }, + }; + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator, + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + + const res = await app.request("/conversations/conv-1/queue", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ text: " hello world " }), + }); + + expect(res.status).toBe(200); + expect(calls).toHaveLength(1); + expect(calls[0]?.conversationId).toBe("conv-1"); + expect(calls[0]?.text).toBe("hello world"); + }); + + it("returns 400 for invalid JSON body", async () => { + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + + const 
res = await app.request("/conversations/conv1/queue", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: "not json", + }); + + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(body.error).toContain("JSON"); + }); + + it("returns 400 for a non-string text", async () => { + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + + const res = await app.request("/conversations/conv1/queue", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ text: 42 }), + }); + + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(body.error).toContain("text"); + }); + + it("logs an info line on success and never logs the enqueued text", async () => { + const logger = createFakeLogger(); + const orchestrator: SessionOrchestrator = { + ...createFakeOrchestrator([]), + enqueue() { + return { startedTurn: true, queue: [] }; + }, + }; + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator, + credentialStore: createFakeCredentialStore([]), + logger, + }); + + await app.request("/conversations/conv1/queue", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ text: "secret-ish user message" }), + }); + + const infoLogs = logger.records.filter((r) => r.level === "info"); + expect(infoLogs).toHaveLength(1); + expect(infoLogs[0]?.msg).toBe("conversations: enqueued"); + expect(infoLogs[0]?.attrs?.conversationId).toBe("conv1"); + expect(infoLogs[0]?.attrs?.startedTurn).toBe(true); + expect(infoLogs[0]?.attrs?.queueLength).toBe(0); + // Restraint: the user's message text is never logged (mirrors POST /chat). 
+ expect(JSON.stringify(logger.records)).not.toContain("secret-ish user message"); + }); + + it("logs a warn on a malformed body (400)", async () => { + const logger = createFakeLogger(); + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + logger, + }); + + await app.request("/conversations/conv1/queue", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ text: "" }), + }); + + const warnLogs = logger.records.filter((r) => r.level === "warn"); + expect(warnLogs.length).toBeGreaterThanOrEqual(1); + expect(warnLogs[0]?.msg).toBe("conversations/queue: validation failed"); + }); +}); + describe("GET /conversations/:id/cwd", () => { it("returns null when unset", async () => { const app = createApp({ diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts index 2788bf9..8cb85c9 100644 --- a/packages/transport-http/src/app.ts +++ b/packages/transport-http/src/app.ts @@ -7,6 +7,7 @@ import type { LspServerInfo, LspStatusResponse, ModelsResponse, + QueueResponse, ReasoningEffortResponse, ThroughputResponse, WarmResponse, @@ -21,6 +22,7 @@ import { isSinceSeqError, isWindowParamError, parseChatBody, + parseQueueBody, parseReasoningEffortBody, parseSinceSeq, parseWarmBody, @@ -370,6 +372,40 @@ export function createApp(opts: CreateServerOptions): Hono { return c.json(body, 200); }); + app.post("/conversations/:id/queue", async (c) => { + const conversationId = c.req.param("id"); + + let body: unknown; + try { + body = await c.req.json(); + } catch { + log.warn("conversations/queue: invalid JSON body"); + return c.json({ error: "Invalid JSON body" }, 400); + } + + const parsed = parseQueueBody(body); + if (isParseError(parsed)) { + log.warn("conversations/queue: validation failed", { reason: parsed.error }); + return c.json({ error: parsed.error }, 400); + } + + // `enqueue` is 
synchronous and owns the idle→startTurn vs active→queue + // decision (no separate `isActive` race) — it does not throw for an + // unknown/idle conversation, which instead starts a turn. Mirrors the + // direct sync call used by `POST /conversations/:id/close`. + const { startedTurn, queue } = opts.orchestrator.enqueue({ + conversationId, + text: parsed.text, + }); + log.info("conversations: enqueued", { + conversationId, + startedTurn, + queueLength: queue.length, + }); + const response: QueueResponse = { conversationId, startedTurn, queue }; + return c.json(response, 200); + }); + app.get("/conversations/:id/cwd", async (c) => { const conversationId = c.req.param("id"); try { diff --git a/packages/transport-http/src/extension.ts b/packages/transport-http/src/extension.ts index ab23b65..3fcc473 100644 --- a/packages/transport-http/src/extension.ts +++ b/packages/transport-http/src/extension.ts @@ -31,6 +31,7 @@ export const manifest: Manifest = { "/conversations/:id/close", "/conversations/:id/cwd", "/conversations/:id/lsp", + "/conversations/:id/queue", "/conversations/:id/reasoning-effort", "/health", "/models", diff --git a/packages/transport-http/src/index.ts b/packages/transport-http/src/index.ts index b231b7e..735dc38 100644 --- a/packages/transport-http/src/index.ts +++ b/packages/transport-http/src/index.ts @@ -5,6 +5,7 @@ export type { ChatCommand, ParseError, ParseResult, + QueueBodyParsed, SinceSeqResult, WarmBodyParsed, WindowParamResult, @@ -17,6 +18,7 @@ export { isValidReasoningEffort, isWindowParamError, parseChatBody, + parseQueueBody, parseReasoningEffortBody, parseSinceSeq, parseWindowParam, diff --git a/packages/transport-http/src/logic.test.ts b/packages/transport-http/src/logic.test.ts index f91b38c..40a82fd 100644 --- a/packages/transport-http/src/logic.test.ts +++ b/packages/transport-http/src/logic.test.ts @@ -8,6 +8,7 @@ import { isValidReasoningEffort, isWindowParamError, parseChatBody, + parseQueueBody, parseReasoningEffortBody, 
parseSinceSeq, parseWindowParam, @@ -368,3 +369,60 @@ describe("parseReasoningEffortBody", () => { expect(isReasoningEffortParseError(parseReasoningEffortBody("string"))).toBe(true); }); }); + +describe("parseQueueBody", () => { + it("returns error for null body", () => { + const result = parseQueueBody(null); + expect(isParseError(result)).toBe(true); + if (isParseError(result)) { + expect(result.error).toContain("JSON object"); + } + }); + + it("returns error for non-object body", () => { + const result = parseQueueBody("hello"); + expect(isParseError(result)).toBe(true); + }); + + it("returns error when text is missing", () => { + const result = parseQueueBody({}); + expect(isParseError(result)).toBe(true); + if (isParseError(result)) { + expect(result.error).toContain("text"); + } + }); + + it("returns error when text is empty string", () => { + const result = parseQueueBody({ text: "" }); + expect(isParseError(result)).toBe(true); + }); + + it("returns error when text is whitespace only", () => { + const result = parseQueueBody({ text: " " }); + expect(isParseError(result)).toBe(true); + }); + + it("returns error when text is not a string", () => { + const result = parseQueueBody({ text: 42 }); + expect(isParseError(result)).toBe(true); + if (isParseError(result)) { + expect(result.error).toContain("text"); + } + }); + + it("returns the trimmed text for a valid body", () => { + const result = parseQueueBody({ text: "hello" }); + expect(isParseError(result)).toBe(false); + if (!isParseError(result)) { + expect(result.text).toBe("hello"); + } + }); + + it("trims text whitespace", () => { + const result = parseQueueBody({ text: " hello world " }); + expect(isParseError(result)).toBe(false); + if (!isParseError(result)) { + expect(result.text).toBe("hello world"); + } + }); +}); diff --git a/packages/transport-http/src/logic.ts b/packages/transport-http/src/logic.ts index e0adfeb..aa5394c 100644 --- a/packages/transport-http/src/logic.ts +++ 
b/packages/transport-http/src/logic.ts @@ -73,8 +73,8 @@ export function parseChatBody(body: unknown, generateId: () => string): ParseRes return result; } -export function isParseError(result: ParseResult): result is ParseError { - return "error" in result; +export function isParseError<T>(result: T | ParseError): result is ParseError { + return typeof result === "object" && result !== null && "error" in result; } export function serializeEventLine(event: AgentEvent): string { @@ -172,6 +172,37 @@ export function computeExpectedCacheRate( return Math.round((cacheReadTokens / denom) * 100); } +/** + * Parsed body for `POST /conversations/:id/queue` (`QueueRequest`). Only the + * `text` field — `conversationId` comes from the path param, not the body, so it + * is deliberately NOT part of this parse result. + */ +export interface QueueBodyParsed { + readonly text: string; +} + +/** + * Parse + validate a `POST /conversations/:id/queue` body (`QueueRequest`). + * `text` must be a non-empty string after trim — invalid/missing → + * {@link ParseError}. The TRIMMED text is returned (forwarded to + * `orchestrator.enqueue`), mirroring how `parseChatBody` forwards a trimmed + * `message`. 
+ */ +export function parseQueueBody(body: unknown): QueueBodyParsed | ParseError { + if (body === null || typeof body !== "object") { + return { error: "Request body must be a JSON object" }; + } + + const obj = body as Record<string, unknown>; + + const text = obj.text; + if (typeof text !== "string" || text.trim().length === 0) { + return { error: "Field 'text' is required and must be a non-empty string" }; + } + + return { text: text.trim() }; +} + export function parseReasoningEffortBody(body: unknown): ReasoningEffort | ParseError { if (body === null || typeof body !== "object") { return { error: "Request body must be a JSON object" }; diff --git a/packages/transport-http/src/server.bun.test.ts b/packages/transport-http/src/server.bun.test.ts index 36b05a5..151ad24 100644 --- a/packages/transport-http/src/server.bun.test.ts +++ b/packages/transport-http/src/server.bun.test.ts @@ -68,6 +68,9 @@ function fakeOrchestrator(): SessionOrchestrator { isActive() { return false; }, + enqueue() { + return { startedTurn: false, queue: [] }; + }, closeConversation() { return { abortedTurn: false }; }, diff --git a/packages/transport-ws/src/extension.ts b/packages/transport-ws/src/extension.ts index 332ebe0..7a4e707 100644 --- a/packages/transport-ws/src/extension.ts +++ b/packages/transport-ws/src/extension.ts @@ -255,6 +255,28 @@ export function createTransportWsExtension(): Extension { break; } + case "chat-queue": { + // Fire-and-forget: success is confirmed by the message-queue + // SURFACE updating (startedTurn:false) or by streaming + // chat.deltas (startedTurn:true), NOT by a reply here. On + // startedTurn:true the sender is auto-subscribed so the new + // turn's events stream to it (same as chat.send); on + // startedTurn:false (queued for steering) we emit NOTHING + // back and do not auto-subscribe. 
+ const enqueueResult = orchestrator.enqueue({ + conversationId: result.conversationId, + text: result.text, + }); + if (enqueueResult.startedTurn) { + ensureChatSubscribed(ws, state, result.conversationId); + } + logger.info?.("transport-ws: chat.queue accepted", { + conversationId: result.conversationId, + startedTurn: enqueueResult.startedTurn, + }); + break; + } + case "chat-error": { logger.warn?.("transport-ws: malformed chat.send", { reason: result.errorMessage, diff --git a/packages/transport-ws/src/index.ts b/packages/transport-ws/src/index.ts index baa6b0f..e0cc66b 100644 --- a/packages/transport-ws/src/index.ts +++ b/packages/transport-ws/src/index.ts @@ -1,6 +1,7 @@ export { createTransportWsExtension } from "./extension.js"; export { manifest } from "./manifest.js"; export type { + ChatQueueRouteResult, ChatRouteError, ChatRouteResult, ChatSubscribeRouteResult, diff --git a/packages/transport-ws/src/router.test.ts b/packages/transport-ws/src/router.test.ts index a0ad104..66e2611 100644 --- a/packages/transport-ws/src/router.test.ts +++ b/packages/transport-ws/src/router.test.ts @@ -1,7 +1,8 @@ import type { SurfaceContext, SurfaceProvider, SurfaceRegistry } from "@dispatch/surface-registry"; +import type { WsClientMessage } from "@dispatch/transport-contract"; import type { SurfaceCatalogEntry, SurfaceSpec } from "@dispatch/ui-contract"; import { describe, expect, it } from "vitest"; -import { catalogMessage, routeClientMessage, subKey } from "./router.js"; +import { catalogMessage, type RouteResult, routeClientMessage, subKey } from "./router.js"; // ── Fake in-memory registry (no mocks — just a plain implementation) ──────── @@ -450,6 +451,114 @@ describe("routeClientMessage", () => { expect(result).toEqual({ kind: "chat-unsubscribe", conversationId: "conv-abc" }); }); }); + + describe("chat.queue", () => { + it("routes a valid chat.queue → { kind: 'chat-queue', conversationId, text } (what the shell passes to orchestrator.enqueue)", () => { + const 
registry = fakeRegistry([]); + const connSubs = new Set<string>(); + + const result = routeClientMessage(registry, connSubs, { + type: "chat.queue", + conversationId: "conv-1", + text: "steer here", + }); + + expect(result).toEqual({ + kind: "chat-queue", + conversationId: "conv-1", + text: "steer here", + }); + }); + + it("rejects empty/whitespace text → chat-error (no enqueue signal)", () => { + const registry = fakeRegistry([]); + const connSubs = new Set<string>(); + + for (const text of ["", " ", "\t\n"]) { + const result = routeClientMessage(registry, connSubs, { + type: "chat.queue", + conversationId: "conv-1", + text, + }); + + expect(result.kind).toBe("chat-error"); + if (result.kind !== "chat-error") throw new Error("expected chat-error"); + expect(result.conversationId).toBe("conv-1"); + expect(result.errorMessage).toContain("non-empty string"); + expect(result.errorMessage).toContain("text"); + } + }); + + it("rejects missing text → chat-error (no enqueue signal)", () => { + const registry = fakeRegistry([]); + const connSubs = new Set<string>(); + + const result = routeClientMessage(registry, connSubs, { + type: "chat.queue", + conversationId: "conv-1", + text: undefined as unknown as string, + }); + + expect(result.kind).toBe("chat-error"); + if (result.kind !== "chat-error") throw new Error("expected chat-error"); + expect(result.errorMessage).toContain("non-empty string"); + }); + + it("does not trim the stored text — passes the original through to the shell", () => { + const registry = fakeRegistry([]); + const connSubs = new Set<string>(); + + // Non-empty after trim (so valid), but the value carries surrounding + // whitespace: the router passes it through unchanged (validation uses + // trim; the orchestrator receives the original text). 
+ const result = routeClientMessage(registry, connSubs, { + type: "chat.queue", + conversationId: "conv-1", + text: " steer ", + }); + + expect(result.kind).toBe("chat-queue"); + if (result.kind !== "chat-queue") throw new Error("expected chat-queue"); + expect(result.text).toBe(" steer "); + }); + }); + + describe("exhaustive switch (regression guard for Wave-0 fan-out)", () => { + // Every WsClientMessage variant must route to a defined result with a + // known kind — no fall-through / undefined return. If the union is + // widened again, `tsc` catches the missing case (the switch is + // exhaustive); this test guards the runtime side of that contract. + it("routes every WsClientMessage variant to a defined RouteResult", () => { + const provider = fakeProvider("a", "Surface A", ["toggle"]); + const registry = fakeRegistry([provider]); + const connSubs = new Set<string>(); + + const samples: WsClientMessage[] = [ + { type: "subscribe", surfaceId: "a" }, + { type: "unsubscribe", surfaceId: "a" }, + { type: "invoke", surfaceId: "a", actionId: "toggle", payload: true }, + { type: "chat.send", message: "hi" }, + { type: "chat.subscribe", conversationId: "c1" }, + { type: "chat.unsubscribe", conversationId: "c1" }, + { type: "chat.queue", conversationId: "c1", text: "steer" }, + ]; + + const validKinds = new Set<RouteResult["kind"]>([ + "surface", + "chat", + "chat-error", + "chat-subscribe", + "chat-unsubscribe", + "chat-queue", + ]); + + for (const msg of samples) { + const result = routeClientMessage(registry, connSubs, msg); + expect(result).toBeDefined(); + expect(validKinds.has(result.kind)).toBe(true); + } + }); + }); }); describe("catalogMessage", () => { diff --git a/packages/transport-ws/src/router.ts b/packages/transport-ws/src/router.ts index 93f97f8..e1f53c5 100644 --- a/packages/transport-ws/src/router.ts +++ b/packages/transport-ws/src/router.ts @@ -9,6 +9,7 @@ import type { SurfaceContext, SurfaceRegistry } from "@dispatch/surface-registry"; import type 
{ + ChatQueueMessage, ChatSendMessage, ChatSubscribeMessage, ChatUnsubscribeMessage, @@ -68,13 +69,27 @@ export interface ChatUnsubscribeRouteResult { readonly conversationId: string; } +/** + * The effect a validated chat.queue should produce. The shell calls + * `orchestrator.enqueue({ conversationId, text })` and emits NOTHING back on + * either path (fire-and-forget): success is confirmed by the message-queue + * SURFACE updating (startedTurn:false) or by streaming chat.deltas + * (startedTurn:true — the shell auto-subscribes the sender, same as chat.send). + */ +export interface ChatQueueRouteResult { + readonly kind: "chat-queue"; + readonly conversationId: string; + readonly text: string; +} + /** The effect any client WS message should produce. */ export type RouteResult = | SurfaceRouteResult | ChatRouteResult | ChatRouteError | ChatSubscribeRouteResult - | ChatUnsubscribeRouteResult; + | ChatUnsubscribeRouteResult + | ChatQueueRouteResult; // ── Helpers ───────────────────────────────────────────────────────────────── @@ -118,6 +133,8 @@ export function routeClientMessage( return handleChatSubscribe(msg); case "chat.unsubscribe": return handleChatUnsubscribe(msg); + case "chat.queue": + return handleChatQueue(msg); } } @@ -164,6 +181,27 @@ function handleChatUnsubscribe(msg: ChatUnsubscribeMessage): ChatUnsubscribeRout return { kind: "chat-unsubscribe", conversationId: msg.conversationId }; } +/** + * Validate a chat.queue: `text` must be a non-empty string AFTER TRIM (matches + * the HTTP `QueueRequest` rule). Invalid → `chat-error` (the shell replies with + * `chat.error`, same style as a malformed `chat.send`; the orchestrator is never + * called). Valid → `chat-queue` (the shell calls `orchestrator.enqueue`). 
+ */ +function handleChatQueue(msg: ChatQueueMessage): ChatQueueRouteResult | ChatRouteError { + if (typeof msg.text !== "string" || msg.text.trim().length === 0) { + return { + kind: "chat-error", + conversationId: msg.conversationId, + errorMessage: "chat.queue requires a non-empty string `text`", + }; + } + return { + kind: "chat-queue", + conversationId: msg.conversationId, + text: msg.text, + }; +} + // ── Per-message handlers ──────────────────────────────────────────────────── function handleSubscribe( diff --git a/packages/transport-ws/src/server.bun.test.ts b/packages/transport-ws/src/server.bun.test.ts index 43d008a..da3be5e 100644 --- a/packages/transport-ws/src/server.bun.test.ts +++ b/packages/transport-ws/src/server.bun.test.ts @@ -96,16 +96,22 @@ interface FakeOrchestratorOpts { readonly startTurn?: SessionOrchestrator["startTurn"]; /** If true, startTurn always returns already-active. */ readonly alreadyActive?: boolean; + /** Custom enqueue impl. */ + readonly enqueue?: SessionOrchestrator["enqueue"]; + /** If true, enqueue reports the conversation was active (startedTurn:false). */ + readonly queueActive?: boolean; } function fakeOrchestrator(opts?: FakeOrchestratorOpts): SessionOrchestrator & { readonly listeners: Map<string, Set<TurnEventListener>>; readonly startCalls: readonly { conversationId: string; text: string }[]; + readonly enqueueCalls: readonly { conversationId: string; text: string }[]; readonly aborted: boolean; } { const listeners = opts?.listeners ?? new Map<string, Set<TurnEventListener>>(); const bufferedEvents = opts?.bufferedEvents ?? 
new Map<string, readonly AgentEvent[]>(); const startCalls: { conversationId: string; text: string }[] = []; + const enqueueCalls: { conversationId: string; text: string }[] = []; const aborted = false; return { @@ -113,6 +119,9 @@ function fakeOrchestrator(opts?: FakeOrchestratorOpts): SessionOrchestrator & { get startCalls() { return startCalls; }, + get enqueueCalls() { + return enqueueCalls; + }, get aborted() { return aborted; }, @@ -126,6 +135,16 @@ function fakeOrchestrator(opts?: FakeOrchestratorOpts): SessionOrchestrator & { } return { started: true, turnId: "fake-turn-id" }; }, + enqueue(input) { + enqueueCalls.push({ conversationId: input.conversationId, text: input.text }); + if (opts?.enqueue) { + return opts.enqueue(input); + } + if (opts?.queueActive) { + return { startedTurn: false, queue: [] }; + } + return { startedTurn: true, queue: [] }; + }, subscribe(conversationId, listener) { let set = listeners.get(conversationId); if (!set) { @@ -159,12 +178,15 @@ function fakeOrchestrator(opts?: FakeOrchestratorOpts): SessionOrchestrator & { /** Create a fake orchestrator that broadcasts events when `broadcast` is called. 
*/ function fakeOrchestratorWithBroadcast(): SessionOrchestrator & { readonly listeners: Map<string, Set<TurnEventListener>>; + readonly enqueueCalls: readonly { conversationId: string; text: string }[]; broadcast(conversationId: string, event: AgentEvent): void; } { const listeners = new Map<string, Set<TurnEventListener>>(); + const enqueueCalls: { conversationId: string; text: string }[] = []; return { listeners, + enqueueCalls, broadcast(conversationId, event) { const set = listeners.get(conversationId); if (set) { @@ -176,6 +198,10 @@ function fakeOrchestratorWithBroadcast(): SessionOrchestrator & { startTurn(_input) { return { started: true, turnId: "fake-turn-id" }; }, + enqueue(input) { + enqueueCalls.push({ conversationId: input.conversationId, text: input.text }); + return { startedTurn: true, queue: [] }; + }, subscribe(conversationId, listener) { let set = listeners.get(conversationId); if (!set) { @@ -323,6 +349,29 @@ function startServer( break; } + case "chat-queue": { + // Mirror extension.ts: fire-and-forget. On startedTurn:true + // auto-subscribe the sender (deltas stream); on false emit + // nothing back. 
+ const enqueueResult = orchestrator.enqueue({ + conversationId: result.conversationId, + text: result.text, + }); + if (enqueueResult.startedTurn) { + if (!state.chatSubscriptions.has(result.conversationId)) { + const unsubscribe = orchestrator.subscribe(result.conversationId, (event) => { + ws.send(JSON.stringify({ type: "chat.delta", event })); + }); + state.chatSubscriptions.set(result.conversationId, unsubscribe); + } + } + log.info?.("transport-ws: chat.queue accepted", { + conversationId: result.conversationId, + startedTurn: enqueueResult.startedTurn, + }); + break; + } + case "chat-error": { log.warn?.("transport-ws: malformed chat.send", { reason: result.errorMessage, @@ -649,6 +698,136 @@ describe("chat ops (new orchestrator API)", () => { }); }); +describe("chat.queue (steering enqueue)", () => { + let server: ReturnType<typeof Bun.serve>; + let port: number; + + afterEach(() => { + server.stop(); + }); + + test("chat.queue with valid text → orchestrator.enqueue called with {conversationId, text}, no reply sent", async () => { + const orch = fakeOrchestrator(); // idle → startedTurn:true + const registry = fakeRegistry([]); + server = startServer(registry, orch); + port = server.port as number; + + const ws = new WebSocket(`ws://localhost:${port}`); + await waitForMessage(ws); // drain catalog + + ws.send(JSON.stringify({ type: "chat.queue", conversationId: "c1", text: "steer please" })); + // Allow the message handler to run. + await new Promise((r) => setTimeout(r, 50)); + + expect(orch.enqueueCalls).toEqual([{ conversationId: "c1", text: "steer please" }]); + // chat.send-path equivalence: enqueue NOT called via startTurn. + expect(orch.startCalls).toHaveLength(0); + // Fire-and-forget: no chat.error, no ack — only the catalog was sent. + // (startedTurn:true path auto-subscribes but emits nothing itself.) 
+ + ws.close(); + }); + + test("chat.queue on startedTurn:true auto-subscribes the sender (deltas stream as chat.delta)", async () => { + const orch = fakeOrchestratorWithBroadcast(); // enqueue → startedTurn:true + const registry = fakeRegistry([]); + server = startServer(registry, orch); + port = server.port as number; + + const ws = new WebSocket(`ws://localhost:${port}`); + await waitForMessage(ws); // drain catalog + + ws.send(JSON.stringify({ type: "chat.queue", conversationId: "c1", text: "go" })); + await new Promise((r) => setTimeout(r, 50)); + + // The sender was auto-subscribed — a broadcast reaches it as a chat.delta. + const event = { + type: "text-delta", + conversationId: "c1", + turnId: "t1", + delta: "Hi", + } as AgentEvent; + orch.broadcast("c1", event); + + const msg = await waitForMessage(ws); + expect(msg.type).toBe("chat.delta"); + if (msg.type === "chat.delta") { + expect(msg.event).toEqual(event); + } + + ws.close(); + }); + + test("chat.queue on startedTurn:false (queued for steering) emits NOTHING back", async () => { + const orch = fakeOrchestrator({ queueActive: true }); // enqueue → startedTurn:false + const registry = fakeRegistry([]); + server = startServer(registry, orch); + port = server.port as number; + + const ws = new WebSocket(`ws://localhost:${port}`); + await waitForMessage(ws); // drain catalog + + ws.send(JSON.stringify({ type: "chat.queue", conversationId: "c1", text: "steer" })); + await new Promise((r) => setTimeout(r, 50)); + + expect(orch.enqueueCalls).toEqual([{ conversationId: "c1", text: "steer" }]); + // No further message should arrive within a quiet window: success is + // confirmed by the message-queue SURFACE, not a reply here. We assert + // by NOT receiving anything (a silent socket). 
+ await expect( + Promise.race([ + waitForMessage(ws).then((m) => new Error(`unexpected reply: ${JSON.stringify(m)}`)), + new Promise((resolve) => setTimeout(() => resolve("silent"), 150)), + ]), + ).resolves.toBe("silent"); + + ws.close(); + }); + + test("chat.queue with empty text → chat.error to client, no enqueue", async () => { + const orch = fakeOrchestrator(); + const registry = fakeRegistry([]); + server = startServer(registry, orch); + port = server.port as number; + + const ws = new WebSocket(`ws://localhost:${port}`); + await waitForMessage(ws); // drain catalog + + ws.send(JSON.stringify({ type: "chat.queue", conversationId: "c1", text: " " })); + const errMsg = await waitForMessage(ws); + + expect(errMsg.type).toBe("chat.error"); + if (errMsg.type === "chat.error") { + expect(errMsg.conversationId).toBe("c1"); + expect(errMsg.message).toContain("non-empty string"); + } + expect(orch.enqueueCalls).toHaveLength(0); + + ws.close(); + }); + + test("chat.queue with missing text → chat.error to client, no enqueue", async () => { + const orch = fakeOrchestrator(); + const registry = fakeRegistry([]); + server = startServer(registry, orch); + port = server.port as number; + + const ws = new WebSocket(`ws://localhost:${port}`); + await waitForMessage(ws); // drain catalog + + ws.send(JSON.stringify({ type: "chat.queue", conversationId: "c1" })); + const errMsg = await waitForMessage(ws); + + expect(errMsg.type).toBe("chat.error"); + if (errMsg.type === "chat.error") { + expect(errMsg.message).toContain("non-empty string"); + } + expect(orch.enqueueCalls).toHaveLength(0); + + ws.close(); + }); +}); + describe("logging", () => { let server: ReturnType<typeof Bun.serve>; let port: number; diff --git a/packages/wire/package.json b/packages/wire/package.json index 07c20b7..b83f127 100644 --- a/packages/wire/package.json +++ b/packages/wire/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/wire", - "version": "0.7.0", + "version": "0.8.0", "type": "module", 
"private": true, "main": "dist/index.js", diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts index 6a6de7d..3edeabf 100644 --- a/packages/wire/src/index.ts +++ b/packages/wire/src/index.ts @@ -224,6 +224,37 @@ export interface TurnMetrics { readonly contextSize?: number; } +// ─── Message queue + steering ─────────────────────────────────────────────── + +/** + * A user message held in a conversation's message queue, awaiting mid-turn + * steering delivery. The message-queue extension owns the queue and exposes it + * as a per-conversation `custom` surface field; this type is the shared shape + * the surface payload, the enqueue response, and the extension's service all + * use (so a separate frontend repo can depend on the wire alone to render it). + */ +export interface QueuedMessage { + /** Stable id (client-visible) for UI keying + dedup. */ + readonly id: string; + /** The message text the client enqueued. */ + readonly text: string; + /** When the message was enqueued (epoch-ms). */ + readonly queuedAt: number; +} + +/** + * The payload of the message-queue extension's per-conversation `custom` + * surface field (`rendererId: "message-queue"`): the current queue snapshot a + * frontend renders. Carried on the SURFACE channel (NOT the chat stream) — the + * queue is control/state, distinct from turn content. An empty `messages` + * array means the queue is empty (no pending steering). The frontend moves a + * message from this queue surface into the transcript when it is drained (the + * surface clears) and/or when the matching `TurnSteeringEvent` arrives. + */ +export interface QueuePayload { + readonly messages: readonly QueuedMessage[]; +} + // ─── Outward events ───────────────────────────────────────────────────────── /** @@ -243,7 +274,8 @@ export type AgentEvent = | TurnStepCompleteEvent | TurnErrorEvent | TurnDoneEvent - | TurnSealedEvent; + | TurnSealedEvent + | TurnSteeringEvent; /** Status change for a conversation (e.g. 
idle → running). */ export interface StatusEvent { @@ -440,3 +472,27 @@ export interface TurnSealedEvent { readonly conversationId: string; readonly turnId: string; } + +/** + * A steering message was injected into an in-flight turn at the tool-result + * boundary (the model sees it alongside the tool results and may adjust + * course). Drawn from the conversation's message queue (which the drain + * clears); the cleared queue arrives as a message-queue SURFACE update, while + * THIS event carries the injected `text` so a frontend can place a user bubble + * in the transcript live — and so a late-joining watcher sees it before seal + * (mirroring `TurnInputEvent` for the opening prompt; emitted into the + * in-flight buffer by the session-orchestrator). + * + * Emitted by the session-orchestrator (in its `drainSteering` wrapper) only + * when the kernel drained a non-empty queue at a tool-result boundary. If the + * turn instead ENDS with a non-empty queue (no tool call fired), the queue is + * carried into a NEW turn whose opening `user-message` event covers the + * transcript — so no `steering` event is emitted in that case. One `steering` + * event per drain; the combined text of all drained messages. + */ +export interface TurnSteeringEvent { + readonly type: "steering"; + readonly conversationId: string; + readonly turnId: string; + readonly text: string; +} @@ -5,7 +5,7 @@ > Keep this lean and current; do not let it re-accrete a step-by-step changelog. ## Status (current) -`tsc -b` EXIT 0 · biome clean · **894 vitest + transport bun green**. +`tsc -b` EXIT 0 · biome clean · **1043 vitest + transport bun green**. Built and verified live (full-fidelity: every feature is a manifest-loaded extension through the host): @@ -384,6 +384,46 @@ budget_tokens; `../claude` orchestrated DIRECTLY (mode A); CLI `--effort` now. `../dispatch-web`): ChatRequest/chat.send field + GET/PUT endpoints + ladder + default-`high` semantics + cache note. 
+## Message queue + steering injection (DONE) +Design: this file's roadmap item 3 (now implemented). User-gated calls: a **separate +`message-queue` standard extension** (dependsOn `surface-registry`) owns the queue STATE + +a per-conversation `custom` surface; the **session-orchestrator** owns delivery (drain → +inject → carry) + emits the `steering` event (it owns the chat hub — no `chatEmit` service +needed); the **kernel** gets a generic `drainSteering` callback. Glossary: added +**message queue**, **steering**, **queued message**. Enqueue when idle **starts a turn** +(user choice; `chat.queue` degrades to `chat.send`). Steering text rendered live via a new +additive `steering` `AgentEvent`; queue state via the surface (NOT the chat stream). +- **Wave 0 (orchestrator, contracts):** `RunTurnInput.drainSteering?: () => readonly + ChatMessage[]` (kernel contract — generic, kernel stays pure); `QueuedMessage` + + `QueuePayload` + `TurnSteeringEvent` (type `"steering"`, additive to `AgentEvent`) in + `@dispatch/wire` (`0.7.0→0.8.0`); `POST /conversations/:id/queue` + WS `chat.queue` op + + `QueueRequest`/`QueueResponse` in `@dispatch/transport-contract` (`0.11.0→0.12.0`). typecheck + clean except the expected transport-ws exhaustive-switch fan-out (fixed in Wave 3). +- **Wave 1 (parallel ×2, disjoint):** `kernel` runtime — calls `drainSteering` at the + tool-result boundary only when continuing to a next step (gated; no drain on max-steps), + +6 pure tests (65 total); `message-queue` (NEW ext) — pure queue core (enqueue/getQueue/ + drain/combine) + `MessageQueueService`/`messageQueueHandle` + per-conversation `custom` + surface (`rendererId:"message-queue"`, `QueuePayload`), 12 tests. (The message-queue agent + DIED mid-task after writing all src+tests but before verifying/reporting; orchestrator + recovered by running `bun install` + root tsconfig ref + verifying directly — tsc/vitest/ + biome clean, 12 tests pass; no hand-fixing of impl.) 
+- **Wave 2:** `session-orchestrator` — added `enqueue` facade (idle→`startTurn`, + active→queue.enqueue) + `resolveQueue?` dep (self-wired lazily in `activate` via + `host.getService(messageQueueHandle)` — host-bin does NOT wire it) + `drainSteering` wrapper + (drain → emit `steering` → return one combined user `ChatMessage`) + post-seal carry + (non-empty queue → new turn), +8 tests (85 total). `message-queue` is an OPTIONAL dep + (feature degrades off if absent). +- **Wave 3 (parallel ×3):** `host-bin` — registered `message-queue` in `CORE_EXTENSIONS` + (+dep+ref), 28 tests; `transport-http` — `POST /conversations/:id/queue` route + validation, + 145 tests; `transport-ws` — `chat.queue` op + fixed the Wave-0 exhaustive-switch fan-out, + 29 vitest + 20 bun. +- Verified: `tsc -b` EXIT 0, biome clean (280 files), **1043 vitest + 199 transport bun** pass; + all agents in-lane. **Boot smoke:** private instance boots clean with `message-queue` + registered (no activation crash). +- [x] FE courier handoff written: `frontend-message-queue-handoff.md` (user couriers to + `../dispatch-web`): surface (`rendererId:"message-queue"`), `chat.queue` WS op, `steering` + event, HTTP `POST /queue`, auto-start-when-idle, carry semantics, version bumps. + ## Open items - **Context window LIMIT (deferred, sibling of context size):** expose the selected model's max context-window token limit so the FE can render `contextSize / limit` (e.g. `1286 / 200000`). @@ -420,19 +460,12 @@ budget_tokens; `../claude` orchestrated DIRECTLY (mode A); CLI `--effort` now. whole conversation). - **send, no `--queue` flag (default):** BLOCKING — sends, waits for the turn to settle, returns the AI's last message (same shape as the read). - - **send with `--queue`:** enqueues the message into the conversation's - message queue (roadmap item 3) and exits immediately. -3. 
**Message queue + steering injection (backend core; prerequisite for the - `--queue` flag in item 2):** a per-conversation queue a client (FE or CLI) - can push a message onto while a turn is GENERATING. Delivery semantics: - - On the turn's next TOOL CALL, queued messages are injected as "steering" - messages returned alongside the tool result (the model sees them - mid-turn and can adjust course). - - If the turn ends before any tool call fires, the queued messages are - COMBINED and sent as a fresh user message starting a NEW turn. - - FE queueing UX (queue while generating) couriered to `../dispatch-web`. - Touches the kernel turn loop (injection at the tool-result boundary) — a - contract-first design pass needed before summoning. + - **send with `--queue`:** enqueues the message into the conversation's + message queue (DONE — see "Message queue + steering injection" above; the + `POST /conversations/:id/queue` endpoint + `chat.queue` WS op ship it) and + exits immediately. + 3. **Message queue + steering injection — DONE** (see the milestone section above; + prerequisite for item 2's `--queue` flag met). 4. **CLI flag to open/activate an FE tab:** optional CLI flag (new or existing conversation) that makes an already-open frontend open the conversation as a tab and mark it active. Does NOT exist today — no backend→FE "open/focus @@ -442,7 +475,18 @@ budget_tokens; `../claude` orchestrated DIRECTLY (mode A); CLI `--effort` now. 5. **`todo` tool** — a per-conversation task-list tool the model maintains (like opencode's todowrite/todoread), as a standard tool extension; likely a surface so the FE can render the live list. -6. **`web_search` tool** — a web search tool (like old dispatch's; - reference-only source at `../dispatch-source`), as a standard tool extension. + 6. **`web_search` tool** — a web search tool (like old dispatch's; + reference-only source at `../dispatch-source`), as a standard tool extension. + 7. 
**Message queue — close-with-queued-messages (deferred product decision):** + if a client closes a conversation (`POST /conversations/:id/close`) while the + queue is non-empty, the carry currently still fires (starts a new turn on the + closed conversation). Decide: does closing discard pending steering, or honor + it? If "discard," gate the carry on `finishReason !== "aborted"` in + session-orchestrator (one-line). No FE action either way. + 8. **Live-verify the steering flow (once the frontend is complete):** run a live + `chat.queue` → tool-call → `steering` event flow against a real tool-calling + model, end-to-end. The logic is unit/integration tested + boot-smoke-clean; + this is the live end-to-end smoke. Blocked on the frontend wiring the queue + surface + `chat.queue` op (or run it backend-only with a probe client). -(Done and dropped from the list: CLI; dedup / storage growth.) +(Done and dropped from the list: CLI; dedup / storage growth; message queue + steering injection.) diff --git a/tsconfig.json b/tsconfig.json index 13a4735..66982e4 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -22,6 +22,7 @@ { "path": "./packages/tool-write-file" }, { "path": "./packages/skills" }, { "path": "./packages/cache-warming" }, + { "path": "./packages/message-queue" }, { "path": "./packages/lsp" }, { "path": "./packages/cli" }, { "path": "./packages/journal-sink" }, |
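The delivery rules stated in the notes and glossary hunks above (enqueue on an idle conversation starts a turn; enqueue on an active one holds the message for steering; a drain at the tool-result boundary yields one combined text; a queue left non-empty when the turn ends is carried into a new turn) can be sketched roughly as follows. This is a minimal illustration, not the `message-queue` or `session-orchestrator` implementation: `createSketchQueue`, `SketchDeps`, `drainSteering` as a method here, `carryAfterTurn`, the id scheme, and the blank-line separator used to combine messages are all assumptions; only the `QueuedMessage` fields and the `{ startedTurn, queue }` result shape come from the diff.

```typescript
// Illustrative sketch only: everything except the QueuedMessage fields and
// the { startedTurn, queue } result shape is a hypothetical name.
interface QueuedMessage {
  readonly id: string;
  readonly text: string;
  readonly queuedAt: number;
}

interface EnqueueResult {
  readonly startedTurn: boolean;
  readonly queue: readonly QueuedMessage[];
}

interface SketchDeps {
  /** True while a turn is generating for the conversation. */
  isActive(conversationId: string): boolean;
  /** Starts a new turn with the given opening prompt. */
  startTurn(conversationId: string, text: string): void;
}

function createSketchQueue(deps: SketchDeps) {
  const queues = new Map<string, QueuedMessage[]>();
  let nextId = 0;

  // Assumption: drained messages are combined with a blank line between them.
  function drain(conversationId: string): string | undefined {
    const pending = queues.get(conversationId);
    if (pending === undefined || pending.length === 0) return undefined;
    queues.delete(conversationId);
    return pending.map((m) => m.text).join("\n\n");
  }

  return {
    // Idle conversation: start a turn. Active conversation: hold for steering.
    enqueue(conversationId: string, text: string, now: number): EnqueueResult {
      if (!deps.isActive(conversationId)) {
        deps.startTurn(conversationId, text);
        return { startedTurn: true, queue: [] };
      }
      const pending = queues.get(conversationId) ?? [];
      pending.push({ id: `q-${nextId++}`, text, queuedAt: now });
      queues.set(conversationId, pending);
      return { startedTurn: false, queue: [...pending] };
    },
    // Called at a tool-result boundary: returns the combined steering text,
    // or undefined when nothing is pending.
    drainSteering: drain,
    // Called after a turn ends: a non-empty queue becomes a new turn's prompt.
    carryAfterTurn(conversationId: string): boolean {
      const carried = drain(conversationId);
      if (carried === undefined) return false;
      deps.startTurn(conversationId, carried);
      return true;
    },
  };
}
```

In this sketch the transport only ever calls `enqueue`. The drain and carry paths belong to whoever drives the turn, which matches the split described in the notes, where the queue extension owns state and the orchestrator owns delivery.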
