summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-21 02:19:54 +0900
committerAdam Malczewski <[email protected]>2026-06-21 02:19:54 +0900
commitd98a63ce17519983dcf58c27432723e2f4b96e75 (patch)
tree21a4e043d040984aa62fd2ba81ca3349ce01f5c4
parent9c90105b6cfede0f3327169718300c649bb0531a (diff)
downloaddispatch-web-d98a63ce17519983dcf58c27432723e2f4b96e75.tar.gz
dispatch-web-d98a63ce17519983dcf58c27432723e2f4b96e75.zip
feat(chat): message queue + steering — mid-turn injection at tool-result boundaries
Consume the message-queue + steering handoff ([email protected], [email protected]). Re-pinned file: deps + re-mirrored .dispatch/*.reference.md. - fold steering AgentEvent into the transcript as a provisional user bubble (after the tool-result it followed; no de-dup — the queue surface carried it) - add rendererId: "message-queue" custom renderer (pure parser + MessageQueueList) rendered as a compact panel above the Composer (hidden when queue is empty) - add ChatStore.queueMessage / AppStore.queueMessage — sends chat.queue WS op (trim/validate non-empty; auto-starts a turn if idle) - Composer switches to chat.queue while generating (button → Queue, placeholder → Steer the conversation...) - exhaustiveness guards updated for steering + chat.queue - carry-to-new-turn needs no special handling (normal new turn) 664 tests green.
-rw-r--r--.dispatch/transport-contract.reference.md94
-rw-r--r--.dispatch/wire.reference.md85
-rw-r--r--GLOSSARY.md4
-rw-r--r--backend-handoff.md42
-rw-r--r--src/app/App.svelte69
-rw-r--r--src/app/store.svelte.ts69
-rw-r--r--src/core/chunks/reducer.test.ts54
-rw-r--r--src/core/chunks/reducer.ts19
-rw-r--r--src/core/wire/conformance.test.ts8
-rw-r--r--src/core/wire/conformance.ts4
-rw-r--r--src/features/chat/ports.ts9
-rw-r--r--src/features/chat/store.svelte.ts112
-rw-r--r--src/features/chat/store.test.ts276
-rw-r--r--src/features/chat/test-helpers.ts16
-rw-r--r--src/features/chat/ui/Composer.svelte25
-rw-r--r--src/features/surface-host/logic/message-queue.test.ts48
-rw-r--r--src/features/surface-host/logic/message-queue.ts45
-rw-r--r--src/features/surface-host/ui/MessageQueueList.svelte22
-rw-r--r--src/features/surface-host/ui/SurfaceView.svelte3
19 files changed, 949 insertions, 55 deletions
diff --git a/.dispatch/transport-contract.reference.md b/.dispatch/transport-contract.reference.md
index 1c3d993..18d1a3d 100644
--- a/.dispatch/transport-contract.reference.md
+++ b/.dispatch/transport-contract.reference.md
@@ -5,10 +5,31 @@
> hangs on a permission prompt). Your CODE still imports `@dispatch/transport-contract` normally —
> this file is for READING only.
>
-> **Orchestrator:** SNAPSHOT of `[email protected]` (reasoning effort shipped).
-> Depends on `@dispatch/[email protected]` (see `wire.reference.md`) + `@dispatch/[email protected]` (see
+> **Orchestrator:** SNAPSHOT of `[email protected]` (message queue + steering).
+> Depends on `@dispatch/[email protected]` (see `wire.reference.md`) + `@dispatch/[email protected]` (see
> `ui-contract.reference.md`).
>
+> **2026-06-21 delta (message-queue + steering handoff — package bumped `0.11.0` → `0.12.0`, ADDITIVE):**
+> adds the enqueue surface for the per-conversation message queue (the wire types `QueuedMessage` /
+> `QueuePayload` + the new `steering` `AgentEvent` live in `[email protected]`, re-exported here). Two
+> additive shapes:
+> 1. **WS `chat.queue` op** — `ChatQueueMessage { type: "chat.queue"; conversationId; text }` (a
+> new `WsClientMessage` union member). Fire-and-forget: on success the server emits NOTHING back
+> — the message-queue SURFACE updates (the new message appears in the snapshot). On failure (empty
+> `text`, unknown conversation) the server replies `chat.error`. **Auto-start when idle
+> (server-owned):** if no turn is active, `chat.queue` does NOT queue — it STARTS A NEW TURN with
+> the message as its opening prompt (equivalent to `chat.send`). So a single op works for both
+> "steer during generation" and "send"; the client doesn't pick. `text` must be non-empty after trim.
+> 2. **HTTP `POST /conversations/:id/queue`** — body `QueueRequest { text }` → `QueueResponse
+> { conversationId; startedTurn: boolean; queue: QueuedMessage[] }`. `startedTurn: true` = was
+> idle, a new turn started (the message is the turn's opening prompt, NOT a queued steering
+> message); `startedTurn: false` = a turn was active, the message was queued (the `queue`
+> snapshot includes it). Empty/whitespace `text` → HTTP 400 `{ error }`. The FE uses the WS op.
+>
+> The queue is read via a per-conversation SURFACE (`message-queue`, scope `conversation`; one
+> `custom` field, `rendererId: "message-queue"`, `payload: QueuePayload`) — NOT via the chat stream.
+> See the handoff for the full flow (steering event, carry-to-new-turn, move-vs-duplicate).
+>
> **2026-06-12 delta (reasoning-effort handoff — package bumped `0.10.0` → `0.11.0`, ADDITIVE):**
> the thinking-depth knob (`ReasoningEffort`, re-exported from `[email protected]`) lands in TWO scopes,
> resolved server-side per turn (per-turn override → persisted conversation value → default
@@ -135,6 +156,11 @@
- `POST /conversations/:id/close` — no body → `200 CloseConversationResponse`. The EXPLICIT tab-close
affordance: aborts any in-flight turn (persists the partial; seals with `finishReason: "aborted"`)
AND stops + disables cache-warming (persisted OFF). Idempotent (`abortedTurn: false` when idle/unknown).
+- `POST /conversations/:id/queue` — body `QueueRequest { text }` → `200 QueueResponse`. Enqueue a user
+ message for mid-turn steering delivery (the WS `chat.queue` op is the FE's path). When a turn is
+ active, the message is queued + delivered at the next tool-result boundary (a `steering` `AgentEvent`
+ fires; the message-queue SURFACE updates). When idle, the enqueue STARTS a new turn with the message
+ as its opening prompt (`startedTurn: true`). Empty/whitespace `text` → `400 { error }`.
- `GET /metrics/throughput?period=day|week|month&date=<...>` — `ThroughputResponse` (token-weighted
tokens/sec per model over the window). Not part of cache-warming; listed for completeness.
- `GET /conversations/:id/cwd` — `CwdResponse` (`cwd` is `null` until set).
@@ -172,10 +198,17 @@
*/
import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract";
-import type { AgentEvent, ReasoningEffort, StoredChunk, TurnMetrics } from "@dispatch/wire";
+import type {
+ AgentEvent,
+ QueuedMessage,
+ ReasoningEffort,
+ StoredChunk,
+ TurnMetrics,
+} from "@dispatch/wire";
export type {
AgentEvent,
+ QueuedMessage,
ReasoningEffort,
StepMetrics,
StoredChunk,
@@ -395,6 +428,41 @@ export interface CloseConversationResponse {
readonly abortedTurn: boolean;
}
+// ─── Message queue (steering) ─────────────────────────────────────────────────
+
+/**
+ * Request body for `POST /conversations/:id/queue` — enqueue a user message
+ * onto a conversation's message queue for mid-turn steering delivery.
+ *
+ * When a turn is ACTIVE for the conversation, the message is appended to the
+ * queue (the message-queue extension's per-conversation SURFACE updates) and
+ * delivered at the next tool-result boundary as a steering message the model
+ * sees alongside the tool results (a `steering` `AgentEvent` is emitted). When
+ * NO turn is active, enqueuing instead STARTS a new turn with the message as its
+ * opening prompt (equivalent to `POST /chat`) — so a fire-and-forget enqueue
+ * works regardless of generation state. The resolved queue + whether a turn was
+ * started are returned in `QueueResponse`.
+ *
+ * `text` must be non-empty (after trim) → HTTP 400 `{ error }` otherwise.
+ */
+export interface QueueRequest {
+ readonly text: string;
+}
+
+/**
+ * Response body for `POST /conversations/:id/queue` — the conversation's queue
+ * snapshot AFTER the enqueue, so a client renders the queue from this alone.
+ * `conversationId` echoes the path. `startedTurn` is true when no turn was
+ * active and the enqueue started a new turn (the message is now the turn's
+ * opening prompt, not a queued steering message); the turn's events stream on
+ * the chat channel as usual.
+ */
+export interface QueueResponse {
+ readonly conversationId: string;
+ readonly startedTurn: boolean;
+ readonly queue: readonly QueuedMessage[];
+}
+
// ─── Per-conversation LSP status ──────────────────────────────────────────────
/** The connection state of a single language server for a workspace. */
@@ -550,6 +618,23 @@ export interface ChatUnsubscribeMessage {
}
/**
+ * Client → server: enqueue a message onto a conversation's message queue while
+ * a turn is generating (steering). The WebSocket counterpart of the HTTP
+ * `POST /conversations/:id/queue` (`QueueRequest`). Fire-and-forget: success is
+ * confirmed by the message-queue SURFACE updating (the FE renders the queue
+ * from the surface, not from a reply here); a failure (malformed/empty text,
+ * unknown conversation) arrives as a `chat.error`. When no turn is active, the
+ * enqueue starts a new turn (the turn's events stream as `chat.delta`s), so a
+ * client reuses this op for both "queue while generating" and "send" (the
+ * latter being equivalent to `chat.send`).
+ */
+export interface ChatQueueMessage {
+ readonly type: "chat.queue";
+ readonly conversationId: string;
+ readonly text: string;
+}
+
+/**
* Every client → server WS message: surface ops (`@dispatch/ui-contract`) + chat
* ops. A server discriminates on `type`.
*/
@@ -557,7 +642,8 @@ export type WsClientMessage =
| SurfaceClientMessage
| ChatSendMessage
| ChatSubscribeMessage
- | ChatUnsubscribeMessage;
+ | ChatUnsubscribeMessage
+ | ChatQueueMessage;
/**
* Every server → client WS message: surface ops (`@dispatch/ui-contract`) + chat
diff --git a/.dispatch/wire.reference.md b/.dispatch/wire.reference.md
index 34984d2..c2c4d43 100644
--- a/.dispatch/wire.reference.md
+++ b/.dispatch/wire.reference.md
@@ -4,8 +4,31 @@
> types WITHOUT following the `file:` dep symlink out of this repo (which hangs on a permission
> prompt). Your CODE still imports `@dispatch/wire` normally — this file is for READING only.
>
-> **Orchestrator:** SNAPSHOT of `[email protected]` (reasoning effort — the thinking-depth knob).
-> Regenerate whenever `@dispatch/wire` changes.
+> **Orchestrator:** SNAPSHOT of `[email protected]` (message queue + steering). Regenerate
+> whenever `@dispatch/wire` changes.
+>
+> **2026-06-21 delta (message-queue + steering handoff — package bumped `0.7.0` → `0.8.0`, ADDITIVE):**
+> adds the per-conversation **message queue** + **steering** feature. While a turn is GENERATING,
+> a client enqueues a user message (via the `chat.queue` WS op or `POST /conversations/:id/queue`,
+> see `[email protected]`); it is delivered mid-turn as **steering** — injected at the next
+> tool-result boundary so the model sees it alongside the tool results and can adjust course. If the
+> turn ends with a non-empty queue (no tool call fired), the queue is carried into a NEW turn as its
+> opening prompt (no `steering` event — the new turn's `user-message` covers it).
+>
+> Adds:
+> - **`QueuedMessage`** (`{ id, text, queuedAt }`) — a message held in the queue (stable id for UI
+> keying + dedup).
+> - **`QueuePayload`** (`{ messages: QueuedMessage[] }`) — the payload of the message-queue
+> extension's per-conversation `custom` surface field (`rendererId: "message-queue"`). Carried on
+> the SURFACE channel (NOT the chat stream) — the queue is control/state. Empty `messages` = empty
+> queue. See `transport-contract.reference.md` for the surface + the enqueue op.
+> - **`TurnSteeringEvent`** (`{ type: "steering"; conversationId; turnId; text }`) — a NEW
+> `AgentEvent` union member, emitted on the chat stream when the kernel drains a non-empty queue
+> at a tool-result boundary. Render `text` as a USER bubble in the transcript (positioned after
+> the tool-result it followed); the queue surface separately clears on drain. One event per drain;
+> `text` is the combined text of all drained messages. Late-join safe (buffered into the in-flight
+> turn's event buffer, mirroring `user-message`). Carry-to-new-turn does NOT emit `steering`.
+> ADDITIVE to the union — if you have an exhaustive `AgentEvent` switch, add a `steering` case.
>
> **2026-06-12 delta (reasoning-effort handoff — package bumped `0.6.1` → `0.7.0`, ADDITIVE):**
> adds the **`ReasoningEffort`** type — the per-request thinking-depth ladder
@@ -284,6 +307,37 @@ export interface TurnMetrics {
readonly contextSize?: number;
}
+// ─── Message queue + steering ───────────────────────────────────────────────
+
+/**
+ * A user message held in a conversation's message queue, awaiting mid-turn
+ * steering delivery. The message-queue extension owns the queue and exposes it
+ * as a per-conversation `custom` surface field; this type is the shared shape
+ * the surface payload, the enqueue response, and the extension's service all
+ * use (so a separate frontend repo can depend on the wire alone to render it).
+ */
+export interface QueuedMessage {
+ /** Stable id (client-visible) for UI keying + dedup. */
+ readonly id: string;
+ /** The message text the client enqueued. */
+ readonly text: string;
+ /** When the message was enqueued (epoch-ms). */
+ readonly queuedAt: number;
+}
+
+/**
+ * The payload of the message-queue extension's per-conversation `custom`
+ * surface field (`rendererId: "message-queue"`): the current queue snapshot a
+ * frontend renders. Carried on the SURFACE channel (NOT the chat stream) — the
+ * queue is control/state, distinct from turn content. An empty `messages`
+ * array means the queue is empty (no pending steering). The frontend moves a
+ * message from this queue surface into the transcript when it is drained (the
+ * surface clears) and/or when the matching `TurnSteeringEvent` arrives.
+ */
+export interface QueuePayload {
+ readonly messages: readonly QueuedMessage[];
+}
+
// ─── Outward events ─────────────────────────────────────────────────────────
/**
@@ -303,7 +357,8 @@ export type AgentEvent =
| TurnStepCompleteEvent
| TurnErrorEvent
| TurnDoneEvent
- | TurnSealedEvent;
+ | TurnSealedEvent
+ | TurnSteeringEvent;
/** Status change for a conversation (e.g. idle → running). */
export interface StatusEvent {
@@ -498,4 +553,28 @@ export interface TurnSealedEvent {
readonly conversationId: string;
readonly turnId: string;
}
+
+/**
+ * A steering message was injected into an in-flight turn at the tool-result
+ * boundary (the model sees it alongside the tool results and may adjust
+ * course). Drawn from the conversation's message queue (which the drain
+ * clears); the cleared queue arrives as a message-queue SURFACE update, while
+ * THIS event carries the injected `text` so a frontend can place a user bubble
+ * in the transcript live — and so a late-joining watcher sees it before seal
+ * (mirroring `TurnInputEvent` for the opening prompt; emitted into the
+ * in-flight buffer by the session-orchestrator).
+ *
+ * Emitted by the session-orchestrator (in its `drainSteering` wrapper) only
+ * when the kernel drained a non-empty queue at a tool-result boundary. If the
+ * turn instead ENDS with a non-empty queue (no tool call fired), the queue is
+ * carried into a NEW turn whose opening `user-message` event covers the
+ * transcript — so no `steering` event is emitted in that case. One `steering`
+ * event per drain; the combined text of all drained messages.
+ */
+export interface TurnSteeringEvent {
+ readonly type: "steering";
+ readonly conversationId: string;
+ readonly turnId: string;
+ readonly text: string;
+}
```
diff --git a/GLOSSARY.md b/GLOSSARY.md
index 90acdd8..e350ffa 100644
--- a/GLOSSARY.md
+++ b/GLOSSARY.md
@@ -22,6 +22,8 @@
| **context size** | The tokens a conversation currently occupies: the most recent turn's FINAL step `inputTokens + outputTokens` (NOT the aggregate per-turn `usage`, which sums per-step prompts and overcounts a multi-step turn). On the wire as `TurnDoneEvent.contextSize` (live `done`) + `TurnMetrics.contextSize` (persisted); the FE reads the LATEST turn's value as current usage, and treats `undefined` as "unknown" (renders a placeholder, never `0`). Mirrors the backend GLOSSARY. | context usage, context length, tokens used (and do NOT call it "context window" — that's the limit) |
| **reasoning effort** | The per-request thinking-depth knob: how much extended thinking the model spends before answering. Canonical ladder `ReasoningEffort = "low" \| "medium" \| "high" \| "xhigh" \| "max"` (`[email protected]`). Resolution is SERVER-owned (never re-implement): per-turn `ChatRequest.reasoningEffort` override → persisted per-conversation value (`GET`/`PUT /conversations/:id/reasoning-effort`) → default `"high"` — so `null` from the GET means "default (`high`) applies", not "off". Changing the level can bust the prompt cache for the next turn (one-time re-prefill); a stable setting stays cache-safe. | thinking setting, thinking level, effort level, thinking budget |
| **context window** | The model's MAXIMUM token capacity (the limit a **context size** is measured against). A FUTURE backend field — not on the wire yet. **Placeholder:** the composer status bar currently HARDCODES a `1,000,000`-token window for the `size / limit · pct%` readout + fill bar; swap to the real per-model value when the backend ships it (see `backend-handoff.md` §3). | max context, token limit (distinct from **context size**, the current usage) |
+| **message queue** | A per-conversation buffer of user messages awaiting mid-turn **steering** delivery (owned by the `message-queue` backend extension). Transient + in-memory; exposed to the FE as a per-conversation **surface** (`message-queue`, scope `conversation`; one `custom` field, `rendererId: "message-queue"`, `payload: QueuePayload`). NOT on the chat stream — it is control/state. Enqueued via `chat.queue` (WS) or `POST /conversations/:id/queue` (HTTP); auto-starts a turn if idle. `[email protected]`. | pending messages, steering queue |
+| **steering** | A user message injected into an in-flight turn at the tool-result boundary (drawn from the **message queue**): the model sees it alongside the tool results and may adjust course. Emitted on the chat stream as a `steering` `AgentEvent` (`TurnSteeringEvent`); the queue surface clears on drain (move, don't duplicate). If the turn ends with a non-empty queue (no tool call fired), the queue carries into a NEW turn as its opening prompt (no `steering` event). `[email protected]`. | mid-turn injection, course correction, interruption |
## Frontend-specific
| Term | Meaning | Aliases to avoid |
@@ -38,6 +40,6 @@
| **surface interpreter** | The generic renderer: field kind → component. Knows kinds, never surface ids. | — |
| **metrics bubble** | The FE chat element that renders a turn's **turn metrics** (one per-turn total) and **step metrics** (one per step) as muted system-style bubbles at a turn's tail. UI presentation of `TurnMetrics`/`StepMetrics`; never a surface. | telemetry bubble, usage bubble, stats bubble |
| **TPS** (tokens per second) | A FE-DERIVED decode rate: `outputTokens / (decodeMs / 1000)` (per step; per turn over Σ `decodeMs`), falling back to `genTotalMs` when `decodeMs` is absent. The backend-recommended basis (excludes first-token latency). Not carried on the wire; omitted when timing is absent. | throughput |
-| **chat limit** | The max LOADED chunks per conversation (default 256; localStorage `dispatch.chatLimit`, no UI yet) before the oldest quarter is unloaded. Counts **chunks** (committed + provisional + accumulating). Policy in `core/chunks/trim.ts`. | chunk limit, message limit, history limit |
+| **chat limit** | The max LOADED chunks per conversation (default 256; persisted at localStorage `dispatch.chatLimit`, settable via the sidebar's Settings view) before the oldest quarter is unloaded. Counts **chunks** (committed + provisional + accumulating). Policy in `core/chunks/trim.ts`. | chunk limit, message limit, history limit |
| **unload** | Drop the oldest COMMITTED chunks from the in-memory transcript (and DOM) past the **chat limit** — in BULK (`ceil(limit/4)` per pass, deferred while the reader is scrolled up), never one-per-delta (old Dispatch's scroll-jump bug). Purely local: the IndexedDB cache and the server keep everything; `TranscriptState.hiddenBeforeSeq` is the watermark. Distinct from the conversation-cache's cross-conversation **eviction**. | evict (reserved for the cross-conversation cache), prune, drop |
| **show earlier** | The affordance at the top of a transcript with unloaded history ("Show earlier messages"): pages one unload-unit back in — local cache first, then the server (CR-5 `?beforeSeq=&limit=`) when the cache doesn't reach far enough back — preserving the reader's scroll position. Offered whenever the loaded window starts above seq 1 (the [email protected] 1-based gap-free seq contract). | load more, pagination |
diff --git a/backend-handoff.md b/backend-handoff.md
index 7c7da05..5b54f2d 100644
--- a/backend-handoff.md
+++ b/backend-handoff.md
@@ -5,18 +5,38 @@
> **From:** dispatch-web orchestrator · **To:** arch-rewrite orchestrator · **Courier:** the user.
> `lsp` does NOT span the repos (AGENTS.md § Backend seam) — every cross-repo ask flows through here.
-_Last updated: 2026-06-12 (reasoning-effort handoff consumed). **FE is current on
-`[email protected]` / `[email protected]` / `[email protected]`.** All handoffs to date are
+_Last updated: 2026-06-21 (message-queue + steering handoff consumed). **FE is current on
+`[email protected]` / `[email protected]` / `[email protected]`.** All handoffs to date are
consumed: surfaces + WS, conversation transcript/metrics, tabs + model selector, cache-warming
(incl. authoritative timer + retention + cache-rate fix + the CR-4 lifecycle below),
**per-conversation cwd + LSP status**, **context size**, **turn continuity + multi-client live
-view**, the **chat limit + CR-5 history windowing**, and the **reasoning effort
-(thinking-depth knob)** (below).
+view**, the **chat limit + CR-5 history windowing**, the **reasoning effort
+(thinking-depth knob)**, and the **message queue + steering** (below).
**Open asks: NONE.** CR-1/CR-2/CR-4/CR-5 all RESOLVED ✅ (see §2); §3 lists likely next asks.
**CR-3 (watcher couldn't see the USER prompt until seal) → RESOLVED ✅** — backend shipped the
`user-message` turn event; FE re-pinned + consumption live.
The cwd/LSP draft-path verification (`backend-handoff-cwd-lsp.md`) came back **all ✅ confirmed**._
+**Message-queue + steering handoff (`frontend-message-queue-handoff.md`) → CONSUMED ✅.**
+Re-pinned `[email protected]→0.8.0` + `[email protected]→0.12.0` (`ui-contract` unchanged —
+the queue uses the existing `custom` surface field kind); re-mirrored both
+`.dispatch/*.reference.md`; added "message queue" + "steering" to FE `GLOSSARY.md`. FE work:
+(a) `core/chunks/reducer.ts` folds the new `steering` `AgentEvent` into the transcript as a
+provisional user bubble (after the tool-result it followed; no de-dup — the queue surface, not
+the transcript, carried the pending message); `core/wire/conformance.ts` exhaustiveness guards
+updated for `steering` + `chat.queue`; (b) a `rendererId: "message-queue"` custom renderer
+(`surface-host/logic/message-queue.ts` pure parser + `MessageQueueList.svelte`) renders
+`QueuePayload.messages` (`QueuedMessage[]`); (c) the `message-queue` surface is pulled out of
+the generic Extensions sidebar list and rendered as a compact panel above the Composer (only
+when the queue is non-empty — an idle queue is hidden); (d) `ChatStore.queueMessage(text)` +
+`AppStore.queueMessage(text)` send `chat.queue { conversationId, text }` (trim/validate
+non-empty client-side too); (e) the Composer switches to `chat.queue` while `generating`
+(button label → "Queue", placeholder → "Steer the conversation..."). Carry-to-new-turn needs
+no special handling (surfaces as a normal new turn). **NOT yet live-probed** — the handoff
+flags the live end-to-end steering flow (a real `chat.queue` → tool-call → `steering` event
+against a tool-calling model) as not yet exercised; worth a live smoke. 664 tests green. NO
+new backend ask._
+
**Reasoning-effort handoff (`frontend-reasoning-effort-handoff.md`) → CONSUMED ✅
(curl-probed live: GET null on unseen id · PUT `xhigh` → echo + sticky GET · bad level → 400
listing the ladder · CORS preflight allows PUT).** Re-pinned `[email protected]→0.7.0` +
@@ -81,13 +101,13 @@ backend ask — but the max-limit denominator is now a live FE need; see §3.
## 1. Pinned backend contracts (consumed by the FE)
-Pinned as `file:` deps: **`[email protected]`; `[email protected]`; `[email protected]`**.
+Pinned as `file:` deps: **`[email protected]`; `[email protected]`; `[email protected]`**.
| Package | Used for |
|---|---|
| `@dispatch/ui-contract` | surfaces + surface WS protocol |
-| `@dispatch/wire` | `Chunk`/`StoredChunk`(+`seq`)/`ChatMessage`/`AgentEvent`/`TurnSealedEvent`/`Usage`/`StepId` + metrics: `StepMetrics`/`TurnMetrics`, `usage.stepId`, `step-complete`, `done.durationMs`/`done.usage`, `tool-result.durationMs`, **`done.contextSize`/`TurnMetrics.contextSize`**, **`ReasoningEffort`** |
-| `@dispatch/transport-contract` | `ChatRequest`(+`reasoningEffort`)/`ModelsResponse`/`ConversationHistoryResponse`/`ConversationMetricsResponse` + `WarmRequest`/`WarmResponse` + `CwdResponse`/`SetCwdRequest` + `ReasoningEffortResponse`/`SetReasoningEffortRequest` + LSP (`LspStatusResponse`/`LspServerInfo`/`LspServerState`) + WS chat ops + `WsClientMessage`/`WsServerMessage` |
+| `@dispatch/wire` | `Chunk`/`StoredChunk`(+`seq`)/`ChatMessage`/`AgentEvent`/`TurnSealedEvent`/`Usage`/`StepId` + metrics: `StepMetrics`/`TurnMetrics`, `usage.stepId`, `step-complete`, `done.durationMs`/`done.usage`, `tool-result.durationMs`, **`done.contextSize`/`TurnMetrics.contextSize`**, **`ReasoningEffort`**, **`QueuedMessage`/`QueuePayload`/`TurnSteeringEvent`** |
+| `@dispatch/transport-contract` | `ChatRequest`(+`reasoningEffort`)/`ModelsResponse`/`ConversationHistoryResponse`/`ConversationMetricsResponse` + `WarmRequest`/`WarmResponse` + `CwdResponse`/`SetCwdRequest` + `ReasoningEffortResponse`/`SetReasoningEffortRequest` + **`QueueRequest`/`QueueResponse`/`ChatQueueMessage`** + LSP (`LspStatusResponse`/`LspServerInfo`/`LspServerState`) + WS chat ops + `WsClientMessage`/`WsServerMessage` |
Endpoints in use (HTTP **24203**, WS **24205**, CORS `*` incl. `PUT`):
`POST /chat` (NDJSON) · `GET /models` ·
@@ -95,12 +115,14 @@ Endpoints in use (HTTP **24203**, WS **24205**, CORS `*` incl. `PUT`):
`GET /conversations/:id/metrics` · `GET`/`PUT /conversations/:id/cwd` ·
`GET`/`PUT /conversations/:id/reasoning-effort` (sticky thinking-depth; `null` ⇒ default `high`) ·
`GET /conversations/:id/lsp` · `POST /chat/warm` · `POST /conversations/:id/close` (explicit
-tab-close: abort turn + stop/disable warming) · WS `chat.send`→`chat.delta` ·
-WS `chat.subscribe`/`chat.unsubscribe` (watch a conversation's turns without sending; replay + live).
+tab-close: abort turn + stop/disable warming) · **`POST /conversations/:id/queue`** (enqueue
+steering message; auto-starts a turn if idle) · WS `chat.send`→`chat.delta` ·
+WS `chat.subscribe`/`chat.unsubscribe` (watch a conversation's turns without sending; replay + live) ·
+**WS `chat.queue`** (enqueue steering; fire-and-forget — surface updates on success).
Mirrored in-repo for headless agents: `.dispatch/{ui-contract,wire,transport-contract}.reference.md`
(regenerate on any contract bump; all current as of `[email protected]` /
## 2. Open asks FOR THE BACKEND
diff --git a/src/app/App.svelte b/src/app/App.svelte
index dffa937..ee72ca5 100644
--- a/src/app/App.svelte
+++ b/src/app/App.svelte
@@ -19,11 +19,17 @@
import { manifest as conversationCacheManifest } from "../features/conversation-cache";
import { manifest as markdownManifest } from "../features/markdown";
import {
+ ChatLimitField,
+ manifest as settingsManifest,
+ type ChatLimitSaveResult,
+ } from "../features/settings";
+ import {
createSmartScrollController,
manifest as smartScrollManifest,
ScrollToBottom,
} from "../features/smart-scroll";
import { manifest as surfaceHostManifest, SurfaceView } from "../features/surface-host";
+ import { parseMessageQueuePayload } from "../features/surface-host/logic/message-queue";
import { manifest as tabsManifest, TabBar } from "../features/tabs";
import { manifest as viewsManifest, ViewSidebar } from "../features/views";
import {
@@ -42,6 +48,10 @@
// and keep it out of the generic Extensions surface list — SurfaceView itself
// stays fully generic (it never switches on a surface id).
const CACHE_WARMING_ID = "cache-warming";
+ // The message-queue extension's per-conversation surface (steering). Pulled
+ // out of the generic Extensions list and rendered as a compact panel above the
+ // composer — pending steering messages are tied to the chat, not the sidebar.
+ const MESSAGE_QUEUE_ID = "message-queue";
// The view kinds offered in the sidebar's dropdown. Generic data — the
// `viewContent` snippet below maps each kind id to its renderer.
@@ -50,10 +60,11 @@
{ id: "lsp", label: "Language Servers" },
{ id: "extensions", label: "Extensions" },
{ id: "cache-warming", label: "Cache Warming" },
+ { id: "settings", label: "Settings" },
] as const;
- // Default sidebar layout: Model panel on top, then Language Servers, Extensions, Cache Warming.
- const initialViews = ["model", "lsp", "extensions", "cache-warming"] as const;
+ // Default sidebar layout: Model, Language Servers, Extensions, Cache Warming, Settings.
+ const initialViews = ["model", "lsp", "extensions", "cache-warming", "settings"] as const;
// Frontend module list for the "Loaded Modules" view, AGGREGATED from each
// feature's public `manifest` export so it can't drift from what's actually
@@ -71,6 +82,7 @@
cacheWarmingManifest,
workspaceManifest,
smartScrollManifest,
+ settingsManifest,
].map((m) => [m.name, m.description] as const);
// Smart-scroll: keep the transcript pinned to the bottom while it streams,
@@ -120,6 +132,18 @@
smartScroll.contentChanged();
});
+ // The message-queue surface spec + whether it currently has pending messages
+ // (steering). Rendered as a compact panel above the composer only when non-empty.
+ const messageQueueSpec = $derived(store.surface(MESSAGE_QUEUE_ID));
+ const hasQueuedMessages = $derived.by(() => {
+ const spec = messageQueueSpec;
+ if (spec === null) return false;
+ const field = spec.fields.find((f) => f.kind === "custom" && f.rendererId === MESSAGE_QUEUE_ID);
+ if (field === undefined || field.kind !== "custom") return false;
+ const data = parseMessageQueuePayload(field.payload);
+ return data !== null && data.messages.length > 0;
+ });
+
// Conversation/tab switch → snap to the bottom of the new transcript.
$effect(() => {
void store.activeConversationId;
@@ -140,6 +164,10 @@
store.send(text);
}
+ function handleQueue(text: string) {
+ store.queueMessage(text);
+ }
+
function handleSelectModel(model: string) {
store.selectModel(model);
}
@@ -168,6 +196,25 @@
: { ok: false, error: result.error };
}
+ // Adapt the store's chat-limit result to the settings feature's port. On a
+ // raise the active chat refills (prepends older history); preserve the
+ // reader's viewport over the prepend (the manual analogue of CSS scroll
+ // anchoring), exactly like `handleShowEarlier`.
+ async function saveChatLimit(value: number): Promise<ChatLimitSaveResult> {
+ const el = transcriptEl;
+ const prevHeight = el?.scrollHeight ?? 0;
+ const prevTop = el?.scrollTop ?? 0;
+ const result = await store.setChatLimit(value);
+ await tick();
+ if (el) {
+ const delta = el.scrollHeight - prevHeight;
+ if (delta > 0) el.scrollTop = prevTop + delta;
+ }
+ return result.ok
+ ? { ok: true, chatLimit: result.chatLimit }
+ : { ok: false, error: result.error };
+ }
+
// Adapt the store's cwd/LSP results to the workspace feature's ports.
async function saveCwd(cwd: string): Promise<CwdSaveResult | null> {
const result = await store.setCwd(cwd);
@@ -262,8 +309,18 @@
<ScrollToBottom show={smartScroll.showButton} onResume={() => smartScroll.resume()} />
</div>
+ {#if hasQueuedMessages && messageQueueSpec !== null}
+ <!-- Pending steering messages (the message-queue surface). Rendered via
+ the generic SurfaceView (dispatches on rendererId, never surface id);
+ only shown when the queue is non-empty — an idle queue is hidden. -->
+ <div class="px-4 pt-2">
+ <SurfaceView spec={messageQueueSpec} onInvoke={handleInvoke} />
+ </div>
+ {/if}
+
<Composer
onSend={handleSend}
+ onQueue={handleQueue}
contextSize={store.activeChat.currentContextSize}
status={store.activeChat.error
? "error"
@@ -329,7 +386,7 @@
</section>
<section class="mt-4 flex flex-col gap-3">
<h3 class="text-xs font-semibold uppercase opacity-60">Surfaces</h3>
- {#each store.surfaces.filter((s) => s.id !== CACHE_WARMING_ID) as spec (spec.id)}
+ {#each store.surfaces.filter((s) => s.id !== CACHE_WARMING_ID && s.id !== MESSAGE_QUEUE_ID) as spec (spec.id)}
<SurfaceView {spec} onInvoke={handleInvoke} />
{/each}
</section>
@@ -344,5 +401,11 @@
{warmNow}
/>
{/key}
+ {:else if kind === "settings"}
+ <!-- FE-local settings. Not conversation-scoped (no {#key}: the chat limit is
+ global), so the field stays mounted across tab switches. -->
+ <div class="flex flex-col gap-3">
+ <ChatLimitField chatLimit={store.chatLimit} save={saveChatLimit} />
+ </div>
{/if}
{/snippet}
diff --git a/src/app/store.svelte.ts b/src/app/store.svelte.ts
index e8bb5e1..dc06ea1 100644
--- a/src/app/store.svelte.ts
+++ b/src/app/store.svelte.ts
@@ -60,6 +60,11 @@ export type ReasoningEffortResult =
| { readonly ok: true; readonly reasoningEffort: ReasoningEffort }
| { readonly ok: false; readonly error: string };
+/** Outcome of persisting a chat-limit setting (localStorage; FE-local). */
+export type ChatLimitResult =
+ | { readonly ok: true; readonly chatLimit: number }
+ | { readonly ok: false; readonly error: string };
+
export interface AppStore {
readonly tabs: readonly Tab[];
readonly activeConversationId: string | null;
@@ -73,6 +78,14 @@ export interface AppStore {
/** The current spec for one surface by id (discovery-by-id), or null if absent. */
surface(surfaceId: string): SurfaceSpec | null;
send(text: string): void;
+ /**
+ * Enqueue a steering message onto the focused conversation's queue
+ * (`chat.queue` WS op). While a turn is generating, the message is delivered
+ * mid-turn at the next tool-result boundary; when idle, the server
+ * auto-starts a turn (equivalent to `send`). Safe to offer whenever the user
+ * wants to add input — the server owns the idle-vs-generating decision.
+ */
+ queueMessage(text: string): void;
selectModel(model: string): void;
newDraft(): void;
selectTab(conversationId: string): void;
@@ -109,6 +122,16 @@ export interface AppStore {
* The backend lazily spawns servers, so this may take a moment on the first call for a cwd.
*/
lspStatus(): Promise<LspResult | null>;
+ /** The persisted chat limit (max loaded chunks per conversation). */
+ readonly chatLimit: number;
+ /**
+ * Persist + live-apply a new chat limit: writes `dispatch.chatLimit` to
+ * localStorage and propagates to every live chat store (trim if lower,
+ * deferred via the unload gate while a reader is scrolled up; no-op if
+ * higher — page unloaded history back in via "Show earlier"). Stores created
+ * afterwards pick the new limit up at creation. Always succeeds (FE-local).
+ */
+ setChatLimit(limit: number): Promise<ChatLimitResult>;
/**
* Wire the chat-limit unload gate (composition-root injection, called once by
* the shell after it owns the scroll region): unloading old chunks is allowed
@@ -189,15 +212,17 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore {
const tabsStore: TabsStore = createTabsStore(storageAdapter);
// The chat limit (max loaded chunks per conversation) — a persisted local
- // setting with no UI yet: edit `localStorage["dispatch.chatLimit"]`. The
- // default is written back on first run so the knob is discoverable.
+ // setting surfaced in the sidebar's Settings view. Reactive so the field +
+ // any live-apply re-trim update together. The default is written back on
+ // first run so the knob is discoverable in localStorage too.
const chatLimitStore = createLocalStore<number>("dispatch.chatLimit", {
storage: localStorageOpt,
});
const storedChatLimit = chatLimitStore.load();
- const chatLimit = normalizeChatLimit(storedChatLimit);
+ const normalizedChatLimit = normalizeChatLimit(storedChatLimit);
+ let chatLimit = $state(normalizedChatLimit);
if (storedChatLimit === null) {
- chatLimitStore.save(chatLimit);
+ chatLimitStore.save(normalizedChatLimit);
}
// Unload gate — attached by the shell once it owns the scroll region (see
@@ -225,7 +250,11 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore {
historySync,
metricsSync,
cache,
- chatLimit,
+ // Read from the persisted store (kept in sync with the reactive `chatLimit`
+ // by `setChatLimit` + boot) so this snapshot doesn't reference the `$state`
+ // — each store captures its limit at creation; live updates go through
+ // `setChatLimit`.
+ chatLimit: normalizeChatLimit(chatLimitStore.load()),
canUnload: () => (unloadGate === null ? true : unloadGate()),
});
}
@@ -516,6 +545,9 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore {
get reasoningEffort(): ReasoningEffort | null {
return reasoningEffort;
},
+ get chatLimit(): number {
+ return chatLimit;
+ },
get currentConversationId(): string {
return workspaceConversationId();
},
@@ -555,6 +587,15 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore {
}
},
+ queueMessage(text: string): void {
+ // Only offered while generating (Composer switches to `chat.queue`
+ // when `status === "running"`), so a draft (never generating) never
+ // reaches here. `chat.queue` auto-starts a turn if idle, so even a race
+ // (turn sealed between the status read and the send) is safe — the
+ // server starts a fresh turn with the message as its opening prompt.
+ activeChat.queueMessage(text);
+ },
+
selectModel(model: string): void {
activeModel = model;
const activeId = tabsStore.activeConversationId;
@@ -695,6 +736,24 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore {
}
},
+ async setChatLimit(limit: number): Promise<ChatLimitResult> {
+ const next = normalizeChatLimit(limit);
+ chatLimitStore.save(next);
+ chatLimit = next;
+ // Propagate to every live chat store. The ACTIVE one is awaited so its
+ // refill (on a raise) lands before the caller returns — letting the
+ // shell preserve scroll over the prepended older chunks. Background
+ // stores refill fire-and-forget. Future stores pick the new limit up at
+ // creation (via the persisted store).
+ const active = getActiveChat();
+ await active.setChatLimit(next);
+ for (const s of chatStores.values()) {
+ if (s !== active) void s.setChatLimit(next);
+ }
+ if (draftStore !== active) void draftStore.setChatLimit(next);
+ return { ok: true, chatLimit: next };
+ },
+
async lspStatus(): Promise<LspResult | null> {
const id = workspaceConversationId();
try {
diff --git a/src/core/chunks/reducer.test.ts b/src/core/chunks/reducer.test.ts
index 35a586c..a346545 100644
--- a/src/core/chunks/reducer.test.ts
+++ b/src/core/chunks/reducer.test.ts
@@ -7,6 +7,7 @@ import type {
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
+ TurnSteeringEvent,
TurnTextDeltaEvent,
TurnToolCallEvent,
TurnToolResultEvent,
@@ -437,6 +438,59 @@ describe("foldEvent — user-message (the turn's user prompt; backend CR-3)", ()
});
});
+describe("foldEvent — steering (mid-turn steering injection)", () => {
+ const steering = (text: string): TurnSteeringEvent => ({
+ type: "steering",
+ conversationId: "c1",
+ turnId: "t1",
+ text,
+ });
+
+ it("appends a provisional user bubble + keeps generating", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, toolResult("t1", "tc1", "read", "output"));
+ s = foldEvent(s, steering("actually, use a different file"));
+ const chunks = selectChunks(s);
+ const last = chunks[chunks.length - 1];
+ expect(last?.role).toBe("user");
+ expect(last?.chunk).toEqual({ type: "text", text: "actually, use a different file" });
+ expect(last?.provisional).toBe(true);
+ expect(s.generating).toBe(true);
+ });
+
+ it("does NOT dedup against the sender's queue (unlike user-message)", () => {
+ // The sender enqueued the message via `chat.queue` — the queue SURFACE
+ // showed it. The `steering` event places it in the transcript; the surface
+ // separately clears on drain. No de-dup here (the transcript never showed
+ // the queued message).
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, steering("steer once"));
+ s = foldEvent(s, steering("steer again"));
+ const users = selectChunks(s).filter((c) => c.role === "user");
+ expect(users).toHaveLength(2);
+ });
+
+ it("ignores an empty steering event", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, steering(""));
+ expect(selectChunks(s)).toHaveLength(0);
+ expect(s.generating).toBe(true); // turn-start already set it
+ });
+
+ it("flushes an accumulating chunk before appending the steering bubble", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "partial response"));
+ s = foldEvent(s, steering("mid-turn correction"));
+ expect(s.accumulating).toBeNull();
+ const roles = selectChunks(s).map((c) => c.role);
+ expect(roles).toEqual(["assistant", "user"]);
+ });
+});
+
describe("applyHistory", () => {
it("orders committed chunks by seq", () => {
const s = initialState();
diff --git a/src/core/chunks/reducer.ts b/src/core/chunks/reducer.ts
index 0a57839..035846c 100644
--- a/src/core/chunks/reducer.ts
+++ b/src/core/chunks/reducer.ts
@@ -83,6 +83,8 @@ export function applyHistory(
* - `reasoning-delta` extends the current accumulating ThinkingChunk (or starts one).
* - `tool-call` / `tool-result` / `error` finalize any accumulating chunk and
* add a new provisional chunk.
+ * - `steering` appends a user bubble mid-turn (drained from the message queue
+ * at a tool-result boundary; the queue surface separately clears on drain).
* - `usage` stores the latest Usage.
* - `done` finalizes any accumulating chunk (turn still provisional).
* - `turn-sealed` finalizes any accumulating chunk and sets sealedTurnId.
@@ -239,6 +241,23 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript
generating: false,
};
}
+
+ case "steering": {
+ // A steering message drained from the queue at a tool-result boundary
+ // (the model sees it alongside the tool results). Append a user bubble
+ // to the provisional transcript; the turn is still in flight. The queue
+ // surface clears separately on drain (a different channel) — no de-dup
+ // here (unlike `user-message`, steering is never optimistically echoed
+ // into the transcript by the sender).
+ if (event.text.length === 0) return state;
+ const provisional = flushAccumulating(state.provisional, state.accumulating);
+ return {
+ ...state,
+ provisional: [...provisional, { role: "user", chunk: { type: "text", text: event.text } }],
+ accumulating: null,
+ generating: true,
+ };
+ }
}
}
diff --git a/src/core/wire/conformance.test.ts b/src/core/wire/conformance.test.ts
index a258873..2fdd3cb 100644
--- a/src/core/wire/conformance.test.ts
+++ b/src/core/wire/conformance.test.ts
@@ -75,6 +75,7 @@ describe("classifies every AgentEvent type", () => {
{ type: "error", conversationId: "c1", turnId: "t1", message: "oops" },
{ type: "done", conversationId: "c1", turnId: "t1", reason: "complete" },
{ type: "turn-sealed", conversationId: "c1", turnId: "t1" },
+ { type: "steering", conversationId: "c1", turnId: "t1", text: "steer mid-turn" },
];
it("returns a stable label for every AgentEvent.type variant", () => {
@@ -93,11 +94,12 @@ describe("classifies every AgentEvent type", () => {
"error",
"done",
"turn-sealed",
+ "steering",
]);
});
- it("covers all 13 AgentEvent variants", () => {
- expect(samples).toHaveLength(13);
+ it("covers all 14 AgentEvent variants", () => {
+ expect(samples).toHaveLength(14);
});
});
@@ -152,6 +154,7 @@ describe("classifies every WsClientMessage type", () => {
{ type: "chat.send" as const, message: "hi" },
{ type: "chat.subscribe" as const, conversationId: "c1" },
{ type: "chat.unsubscribe" as const, conversationId: "c1" },
+ { type: "chat.queue" as const, conversationId: "c1", text: "steer" },
];
const labels = msgs.map(assertWsClientMessageExhaustive);
expect(labels).toEqual([
@@ -161,6 +164,7 @@ describe("classifies every WsClientMessage type", () => {
"chat.send",
"chat.subscribe",
"chat.unsubscribe",
+ "chat.queue",
]);
});
});
diff --git a/src/core/wire/conformance.ts b/src/core/wire/conformance.ts
index 13be78c..6e87e5c 100644
--- a/src/core/wire/conformance.ts
+++ b/src/core/wire/conformance.ts
@@ -34,6 +34,8 @@ export function assertAgentEventExhaustive(event: AgentEvent): string {
return "turn-sealed";
case "step-complete":
return "step-complete";
+ case "steering":
+ return "steering";
default:
return event satisfies never;
}
@@ -102,6 +104,8 @@ export function assertWsClientMessageExhaustive(msg: WsClientMessage): string {
return "chat.subscribe";
case "chat.unsubscribe":
return "chat.unsubscribe";
+ case "chat.queue":
+ return "chat.queue";
default:
return msg satisfies never;
}
diff --git a/src/features/chat/ports.ts b/src/features/chat/ports.ts
index f8c665f..ffe2c94 100644
--- a/src/features/chat/ports.ts
+++ b/src/features/chat/ports.ts
@@ -1,12 +1,17 @@
import type {
+ ChatQueueMessage,
ChatSendMessage,
ConversationHistoryResponse,
ConversationMetricsResponse,
} from "@dispatch/transport-contract";
-/** Injected transport port — sends chat messages to the server. */
+/**
+ * Injected transport port — sends chat messages to the server. Accepts both
+ * `chat.send` (start a turn) and `chat.queue` (enqueue a steering message;
+ * auto-starts a turn if idle).
+ */
export interface ChatTransport {
- send(msg: ChatSendMessage): void;
+ send(msg: ChatSendMessage | ChatQueueMessage): void;
}
/**
diff --git a/src/features/chat/store.svelte.ts b/src/features/chat/store.svelte.ts
index e74980d..9beabfc 100644
--- a/src/features/chat/store.svelte.ts
+++ b/src/features/chat/store.svelte.ts
@@ -1,9 +1,10 @@
import type {
ChatDeltaMessage,
ChatErrorMessage,
+ ChatQueueMessage,
ChatSendMessage,
} from "@dispatch/transport-contract";
-import type { ChatMessage } from "@dispatch/wire";
+import type { ChatMessage, StoredChunk } from "@dispatch/wire";
import type { RenderedChunk, TranscriptState } from "../../core/chunks";
import {
appendUserMessage,
@@ -89,7 +90,29 @@ export interface ChatStore {
readonly thinkingKeyBase: number;
handleDelta(msg: ChatDeltaMessage | ChatErrorMessage): void;
send(text: string): void;
+ /**
+ * Enqueue a steering message onto the conversation's queue (`chat.queue`
+ * WS op). While a turn is generating, the message is delivered mid-turn at
+ * the next tool-result boundary (a `steering` `AgentEvent` fires + the
+ * message-queue surface updates). When no turn is active, the server
+ * auto-starts a turn with the message as its opening prompt (equivalent to
+ * `chat.send`). No optimistic transcript echo — the queue SURFACE carries the
+ * pending message until drain; the `steering` event places it in the
+ * transcript. `text` must be non-empty (the server 400/errors otherwise).
+ */
+ queueMessage(text: string): void;
setModel(model: string): void;
+ /**
+ * Update the chat limit LIVE: re-normalizes, then adjusts the loaded window.
+ * Lowering it unloads older committed chunks (deferred via the gate while the
+ * reader is scrolled up, catching up on the next mutation). Raising it
+ * REFILLS older history (cache first, then CR-5 `?beforeSeq=`) up to the
+ * fresh-load window (`initialWindowSize` = 75% of the limit) — the same
+ * window a fresh `load()` would show — so upping the limit reveals more
+ * history instead of leaving a partial view. New deltas + loads use the new
+ * limit. The refill awaits, so a caller can preserve scroll over the prepend.
+ */
+ setChatLimit(limit: number): Promise<void>;
load(): Promise<void>;
/**
* Page one unload-unit (`ceil(limit/4)`) of earlier history back in — the
@@ -117,7 +140,7 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore {
let _model = $state<string | undefined>(deps.model);
let disposed = false;
- const chatLimit = normalizeChatLimit(deps.chatLimit);
+ let chatLimit = normalizeChatLimit(deps.chatLimit);
/**
* Enforce the chat limit after a transcript mutation — unless the injected
@@ -166,6 +189,52 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore {
}
}
+ /**
+ * Fetch up to `want` older chunks (seq < `oldest`) — cache first, then a
+ * CR-5 `?beforeSeq=&limit=` server backfill when the cache is too shallow,
+ * persisting it so the next read is local. Returns every locally-known
+ * chunk older than `oldest` (the caller — `restoreEarlier` — takes the
+ * newest `count` of them). Shared by `showEarlier` and the raise-refill.
+ */
+ async function backfillOlder(oldest: number, want: number): Promise<readonly StoredChunk[]> {
+ let earlier = (await deps.cache.load(deps.conversationId)).filter((c) => c.seq < oldest);
+ const oldestKnown = earlier[0]?.seq ?? oldest;
+ if (earlier.length < want && oldestKnown > 1) {
+ const res = await deps.historySync(deps.conversationId, 0, {
+ beforeSeq: oldestKnown,
+ limit: want - earlier.length,
+ });
+ const merged = await deps.cache.commit(deps.conversationId, res.chunks);
+ earlier = merged.filter((c) => c.seq < oldest);
+ }
+ return earlier;
+ }
+
+ /**
+ * Refill toward the fresh-load window after a limit RAISE: pull older
+ * history (cache first, then server) so the loaded set grows to match what a
+ * fresh `load()` would show at the new limit. No-op when already at the
+ * origin (seq 1) or already within the window. `restoreEarlier` re-derives
+ * the window start at apply time, so a delta landing during the await can't
+ * corrupt the merge. NOT gated (refilling prepends above the viewport; the
+ * caller preserves scroll position).
+ */
+ async function refill(): Promise<void> {
+ if (disposed) return;
+ const oldest = transcript.committed[0]?.seq ?? transcript.hiddenBeforeSeq;
+ if (oldest <= 1) return;
+ const want = initialWindowSize(chatLimit) - transcript.committed.length;
+ if (want <= 0) return;
+ try {
+ const earlier = await backfillOlder(oldest, want);
+ if (earlier.length === 0) return;
+ transcript = restoreEarlier(transcript, earlier, want);
+ _error = null;
+ } catch (err) {
+ _error = err instanceof Error ? err.message : String(err);
+ }
+ }
+
return {
get messages(): readonly ChatMessage[] {
return selectMessages(transcript);
@@ -230,10 +299,31 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore {
deps.transport.send(msg);
},
+ queueMessage(text: string): void {
+ const trimmed = text.trim();
+ if (trimmed.length === 0) return;
+ const msg: ChatQueueMessage = {
+ type: "chat.queue",
+ conversationId: deps.conversationId,
+ text: trimmed,
+ };
+ deps.transport.send(msg);
+ },
+
setModel(model: string): void {
_model = model;
},
+ async setChatLimit(limit: number): Promise<void> {
+ const prev = chatLimit;
+ chatLimit = normalizeChatLimit(limit);
+ if (chatLimit < prev) {
+ maybeTrim();
+ } else if (chatLimit > prev) {
+ await refill();
+ }
+ },
+
async load(): Promise<void> {
// Fresh load shows only the newest 75% of the limit — headroom before the
// first trim. A warm cache is windowed locally (synchronously with its
@@ -256,23 +346,7 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore {
if (oldest <= 1) return;
const want = unloadCount(chatLimit);
try {
- let earlier = (await deps.cache.load(deps.conversationId)).filter((c) => c.seq < oldest);
- // The local cache may not reach far enough back (a server-windowed
- // fresh load cached only the window): page the missing OLDER run in
- // from the server (CR-5 `?beforeSeq=&limit=`) and persist it, so the
- // next page-in is local. Seqs are gap-free, so the fetched run is
- // contiguous with what we hold. NOTE: the backfill response's
- // `latestSeq` is a window cursor — never fed to the tail cursor
- // (ours derives from the cache's max seq).
- const oldestKnown = earlier[0]?.seq ?? oldest;
- if (earlier.length < want && oldestKnown > 1) {
- const res = await deps.historySync(deps.conversationId, 0, {
- beforeSeq: oldestKnown,
- limit: want - earlier.length,
- });
- const merged = await deps.cache.commit(deps.conversationId, res.chunks);
- earlier = merged.filter((c) => c.seq < oldest);
- }
+ const earlier = await backfillOlder(oldest, want);
transcript = restoreEarlier(transcript, earlier, want);
_error = null;
} catch (err) {
diff --git a/src/features/chat/store.test.ts b/src/features/chat/store.test.ts
index 3232009..2d75139 100644
--- a/src/features/chat/store.test.ts
+++ b/src/features/chat/store.test.ts
@@ -144,6 +144,93 @@ describe("createChatStore", () => {
store.dispose();
});
+ describe("queueMessage (chat.queue — steering)", () => {
+ it("posts a chat.queue with conversationId + text", () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ });
+
+ store.queueMessage("steer left");
+
+ expect(transport.sent).toHaveLength(0); // chat.send stays empty
+ expect(transport.sentQueue).toHaveLength(1);
+ expect(transport.sentQueue[0]?.type).toBe("chat.queue");
+ expect(transport.sentQueue[0]?.conversationId).toBe(CONV_ID);
+ expect(transport.sentQueue[0]?.text).toBe("steer left");
+
+ store.dispose();
+ });
+
+ it("trims whitespace before sending", () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ });
+
+ store.queueMessage(" padded ");
+
+ expect(transport.sentQueue[0]?.text).toBe("padded");
+
+ store.dispose();
+ });
+
+ it("does not send for empty/whitespace-only text", () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ });
+
+ store.queueMessage(" ");
+ store.queueMessage("");
+
+ expect(transport.sentQueue).toHaveLength(0);
+
+ store.dispose();
+ });
+
+ it("does NOT optimistically echo into the transcript (the surface carries the queue)", () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ });
+
+ store.queueMessage("queued steering message");
+
+ expect(store.chunks).toHaveLength(0); // no transcript echo
+
+ store.dispose();
+ });
+ });
+
it("chat.error sets error", () => {
const transport = createFakeTransport();
const historySync = createFakeHistorySync();
@@ -1248,6 +1335,195 @@ describe("createChatStore", () => {
store.dispose();
});
+ it("setChatLimit: lowering the limit trims older committed chunks live", async () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ chatLimit: 100,
+ });
+
+ // Load 80 committed chunks (under the limit — no trim yet).
+ historySync.returnChunks = Array.from({ length: 80 }, (_, i) => makeStoredChunk(i + 1));
+ store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" }));
+ store.handleDelta(deltaEvent({ type: "turn-sealed", conversationId: CONV_ID, turnId: "t1" }));
+ await vi.waitFor(() => {
+ expect(store.chunks).toHaveLength(80);
+ });
+
+ // Lower the limit to 10: 80 → unload ceil(10/4)=3 per quarter, needs
+ // ceil((80-10)/3)=24 quarters → drop min(72, 80)=72 → 8 remain.
+ await store.setChatLimit(10);
+ expect(store.chunks).toHaveLength(8);
+ expect(store.chunks[0]?.seq).toBe(73);
+ expect(store.hasEarlier).toBe(true);
+
+ store.dispose();
+ });
+
+ it("setChatLimit: raising the limit refills older history up to the fresh-load window", async () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ // Cache holds 200 chunks; load at limit 100 → window 75 → seqs 126..200.
+ await cache.impl.commit(
+ CONV_ID,
+ Array.from({ length: 200 }, (_, i) => makeStoredChunk(i + 1)),
+ );
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ chatLimit: 100,
+ });
+ await store.load();
+ expect(store.chunks).toHaveLength(75);
+ expect(store.chunks[0]?.seq).toBe(126);
+ expect(store.hasEarlier).toBe(true);
+
+ // Raise to 200 → window floor(0.75×200)=150 → refill 75 older chunks
+ // (seqs 51..125) from the cache. No server backfill (cache is deep enough).
+ await store.setChatLimit(200);
+ expect(historySync.calls).toHaveLength(1); // the load-time tail sync only
+ expect(store.chunks).toHaveLength(150);
+ expect(store.chunks[0]?.seq).toBe(51);
+ expect(store.hasEarlier).toBe(true); // 51 > 1
+
+ store.dispose();
+ });
+
+ it("setChatLimit: raising backfills from the server when the cache is too shallow", async () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ // Server holds 200; cold-cache load at limit 100 → window 75 → seqs 126..200.
+ historySync.returnChunks = Array.from({ length: 200 }, (_, i) => makeStoredChunk(i + 1));
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ chatLimit: 100,
+ });
+ await store.load();
+ expect(store.chunks[0]?.seq).toBe(126);
+
+ // Raise to 200 → want 75 older. Cache only holds 126..200 → backfill
+ // seqs 51..125 from the server (CR-5 ?beforeSeq=126&limit=75).
+ await store.setChatLimit(200);
+ const backfill = historySync.calls[1];
+ expect(backfill?.window).toEqual({ beforeSeq: 126, limit: 75 });
+ expect(store.chunks).toHaveLength(150);
+ expect(store.chunks[0]?.seq).toBe(51);
+
+ store.dispose();
+ });
+
+ it("setChatLimit: raising refills all available older history (down to the origin)", async () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ chatLimit: 100,
+ });
+
+ // 101 chunks → one trim pass drops 25 → 76 remain (seqs 26..101).
+ historySync.returnChunks = Array.from({ length: 101 }, (_, i) => makeStoredChunk(i + 1));
+ store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" }));
+ store.handleDelta(deltaEvent({ type: "turn-sealed", conversationId: CONV_ID, turnId: "t1" }));
+ await vi.waitFor(() => {
+ expect(store.chunks).toHaveLength(76);
+ });
+ expect(store.chunks[0]?.seq).toBe(26);
+ expect(store.hasEarlier).toBe(true);
+
+ // Raise to 500 → window 375 → want 299 older. The cache holds only
+ // seqs 1..25 below the window (no more server-side) → restore all 25 →
+ // 101 loaded, reaching the origin.
+ await store.setChatLimit(500);
+ expect(store.chunks).toHaveLength(101);
+ expect(store.chunks[0]?.seq).toBe(1);
+ expect(store.hasEarlier).toBe(false);
+
+ store.dispose();
+ });
+
+ it("setChatLimit: raising is a no-op when the window already starts at the origin", async () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ await cache.impl.commit(
+ CONV_ID,
+ Array.from({ length: 50 }, (_, i) => makeStoredChunk(i + 1)),
+ );
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ chatLimit: 100,
+ });
+ await store.load(); // only 50 chunks → all loaded, window starts at seq 1
+ expect(store.chunks).toHaveLength(50);
+ expect(store.hasEarlier).toBe(false);
+ const callsAfterLoad = historySync.calls.length;
+
+ await store.setChatLimit(500); // raise → refill no-ops (oldest = 1)
+ expect(store.chunks).toHaveLength(50);
+ expect(store.chunks[0]?.seq).toBe(1);
+ expect(historySync.calls).toHaveLength(callsAfterLoad); // no backfill
+
+ store.dispose();
+ });
+
+ it("setChatLimit: a nonsensical value is normalized (no crash, no trim)", async () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ chatLimit: 100,
+ });
+
+ historySync.returnChunks = Array.from({ length: 50 }, (_, i) => makeStoredChunk(i + 1));
+ store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" }));
+ store.handleDelta(deltaEvent({ type: "turn-sealed", conversationId: CONV_ID, turnId: "t1" }));
+ await vi.waitFor(() => {
+ expect(store.chunks).toHaveLength(50);
+ });
+
+ // NaN normalizes to the default (256). prev was 100 → raise → refill,
+ // but the loaded window already starts at seq 1 (origin) → no-op.
+ await store.setChatLimit(Number.NaN);
+ expect(store.chunks).toHaveLength(50);
+
+ store.dispose();
+ });
+
it("resync is a no-op after dispose", async () => {
const transport = createFakeTransport();
const historySync = createFakeHistorySync();
diff --git a/src/features/chat/test-helpers.ts b/src/features/chat/test-helpers.ts
index 6bb98a1..100449f 100644
--- a/src/features/chat/test-helpers.ts
+++ b/src/features/chat/test-helpers.ts
@@ -1,19 +1,29 @@
+import type { ChatQueueMessage, ChatSendMessage } from "@dispatch/transport-contract";
import type { StoredChunk } from "@dispatch/wire";
import type { ConversationCache } from "../conversation-cache";
import type { ChatTransport, HistorySync, HistoryWindow, MetricsSync } from "./ports";
export interface FakeTransport {
- readonly sent: import("@dispatch/transport-contract").ChatSendMessage[];
+ /** All `chat.send` messages sent through the fake transport. */
+ readonly sent: ChatSendMessage[];
+ /** All `chat.queue` messages sent through the fake transport. */
+ readonly sentQueue: ChatQueueMessage[];
readonly impl: ChatTransport;
}
export function createFakeTransport(): FakeTransport {
- const sent: import("@dispatch/transport-contract").ChatSendMessage[] = [];
+ const sent: ChatSendMessage[] = [];
+ const sentQueue: ChatQueueMessage[] = [];
return {
sent,
+ sentQueue,
impl: {
send(msg) {
- sent.push(msg);
+ if (msg.type === "chat.queue") {
+ sentQueue.push(msg);
+ } else {
+ sent.push(msg);
+ }
},
},
};
diff --git a/src/features/chat/ui/Composer.svelte b/src/features/chat/ui/Composer.svelte
index 24c2c19..d519efc 100644
--- a/src/features/chat/ui/Composer.svelte
+++ b/src/features/chat/ui/Composer.svelte
@@ -8,10 +8,18 @@
let {
onSend,
+ onQueue,
contextSize = undefined,
status = "idle",
}: {
onSend: (text: string) => void;
+ /**
+ * Enqueue a steering message (`chat.queue`). When provided AND the status
+ * is `running`, the send button becomes a "Queue" button that steers the
+ * in-flight turn instead of starting a new one. When absent, `onSend` is
+ * used regardless (tests / non-steering contexts).
+ */
+ onQueue?: (text: string) => void;
// Current context occupancy (latest turn's contextSize), or `undefined`
// when unknown — the status bar then shows "— tokens", never 0%.
contextSize?: number | undefined;
@@ -26,6 +34,13 @@
const usage = $derived(computeContextUsage(contextSize, MAX_CONTEXT));
const hasUsage = $derived(contextSize !== undefined);
+ // While a turn is generating, the send button becomes a "Queue" button that
+ // enqueues a steering message (`chat.queue`) instead of starting a new turn
+ // (`chat.send`). Falls back to `onSend` when no `onQueue` is wired.
+ const steering = $derived(status === "running" && onQueue !== undefined);
+ const submitLabel = $derived(steering ? "Queue" : "Send");
+ const placeholder = $derived(steering ? "Steer the conversation..." : "Type a message...");
+
// As the window fills, escalate color: calm → warning → danger.
function fillClass(pct: number): string {
if (pct >= 90) return "progress-error";
@@ -58,7 +73,11 @@
function handleSubmit(): void {
const trimmed = text.trim();
if (trimmed.length === 0) return;
- onSend(trimmed);
+ if (steering) {
+ onQueue?.(trimmed);
+ } else {
+ onSend(trimmed);
+ }
text = "";
}
@@ -84,12 +103,12 @@
class="textarea textarea-bordered flex-1 resize-none leading-normal !min-h-0 h-auto"
bind:value={text}
onkeydown={handleKeydown}
- placeholder="Type a message..."
+ placeholder={placeholder}
rows="1"
aria-label="Message input"
></textarea>
<button class="btn btn-primary w-20 shrink-0" type="submit" disabled={!hasText}>
- Send
+ {submitLabel}
</button>
</div>
diff --git a/src/features/surface-host/logic/message-queue.test.ts b/src/features/surface-host/logic/message-queue.test.ts
new file mode 100644
index 0000000..ce078d9
--- /dev/null
+++ b/src/features/surface-host/logic/message-queue.test.ts
@@ -0,0 +1,48 @@
+import type { QueuedMessage } from "@dispatch/wire";
+import { describe, expect, it } from "vitest";
+import { parseMessageQueuePayload } from "./message-queue";
+
+const msg = (id: string, text: string, queuedAt = 1_700_000_000_000): QueuedMessage => ({
+ id,
+ text,
+ queuedAt,
+});
+
+describe("parseMessageQueuePayload", () => {
+ it("parses a well-formed payload with messages", () => {
+ const data = parseMessageQueuePayload({
+ messages: [msg("m1", "steer left"), msg("m2", "actually, go right")],
+ });
+ expect(data).toEqual({
+ messages: [msg("m1", "steer left"), msg("m2", "actually, go right")],
+ });
+ });
+
+ it("parses an empty-messages payload (queue is empty)", () => {
+ expect(parseMessageQueuePayload({ messages: [] })).toEqual({ messages: [] });
+ });
+
+ it("preserves message order", () => {
+ const data = parseMessageQueuePayload({
+ messages: [msg("a", "first"), msg("b", "second"), msg("c", "third")],
+ });
+ expect(data?.messages.map((m) => m.id)).toEqual(["a", "b", "c"]);
+ });
+
+ it.each([
+ ["null", null],
+ ["a number", 7],
+ ["a string", "nope"],
+ ["missing messages key", { foo: [] }],
+ ["messages not an array", { messages: "x" }],
+ ["entry not an object", { messages: ["x"] }],
+ ["entry missing id", { messages: [{ text: "x", queuedAt: 1 }] }],
+ ["entry with non-string id", { messages: [{ id: 1, text: "x", queuedAt: 1 }] }],
+ ["entry missing text", { messages: [{ id: "m1", queuedAt: 1 }] }],
+ ["entry with non-string text", { messages: [{ id: "m1", text: 1, queuedAt: 1 }] }],
+ ["entry missing queuedAt", { messages: [{ id: "m1", text: "x" }] }],
+ ["entry with non-finite queuedAt", { messages: [msg("m1", "x", Number.NaN)] }],
+ ])("returns null for invalid payload: %s", (_label, payload) => {
+ expect(parseMessageQueuePayload(payload)).toBeNull();
+ });
+});
diff --git a/src/features/surface-host/logic/message-queue.ts b/src/features/surface-host/logic/message-queue.ts
new file mode 100644
index 0000000..a8e1567
--- /dev/null
+++ b/src/features/surface-host/logic/message-queue.ts
@@ -0,0 +1,45 @@
+import type { QueuedMessage } from "@dispatch/wire";
+
+/**
+ * Pure parser for the `rendererId: "message-queue"` custom-field payload.
+ *
+ * The message-queue extension's per-conversation surface emits ONE `custom`
+ * field with `rendererId: "message-queue"` and `payload: QueuePayload`
+ * (`{ messages: QueuedMessage[] }` — the current queue snapshot). This parser
+ * validates the untyped `payload: unknown` at the network seam so a
+ * hostile/partial payload can never crash the renderer (graceful skip → null).
+ *
+ * Empty `messages` is a valid, parseable state (the queue is empty — nothing to
+ * render); the caller hides the panel. Null is returned only for a malformed
+ * payload shape.
+ */
+export interface MessageQueueData {
+ readonly messages: readonly QueuedMessage[];
+}
+
+function isQueuedMessage(v: unknown): v is QueuedMessage {
+ if (typeof v !== "object" || v === null) return false;
+ const o = v as Record<string, unknown>;
+ return (
+ typeof o.id === "string" &&
+ typeof o.text === "string" &&
+ typeof o.queuedAt === "number" &&
+ Number.isFinite(o.queuedAt)
+ );
+}
+
+export function parseMessageQueuePayload(payload: unknown): MessageQueueData | null {
+ if (typeof payload !== "object" || payload === null) return null;
+ const obj = payload as Record<string, unknown>;
+ const raw = obj.messages;
+ if (!Array.isArray(raw)) return null;
+ const messages: QueuedMessage[] = [];
+ for (const entry of raw) {
+ if (!isQueuedMessage(entry)) return null;
+ messages.push(entry);
+ }
+ return { messages };
+}
+
+/** The `rendererId` the message-queue extension's `custom` surface field uses. */
+export const MESSAGE_QUEUE_RENDERER_ID = "message-queue";
diff --git a/src/features/surface-host/ui/MessageQueueList.svelte b/src/features/surface-host/ui/MessageQueueList.svelte
new file mode 100644
index 0000000..12de970
--- /dev/null
+++ b/src/features/surface-host/ui/MessageQueueList.svelte
@@ -0,0 +1,22 @@
+<script lang="ts">
+ import { parseMessageQueuePayload } from "../logic/message-queue";
+
+ let { payload }: { readonly payload: unknown } = $props();
+
+ // Parse defensively; an unparseable payload yields null → render nothing
+ // (graceful skip, per the custom-field contract).
+ const data = $derived(parseMessageQueuePayload(payload));
+</script>
+
+{#if data !== null && data.messages.length > 0}
+ <ul class="flex flex-col gap-1 text-sm">
+ {#each data.messages as msg (msg.id)}
+ <li class="rounded-box bg-base-200 px-3 py-2">
+ <p class="whitespace-pre-wrap">{msg.text}</p>
+ <time class="text-xs opacity-50" datetime={new Date(msg.queuedAt).toISOString()}>
+ {new Date(msg.queuedAt).toLocaleTimeString()}
+ </time>
+ </li>
+ {/each}
+ </ul>
+{/if}
diff --git a/src/features/surface-host/ui/SurfaceView.svelte b/src/features/surface-host/ui/SurfaceView.svelte
index 24be8b8..e5f807a 100644
--- a/src/features/surface-host/ui/SurfaceView.svelte
+++ b/src/features/surface-host/ui/SurfaceView.svelte
@@ -2,6 +2,7 @@
import type { InvokeMessage, SurfaceSpec } from "@dispatch/ui-contract";
import { groupRenderFields, planSurface } from "../logic/plan";
import Button from "./Button.svelte";
+ import MessageQueueList from "./MessageQueueList.svelte";
import Number from "./Number.svelte";
import Progress from "./Progress.svelte";
import Selector from "./Selector.svelte";
@@ -40,6 +41,8 @@
unknown ids gracefully render nothing. -->
{#if group.field.rendererId === "table"}
<SurfaceTable payload={group.field.payload} />
+ {:else if group.field.rendererId === "message-queue"}
+ <MessageQueueList payload={group.field.payload} />
{/if}
{/if}
{/each}