From d98a63ce17519983dcf58c27432723e2f4b96e75 Mon Sep 17 00:00:00 2001 From: Adam Malczewski Date: Sun, 21 Jun 2026 02:19:54 +0900 Subject: feat(chat): message queue + steering — mid-turn injection at tool-result boundaries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consume the message-queue + steering handoff (wire@0.8.0, transport-contract@0.12.0). Re-pinned file: deps + re-mirrored .dispatch/*.reference.md. - fold steering AgentEvent into the transcript as a provisional user bubble (after the tool-result it followed; no de-dup — the queue surface carried it) - add rendererId: "message-queue" custom renderer (pure parser + MessageQueueList) rendered as a compact panel above the Composer (hidden when queue is empty) - add ChatStore.queueMessage / AppStore.queueMessage — sends chat.queue WS op (trim/validate non-empty; auto-starts a turn if idle) - Composer switches to chat.queue while generating (button → Queue, placeholder → Steer the conversation...) - exhaustiveness guards updated for steering + chat.queue - carry-to-new-turn needs no special handling (normal new turn) 664 tests green. 
--- .dispatch/transport-contract.reference.md | 94 ++++++- .dispatch/wire.reference.md | 85 ++++++- GLOSSARY.md | 4 +- backend-handoff.md | 42 +++- src/app/App.svelte | 69 +++++- src/app/store.svelte.ts | 69 +++++- src/core/chunks/reducer.test.ts | 54 ++++ src/core/chunks/reducer.ts | 19 ++ src/core/wire/conformance.test.ts | 8 +- src/core/wire/conformance.ts | 4 + src/features/chat/ports.ts | 9 +- src/features/chat/store.svelte.ts | 112 +++++++-- src/features/chat/store.test.ts | 276 +++++++++++++++++++++ src/features/chat/test-helpers.ts | 16 +- src/features/chat/ui/Composer.svelte | 25 +- .../surface-host/logic/message-queue.test.ts | 48 ++++ src/features/surface-host/logic/message-queue.ts | 45 ++++ .../surface-host/ui/MessageQueueList.svelte | 22 ++ src/features/surface-host/ui/SurfaceView.svelte | 3 + 19 files changed, 949 insertions(+), 55 deletions(-) create mode 100644 src/features/surface-host/logic/message-queue.test.ts create mode 100644 src/features/surface-host/logic/message-queue.ts create mode 100644 src/features/surface-host/ui/MessageQueueList.svelte diff --git a/.dispatch/transport-contract.reference.md b/.dispatch/transport-contract.reference.md index 1c3d993..18d1a3d 100644 --- a/.dispatch/transport-contract.reference.md +++ b/.dispatch/transport-contract.reference.md @@ -5,10 +5,31 @@ > hangs on a permission prompt). Your CODE still imports `@dispatch/transport-contract` normally — > this file is for READING only. > -> **Orchestrator:** SNAPSHOT of `transport-contract@0.11.0` (reasoning effort shipped). -> Depends on `@dispatch/wire@0.7.0` (see `wire.reference.md`) + `@dispatch/ui-contract@0.2.0` (see +> **Orchestrator:** SNAPSHOT of `transport-contract@0.12.0` (message queue + steering). +> Depends on `@dispatch/wire@0.8.0` (see `wire.reference.md`) + `@dispatch/ui-contract@0.2.0` (see > `ui-contract.reference.md`). 
> +> **2026-06-21 delta (message-queue + steering handoff — package bumped `0.11.0` → `0.12.0`, ADDITIVE):** +> adds the enqueue surface for the per-conversation message queue (the wire types `QueuedMessage` / +> `QueuePayload` + the new `steering` `AgentEvent` live in `wire@0.8.0`, re-exported here). Two +> additive shapes: +> 1. **WS `chat.queue` op** — `ChatQueueMessage { type: "chat.queue"; conversationId; text }` (a +> new `WsClientMessage` union member). Fire-and-forget: on success the server emits NOTHING back +> — the message-queue SURFACE updates (the new message appears in the snapshot). On failure (empty +> `text`, unknown conversation) the server replies `chat.error`. **Auto-start when idle +> (server-owned):** if no turn is active, `chat.queue` does NOT queue — it STARTS A NEW TURN with +> the message as its opening prompt (equivalent to `chat.send`). So a single op works for both +> "steer during generation" and "send"; the client doesn't pick. `text` must be non-empty after trim. +> 2. **HTTP `POST /conversations/:id/queue`** — body `QueueRequest { text }` → `QueueResponse +> { conversationId; startedTurn: boolean; queue: QueuedMessage[] }`. `startedTurn: true` = was +> idle, a new turn started (the message is the turn's opening prompt, NOT a queued steering +> message); `startedTurn: false` = a turn was active, the message was queued (the `queue` +> snapshot includes it). Empty/whitespace `text` → HTTP 400 `{ error }`. The FE uses the WS op. +> +> The queue is read via a per-conversation SURFACE (`message-queue`, scope `conversation`; one +> `custom` field, `rendererId: "message-queue"`, `payload: QueuePayload`) — NOT via the chat stream. +> See the handoff for the full flow (steering event, carry-to-new-turn, move-vs-duplicate). 
+> > **2026-06-12 delta (reasoning-effort handoff — package bumped `0.10.0` → `0.11.0`, ADDITIVE):** > the thinking-depth knob (`ReasoningEffort`, re-exported from `wire@0.7.0`) lands in TWO scopes, > resolved server-side per turn (per-turn override → persisted conversation value → default @@ -135,6 +156,11 @@ - `POST /conversations/:id/close` — no body → `200 CloseConversationResponse`. The EXPLICIT tab-close affordance: aborts any in-flight turn (persists the partial; seals with `finishReason: "aborted"`) AND stops + disables cache-warming (persisted OFF). Idempotent (`abortedTurn: false` when idle/unknown). +- `POST /conversations/:id/queue` — body `QueueRequest { text }` → `200 QueueResponse`. Enqueue a user + message for mid-turn steering delivery (the WS `chat.queue` op is the FE's path). When a turn is + active, the message is queued + delivered at the next tool-result boundary (a `steering` `AgentEvent` + fires; the message-queue SURFACE updates). When idle, the enqueue STARTS a new turn with the message + as its opening prompt (`startedTurn: true`). Empty/whitespace `text` → `400 { error }`. - `GET /metrics/throughput?period=day|week|month&date=<...>` — `ThroughputResponse` (token-weighted tokens/sec per model over the window). Not part of cache-warming; listed for completeness. - `GET /conversations/:id/cwd` — `CwdResponse` (`cwd` is `null` until set). 
@@ -172,10 +198,17 @@ */ import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract"; -import type { AgentEvent, ReasoningEffort, StoredChunk, TurnMetrics } from "@dispatch/wire"; +import type { + AgentEvent, + QueuedMessage, + ReasoningEffort, + StoredChunk, + TurnMetrics, +} from "@dispatch/wire"; export type { AgentEvent, + QueuedMessage, ReasoningEffort, StepMetrics, StoredChunk, @@ -395,6 +428,41 @@ export interface CloseConversationResponse { readonly abortedTurn: boolean; } +// ─── Message queue (steering) ───────────────────────────────────────────────── + +/** + * Request body for `POST /conversations/:id/queue` — enqueue a user message + * onto a conversation's message queue for mid-turn steering delivery. + * + * When a turn is ACTIVE for the conversation, the message is appended to the + * queue (the message-queue extension's per-conversation SURFACE updates) and + * delivered at the next tool-result boundary as a steering message the model + * sees alongside the tool results (a `steering` `AgentEvent` is emitted). When + * NO turn is active, enqueuing instead STARTS a new turn with the message as its + * opening prompt (equivalent to `POST /chat`) — so a fire-and-forget enqueue + * works regardless of generation state. The resolved queue + whether a turn was + * started are returned in `QueueResponse`. + * + * `text` must be non-empty (after trim) → HTTP 400 `{ error }` otherwise. + */ +export interface QueueRequest { + readonly text: string; +} + +/** + * Response body for `POST /conversations/:id/queue` — the conversation's queue + * snapshot AFTER the enqueue, so a client renders the queue from this alone. + * `conversationId` echoes the path. `startedTurn` is true when no turn was + * active and the enqueue started a new turn (the message is now the turn's + * opening prompt, not a queued steering message); the turn's events stream on + * the chat channel as usual. 
+ */ +export interface QueueResponse { + readonly conversationId: string; + readonly startedTurn: boolean; + readonly queue: readonly QueuedMessage[]; +} + // ─── Per-conversation LSP status ────────────────────────────────────────────── /** The connection state of a single language server for a workspace. */ @@ -549,6 +617,23 @@ export interface ChatUnsubscribeMessage { readonly conversationId: string; } +/** + * Client → server: enqueue a message onto a conversation's message queue while + * a turn is generating (steering). The WebSocket counterpart of the HTTP + * `POST /conversations/:id/queue` (`QueueRequest`). Fire-and-forget: success is + * confirmed by the message-queue SURFACE updating (the FE renders the queue + * from the surface, not from a reply here); a failure (malformed/empty text, + * unknown conversation) arrives as a `chat.error`. When no turn is active, the + * enqueue starts a new turn (the turn's events stream as `chat.delta`s), so a + * client reuses this op for both "queue while generating" and "send" (the + * latter being equivalent to `chat.send`). + */ +export interface ChatQueueMessage { + readonly type: "chat.queue"; + readonly conversationId: string; + readonly text: string; +} + /** * Every client → server WS message: surface ops (`@dispatch/ui-contract`) + chat * ops. A server discriminates on `type`. @@ -557,7 +642,8 @@ export type WsClientMessage = | SurfaceClientMessage | ChatSendMessage | ChatSubscribeMessage - | ChatUnsubscribeMessage; + | ChatUnsubscribeMessage + | ChatQueueMessage; /** * Every server → client WS message: surface ops (`@dispatch/ui-contract`) + chat diff --git a/.dispatch/wire.reference.md b/.dispatch/wire.reference.md index 34984d2..c2c4d43 100644 --- a/.dispatch/wire.reference.md +++ b/.dispatch/wire.reference.md @@ -4,8 +4,31 @@ > types WITHOUT following the `file:` dep symlink out of this repo (which hangs on a permission > prompt). 
Your CODE still imports `@dispatch/wire` normally — this file is for READING only. > -> **Orchestrator:** SNAPSHOT of `wire@0.7.0` (reasoning effort — the thinking-depth knob). -> Regenerate whenever `@dispatch/wire` changes. +> **Orchestrator:** SNAPSHOT of `wire@0.8.0` (message queue + steering). Regenerate +> whenever `@dispatch/wire` changes. +> +> **2026-06-21 delta (message-queue + steering handoff — package bumped `0.7.0` → `0.8.0`, ADDITIVE):** +> adds the per-conversation **message queue** + **steering** feature. While a turn is GENERATING, +> a client enqueues a user message (via the `chat.queue` WS op or `POST /conversations/:id/queue`, +> see `transport-contract@0.12.0`); it is delivered mid-turn as **steering** — injected at the next +> tool-result boundary so the model sees it alongside the tool results and can adjust course. If the +> turn ends with a non-empty queue (no tool call fired), the queue is carried into a NEW turn as its +> opening prompt (no `steering` event — the new turn's `user-message` covers it). +> +> Adds: +> - **`QueuedMessage`** (`{ id, text, queuedAt }`) — a message held in the queue (stable id for UI +> keying + dedup). +> - **`QueuePayload`** (`{ messages: QueuedMessage[] }`) — the payload of the message-queue +> extension's per-conversation `custom` surface field (`rendererId: "message-queue"`). Carried on +> the SURFACE channel (NOT the chat stream) — the queue is control/state. Empty `messages` = empty +> queue. See `transport-contract.reference.md` for the surface + the enqueue op. +> - **`TurnSteeringEvent`** (`{ type: "steering"; conversationId; turnId; text }`) — a NEW +> `AgentEvent` union member, emitted on the chat stream when the kernel drains a non-empty queue +> at a tool-result boundary. Render `text` as a USER bubble in the transcript (positioned after +> the tool-result it followed); the queue surface separately clears on drain. One event per drain; +> `text` is the combined text of all drained messages. 
Late-join safe (buffered into the in-flight +> turn's event buffer, mirroring `user-message`). Carry-to-new-turn does NOT emit `steering`. +> ADDITIVE to the union — if you have an exhaustive `AgentEvent` switch, add a `steering` case. > > **2026-06-12 delta (reasoning-effort handoff — package bumped `0.6.1` → `0.7.0`, ADDITIVE):** > adds the **`ReasoningEffort`** type — the per-request thinking-depth ladder @@ -284,6 +307,37 @@ export interface TurnMetrics { readonly contextSize?: number; } +// ─── Message queue + steering ─────────────────────────────────────────────── + +/** + * A user message held in a conversation's message queue, awaiting mid-turn + * steering delivery. The message-queue extension owns the queue and exposes it + * as a per-conversation `custom` surface field; this type is the shared shape + * the surface payload, the enqueue response, and the extension's service all + * use (so a separate frontend repo can depend on the wire alone to render it). + */ +export interface QueuedMessage { + /** Stable id (client-visible) for UI keying + dedup. */ + readonly id: string; + /** The message text the client enqueued. */ + readonly text: string; + /** When the message was enqueued (epoch-ms). */ + readonly queuedAt: number; +} + +/** + * The payload of the message-queue extension's per-conversation `custom` + * surface field (`rendererId: "message-queue"`): the current queue snapshot a + * frontend renders. Carried on the SURFACE channel (NOT the chat stream) — the + * queue is control/state, distinct from turn content. An empty `messages` + * array means the queue is empty (no pending steering). The frontend moves a + * message from this queue surface into the transcript when it is drained (the + * surface clears) and/or when the matching `TurnSteeringEvent` arrives. 
+ */ +export interface QueuePayload { + readonly messages: readonly QueuedMessage[]; +} + // ─── Outward events ───────────────────────────────────────────────────────── /** @@ -303,7 +357,8 @@ export type AgentEvent = | TurnStepCompleteEvent | TurnErrorEvent | TurnDoneEvent - | TurnSealedEvent; + | TurnSealedEvent + | TurnSteeringEvent; /** Status change for a conversation (e.g. idle → running). */ export interface StatusEvent { @@ -498,4 +553,28 @@ export interface TurnSealedEvent { readonly conversationId: string; readonly turnId: string; } + +/** + * A steering message was injected into an in-flight turn at the tool-result + * boundary (the model sees it alongside the tool results and may adjust + * course). Drawn from the conversation's message queue (which the drain + * clears); the cleared queue arrives as a message-queue SURFACE update, while + * THIS event carries the injected `text` so a frontend can place a user bubble + * in the transcript live — and so a late-joining watcher sees it before seal + * (mirroring `TurnInputEvent` for the opening prompt; emitted into the + * in-flight buffer by the session-orchestrator). + * + * Emitted by the session-orchestrator (in its `drainSteering` wrapper) only + * when the kernel drained a non-empty queue at a tool-result boundary. If the + * turn instead ENDS with a non-empty queue (no tool call fired), the queue is + * carried into a NEW turn whose opening `user-message` event covers the + * transcript — so no `steering` event is emitted in that case. One `steering` + * event per drain; the combined text of all drained messages. 
+ */ +export interface TurnSteeringEvent { + readonly type: "steering"; + readonly conversationId: string; + readonly turnId: string; + readonly text: string; +} ``` diff --git a/GLOSSARY.md b/GLOSSARY.md index 90acdd8..e350ffa 100644 --- a/GLOSSARY.md +++ b/GLOSSARY.md @@ -22,6 +22,8 @@ | **context size** | The tokens a conversation currently occupies: the most recent turn's FINAL step `inputTokens + outputTokens` (NOT the aggregate per-turn `usage`, which sums per-step prompts and overcounts a multi-step turn). On the wire as `TurnDoneEvent.contextSize` (live `done`) + `TurnMetrics.contextSize` (persisted); the FE reads the LATEST turn's value as current usage, and treats `undefined` as "unknown" (renders a placeholder, never `0`). Mirrors the backend GLOSSARY. | context usage, context length, tokens used (and do NOT call it "context window" — that's the limit) | | **reasoning effort** | The per-request thinking-depth knob: how much extended thinking the model spends before answering. Canonical ladder `ReasoningEffort = "low" \| "medium" \| "high" \| "xhigh" \| "max"` (`wire@0.7.0`). Resolution is SERVER-owned (never re-implement): per-turn `ChatRequest.reasoningEffort` override → persisted per-conversation value (`GET`/`PUT /conversations/:id/reasoning-effort`) → default `"high"` — so `null` from the GET means "default (`high`) applies", not "off". Changing the level can bust the prompt cache for the next turn (one-time re-prefill); a stable setting stays cache-safe. | thinking setting, thinking level, effort level, thinking budget | | **context window** | The model's MAXIMUM token capacity (the limit a **context size** is measured against). A FUTURE backend field — not on the wire yet. **Placeholder:** the composer status bar currently HARDCODES a `1,000,000`-token window for the `size / limit · pct%` readout + fill bar; swap to the real per-model value when the backend ships it (see `backend-handoff.md` §3). 
| max context, token limit (distinct from **context size**, the current usage) | +| **message queue** | A per-conversation buffer of user messages awaiting mid-turn **steering** delivery (owned by the `message-queue` backend extension). Transient + in-memory; exposed to the FE as a per-conversation **surface** (`message-queue`, scope `conversation`; one `custom` field, `rendererId: "message-queue"`, `payload: QueuePayload`). NOT on the chat stream — it is control/state. Enqueued via `chat.queue` (WS) or `POST /conversations/:id/queue` (HTTP); auto-starts a turn if idle. `wire@0.8.0`. | pending messages, steering queue | +| **steering** | A user message injected into an in-flight turn at the tool-result boundary (drawn from the **message queue**): the model sees it alongside the tool results and may adjust course. Emitted on the chat stream as a `steering` `AgentEvent` (`TurnSteeringEvent`); the queue surface clears on drain (move, don't duplicate). If the turn ends with a non-empty queue (no tool call fired), the queue carries into a NEW turn as its opening prompt (no `steering` event). `wire@0.8.0`. | mid-turn injection, course correction, interruption | ## Frontend-specific | Term | Meaning | Aliases to avoid | @@ -38,6 +40,6 @@ | **surface interpreter** | The generic renderer: field kind → component. Knows kinds, never surface ids. | — | | **metrics bubble** | The FE chat element that renders a turn's **turn metrics** (one per-turn total) and **step metrics** (one per step) as muted system-style bubbles at a turn's tail. UI presentation of `TurnMetrics`/`StepMetrics`; never a surface. | telemetry bubble, usage bubble, stats bubble | | **TPS** (tokens per second) | A FE-DERIVED decode rate: `outputTokens / (decodeMs / 1000)` (per step; per turn over Σ `decodeMs`), falling back to `genTotalMs` when `decodeMs` is absent. The backend-recommended basis (excludes first-token latency). Not carried on the wire; omitted when timing is absent. 
| throughput | -| **chat limit** | The max LOADED chunks per conversation (default 256; localStorage `dispatch.chatLimit`, no UI yet) before the oldest quarter is unloaded. Counts **chunks** (committed + provisional + accumulating). Policy in `core/chunks/trim.ts`. | chunk limit, message limit, history limit | +| **chat limit** | The max LOADED chunks per conversation (default 256; persisted at localStorage `dispatch.chatLimit`, settable via the sidebar's Settings view) before the oldest quarter is unloaded. Counts **chunks** (committed + provisional + accumulating). Policy in `core/chunks/trim.ts`. | chunk limit, message limit, history limit | | **unload** | Drop the oldest COMMITTED chunks from the in-memory transcript (and DOM) past the **chat limit** — in BULK (`ceil(limit/4)` per pass, deferred while the reader is scrolled up), never one-per-delta (old Dispatch's scroll-jump bug). Purely local: the IndexedDB cache and the server keep everything; `TranscriptState.hiddenBeforeSeq` is the watermark. Distinct from the conversation-cache's cross-conversation **eviction**. | evict (reserved for the cross-conversation cache), prune, drop | | **show earlier** | The affordance at the top of a transcript with unloaded history ("Show earlier messages"): pages one unload-unit back in — local cache first, then the server (CR-5 `?beforeSeq=&limit=`) when the cache doesn't reach far enough back — preserving the reader's scroll position. Offered whenever the loaded window starts above seq 1 (the wire@0.6.1 1-based gap-free seq contract). | load more, pagination | diff --git a/backend-handoff.md b/backend-handoff.md index 7c7da05..5b54f2d 100644 --- a/backend-handoff.md +++ b/backend-handoff.md @@ -5,18 +5,38 @@ > **From:** dispatch-web orchestrator · **To:** arch-rewrite orchestrator · **Courier:** the user. > `lsp` does NOT span the repos (AGENTS.md § Backend seam) — every cross-repo ask flows through here. -_Last updated: 2026-06-12 (reasoning-effort handoff consumed). 
**FE is current on -`ui-contract@0.2.0` / `transport-contract@0.11.0` / `wire@0.7.0`.** All handoffs to date are +_Last updated: 2026-06-21 (message-queue + steering handoff consumed). **FE is current on +`ui-contract@0.2.0` / `transport-contract@0.12.0` / `wire@0.8.0`.** All handoffs to date are consumed: surfaces + WS, conversation transcript/metrics, tabs + model selector, cache-warming (incl. authoritative timer + retention + cache-rate fix + the CR-4 lifecycle below), **per-conversation cwd + LSP status**, **context size**, **turn continuity + multi-client live -view**, the **chat limit + CR-5 history windowing**, and the **reasoning effort -(thinking-depth knob)** (below). +view**, the **chat limit + CR-5 history windowing**, the **reasoning effort +(thinking-depth knob)**, and the **message queue + steering** (below). **Open asks: NONE.** CR-1/CR-2/CR-4/CR-5 all RESOLVED ✅ (see §2); §3 lists likely next asks. **CR-3 (watcher couldn't see the USER prompt until seal) → RESOLVED ✅** — backend shipped the `user-message` turn event; FE re-pinned + consumption live. The cwd/LSP draft-path verification (`backend-handoff-cwd-lsp.md`) came back **all ✅ confirmed**._ +**Message-queue + steering handoff (`frontend-message-queue-handoff.md`) → CONSUMED ✅.** +Re-pinned `wire@0.7.0→0.8.0` + `transport-contract@0.11.0→0.12.0` (`ui-contract` unchanged — +the queue uses the existing `custom` surface field kind); re-mirrored both +`.dispatch/*.reference.md`; added "message queue" + "steering" to FE `GLOSSARY.md`. 
FE work: +(a) `core/chunks/reducer.ts` folds the new `steering` `AgentEvent` into the transcript as a +provisional user bubble (after the tool-result it followed; no de-dup — the queue surface, not +the transcript, carried the pending message); `core/wire/conformance.ts` exhaustiveness guards +updated for `steering` + `chat.queue`; (b) a `rendererId: "message-queue"` custom renderer +(`surface-host/logic/message-queue.ts` pure parser + `MessageQueueList.svelte`) renders +`QueuePayload.messages` (`QueuedMessage[]`); (c) the `message-queue` surface is pulled out of +the generic Extensions sidebar list and rendered as a compact panel above the Composer (only +when the queue is non-empty — an idle queue is hidden); (d) `ChatStore.queueMessage(text)` + +`AppStore.queueMessage(text)` send `chat.queue { conversationId, text }` (trim/validate +non-empty client-side too); (e) the Composer switches to `chat.queue` while `generating` +(button label → "Queue", placeholder → "Steer the conversation..."). Carry-to-new-turn needs +no special handling (surfaces as a normal new turn). **NOT yet live-probed** — the handoff +flags the live end-to-end steering flow (a real `chat.queue` → tool-call → `steering` event +against a tool-calling model) as not yet exercised; worth a live smoke. 664 tests green. NO +new backend ask. + **Reasoning-effort handoff (`frontend-reasoning-effort-handoff.md`) → CONSUMED ✅ (curl-probed live: GET null on unseen id · PUT `xhigh` → echo + sticky GET · bad level → 400 listing the ladder · CORS preflight allows PUT).** Re-pinned `wire@0.6.1→0.7.0` + @@ -81,13 +101,13 @@ backend ask — but the max-limit denominator is now a live FE need; see §3. ## 1. Pinned backend contracts (consumed by the FE) -Pinned as `file:` deps: **`ui-contract@0.2.0`; `wire@0.7.0`; `transport-contract@0.11.0`**. 
| Package | Used for | |---|---| | `@dispatch/ui-contract` | surfaces + surface WS protocol | -| `@dispatch/wire` | `Chunk`/`StoredChunk`(+`seq`)/`ChatMessage`/`AgentEvent`/`TurnSealedEvent`/`Usage`/`StepId` + metrics: `StepMetrics`/`TurnMetrics`, `usage.stepId`, `step-complete`, `done.durationMs`/`done.usage`, `tool-result.durationMs`, **`done.contextSize`/`TurnMetrics.contextSize`**, **`ReasoningEffort`** | -| `@dispatch/transport-contract` | `ChatRequest`(+`reasoningEffort`)/`ModelsResponse`/`ConversationHistoryResponse`/`ConversationMetricsResponse` + `WarmRequest`/`WarmResponse` + `CwdResponse`/`SetCwdRequest` + `ReasoningEffortResponse`/`SetReasoningEffortRequest` + LSP (`LspStatusResponse`/`LspServerInfo`/`LspServerState`) + WS chat ops + `WsClientMessage`/`WsServerMessage` | +| `@dispatch/wire` | `Chunk`/`StoredChunk`(+`seq`)/`ChatMessage`/`AgentEvent`/`TurnSealedEvent`/`Usage`/`StepId` + metrics: `StepMetrics`/`TurnMetrics`, `usage.stepId`, `step-complete`, `done.durationMs`/`done.usage`, `tool-result.durationMs`, **`done.contextSize`/`TurnMetrics.contextSize`**, **`ReasoningEffort`**, **`QueuedMessage`/`QueuePayload`/`TurnSteeringEvent`** | +| `@dispatch/transport-contract` | `ChatRequest`(+`reasoningEffort`)/`ModelsResponse`/`ConversationHistoryResponse`/`ConversationMetricsResponse` + `WarmRequest`/`WarmResponse` + `CwdResponse`/`SetCwdRequest` + `ReasoningEffortResponse`/`SetReasoningEffortRequest` + **`QueueRequest`/`QueueResponse`/`ChatQueueMessage`** + LSP (`LspStatusResponse`/`LspServerInfo`/`LspServerState`) + WS chat ops + `WsClientMessage`/`WsServerMessage` | Endpoints in use (HTTP **24203**, WS **24205**, CORS `*` incl. `PUT`): `POST /chat` (NDJSON) · `GET /models` · @@ -95,12 +115,14 @@ Endpoints in use (HTTP **24203**, WS **24205**, CORS `*` incl. 
`PUT`): `GET /conversations/:id/metrics` · `GET`/`PUT /conversations/:id/cwd` · `GET`/`PUT /conversations/:id/reasoning-effort` (sticky thinking-depth; `null` ⇒ default `high`) · `GET /conversations/:id/lsp` · `POST /chat/warm` · `POST /conversations/:id/close` (explicit -tab-close: abort turn + stop/disable warming) · WS `chat.send`→`chat.delta` · -WS `chat.subscribe`/`chat.unsubscribe` (watch a conversation's turns without sending; replay + live). +tab-close: abort turn + stop/disable warming) · **`POST /conversations/:id/queue`** (enqueue +steering message; auto-starts a turn if idle) · WS `chat.send`→`chat.delta` · +WS `chat.subscribe`/`chat.unsubscribe` (watch a conversation's turns without sending; replay + live) · +**WS `chat.queue`** (enqueue steering; fire-and-forget — surface updates on success). Mirrored in-repo for headless agents: `.dispatch/{ui-contract,wire,transport-contract}.reference.md` (regenerate on any contract bump; all current as of `ui-contract@0.2.0` / -`transport-contract@0.11.0` / `wire@0.7.0`). +`transport-contract@0.12.0` / `wire@0.8.0`). ## 2. 
Open asks FOR THE BACKEND diff --git a/src/app/App.svelte b/src/app/App.svelte index dffa937..ee72ca5 100644 --- a/src/app/App.svelte +++ b/src/app/App.svelte @@ -18,12 +18,18 @@ } from "../features/chat"; import { manifest as conversationCacheManifest } from "../features/conversation-cache"; import { manifest as markdownManifest } from "../features/markdown"; + import { + ChatLimitField, + manifest as settingsManifest, + type ChatLimitSaveResult, + } from "../features/settings"; import { createSmartScrollController, manifest as smartScrollManifest, ScrollToBottom, } from "../features/smart-scroll"; import { manifest as surfaceHostManifest, SurfaceView } from "../features/surface-host"; + import { parseMessageQueuePayload } from "../features/surface-host/logic/message-queue"; import { manifest as tabsManifest, TabBar } from "../features/tabs"; import { manifest as viewsManifest, ViewSidebar } from "../features/views"; import { @@ -42,6 +48,10 @@ // and keep it out of the generic Extensions surface list — SurfaceView itself // stays fully generic (it never switches on a surface id). const CACHE_WARMING_ID = "cache-warming"; + // The message-queue extension's per-conversation surface (steering). Pulled + // out of the generic Extensions list and rendered as a compact panel above the + // composer — pending steering messages are tied to the chat, not the sidebar. + const MESSAGE_QUEUE_ID = "message-queue"; // The view kinds offered in the sidebar's dropdown. Generic data — the // `viewContent` snippet below maps each kind id to its renderer. @@ -50,10 +60,11 @@ { id: "lsp", label: "Language Servers" }, { id: "extensions", label: "Extensions" }, { id: "cache-warming", label: "Cache Warming" }, + { id: "settings", label: "Settings" }, ] as const; - // Default sidebar layout: Model panel on top, then Language Servers, Extensions, Cache Warming. 
- const initialViews = ["model", "lsp", "extensions", "cache-warming"] as const; + // Default sidebar layout: Model, Language Servers, Extensions, Cache Warming, Settings. + const initialViews = ["model", "lsp", "extensions", "cache-warming", "settings"] as const; // Frontend module list for the "Loaded Modules" view, AGGREGATED from each // feature's public `manifest` export so it can't drift from what's actually @@ -71,6 +82,7 @@ cacheWarmingManifest, workspaceManifest, smartScrollManifest, + settingsManifest, ].map((m) => [m.name, m.description] as const); // Smart-scroll: keep the transcript pinned to the bottom while it streams, @@ -120,6 +132,18 @@ smartScroll.contentChanged(); }); + // The message-queue surface spec + whether it currently has pending messages + // (steering). Rendered as a compact panel above the composer only when non-empty. + const messageQueueSpec = $derived(store.surface(MESSAGE_QUEUE_ID)); + const hasQueuedMessages = $derived.by(() => { + const spec = messageQueueSpec; + if (spec === null) return false; + const field = spec.fields.find((f) => f.kind === "custom" && f.rendererId === MESSAGE_QUEUE_ID); + if (field === undefined || field.kind !== "custom") return false; + const data = parseMessageQueuePayload(field.payload); + return data !== null && data.messages.length > 0; + }); + // Conversation/tab switch → snap to the bottom of the new transcript. $effect(() => { void store.activeConversationId; @@ -140,6 +164,10 @@ store.send(text); } + function handleQueue(text: string) { + store.queueMessage(text); + } + function handleSelectModel(model: string) { store.selectModel(model); } @@ -168,6 +196,25 @@ : { ok: false, error: result.error }; } + // Adapt the store's chat-limit result to the settings feature's port. On a + // raise the active chat refills (prepends older history); preserve the + // reader's viewport over the prepend (the manual analogue of CSS scroll + // anchoring), exactly like `handleShowEarlier`. 
+ async function saveChatLimit(value: number): Promise<ChatLimitSaveResult> { + const el = transcriptEl; + const prevHeight = el?.scrollHeight ?? 0; + const prevTop = el?.scrollTop ?? 0; + const result = await store.setChatLimit(value); + await tick(); + if (el) { + const delta = el.scrollHeight - prevHeight; + if (delta > 0) el.scrollTop = prevTop + delta; + } + return result.ok + ? { ok: true, chatLimit: result.chatLimit } + : { ok: false, error: result.error }; + } + // Adapt the store's cwd/LSP results to the workspace feature's ports. async function saveCwd(cwd: string): Promise { const result = await store.setCwd(cwd); @@ -262,8 +309,18 @@ smartScroll.resume()} /> + {#if hasQueuedMessages && messageQueueSpec !== null} + +
+ +
+ {/if} +

Surfaces

- {#each store.surfaces.filter((s) => s.id !== CACHE_WARMING_ID) as spec (spec.id)} + {#each store.surfaces.filter((s) => s.id !== CACHE_WARMING_ID && s.id !== MESSAGE_QUEUE_ID) as spec (spec.id)} {/each}
@@ -344,5 +401,11 @@ {warmNow} /> {/key} + {:else if kind === "settings"} + +
+ +
{/if} {/snippet} diff --git a/src/app/store.svelte.ts b/src/app/store.svelte.ts index e8bb5e1..dc06ea1 100644 --- a/src/app/store.svelte.ts +++ b/src/app/store.svelte.ts @@ -60,6 +60,11 @@ export type ReasoningEffortResult = | { readonly ok: true; readonly reasoningEffort: ReasoningEffort } | { readonly ok: false; readonly error: string }; +/** Outcome of persisting a chat-limit setting (localStorage; FE-local). */ +export type ChatLimitResult = + | { readonly ok: true; readonly chatLimit: number } + | { readonly ok: false; readonly error: string }; + export interface AppStore { readonly tabs: readonly Tab[]; readonly activeConversationId: string | null; @@ -73,6 +78,14 @@ export interface AppStore { /** The current spec for one surface by id (discovery-by-id), or null if absent. */ surface(surfaceId: string): SurfaceSpec | null; send(text: string): void; + /** + * Enqueue a steering message onto the focused conversation's queue + * (`chat.queue` WS op). While a turn is generating, the message is delivered + * mid-turn at the next tool-result boundary; when idle, the server + * auto-starts a turn (equivalent to `send`). Safe to offer whenever the user + * wants to add input — the server owns the idle-vs-generating decision. + */ + queueMessage(text: string): void; selectModel(model: string): void; newDraft(): void; selectTab(conversationId: string): void; @@ -109,6 +122,16 @@ export interface AppStore { * The backend lazily spawns servers, so this may take a moment on the first call for a cwd. */ lspStatus(): Promise; + /** The persisted chat limit (max loaded chunks per conversation). */ + readonly chatLimit: number; + /** + * Persist + live-apply a new chat limit: writes `dispatch.chatLimit` to + * localStorage and propagates to every live chat store (trim if lower, + * deferred via the unload gate while a reader is scrolled up; no-op if + * higher — page unloaded history back in via "Show earlier"). 
Stores created + * afterwards pick the new limit up at creation. Always succeeds (FE-local). + */ + setChatLimit(limit: number): Promise; /** * Wire the chat-limit unload gate (composition-root injection, called once by * the shell after it owns the scroll region): unloading old chunks is allowed @@ -189,15 +212,17 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore { const tabsStore: TabsStore = createTabsStore(storageAdapter); // The chat limit (max loaded chunks per conversation) — a persisted local - // setting with no UI yet: edit `localStorage["dispatch.chatLimit"]`. The - // default is written back on first run so the knob is discoverable. + // setting surfaced in the sidebar's Settings view. Reactive so the field + + // any live-apply re-trim update together. The default is written back on + // first run so the knob is discoverable in localStorage too. const chatLimitStore = createLocalStore("dispatch.chatLimit", { storage: localStorageOpt, }); const storedChatLimit = chatLimitStore.load(); - const chatLimit = normalizeChatLimit(storedChatLimit); + const normalizedChatLimit = normalizeChatLimit(storedChatLimit); + let chatLimit = $state(normalizedChatLimit); if (storedChatLimit === null) { - chatLimitStore.save(chatLimit); + chatLimitStore.save(normalizedChatLimit); } // Unload gate — attached by the shell once it owns the scroll region (see @@ -225,7 +250,11 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore { historySync, metricsSync, cache, - chatLimit, + // Read from the persisted store (kept in sync with the reactive `chatLimit` + // by `setChatLimit` + boot) so this snapshot doesn't reference the `$state` + // — each store captures its limit at creation; live updates go through + // `setChatLimit`. + chatLimit: normalizeChatLimit(chatLimitStore.load()), canUnload: () => (unloadGate === null ? 
true : unloadGate()), }); } @@ -516,6 +545,9 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore { get reasoningEffort(): ReasoningEffort | null { return reasoningEffort; }, + get chatLimit(): number { + return chatLimit; + }, get currentConversationId(): string { return workspaceConversationId(); }, @@ -555,6 +587,15 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore { } }, + queueMessage(text: string): void { + // Only offered while generating (Composer switches to `chat.queue` + // when `status === "running"`), so a draft (never generating) never + // reaches here. `chat.queue` auto-starts a turn if idle, so even a race + // (turn sealed between the status read and the send) is safe — the + // server starts a fresh turn with the message as its opening prompt. + activeChat.queueMessage(text); + }, + selectModel(model: string): void { activeModel = model; const activeId = tabsStore.activeConversationId; @@ -695,6 +736,24 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore { } }, + async setChatLimit(limit: number): Promise { + const next = normalizeChatLimit(limit); + chatLimitStore.save(next); + chatLimit = next; + // Propagate to every live chat store. The ACTIVE one is awaited so its + // refill (on a raise) lands before the caller returns — letting the + // shell preserve scroll over the prepended older chunks. Background + // stores refill fire-and-forget. Future stores pick the new limit up at + // creation (via the persisted store). 
+ const active = getActiveChat(); + await active.setChatLimit(next); + for (const s of chatStores.values()) { + if (s !== active) void s.setChatLimit(next); + } + if (draftStore !== active) void draftStore.setChatLimit(next); + return { ok: true, chatLimit: next }; + }, + async lspStatus(): Promise { const id = workspaceConversationId(); try { diff --git a/src/core/chunks/reducer.test.ts b/src/core/chunks/reducer.test.ts index 35a586c..a346545 100644 --- a/src/core/chunks/reducer.test.ts +++ b/src/core/chunks/reducer.test.ts @@ -7,6 +7,7 @@ import type { TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, + TurnSteeringEvent, TurnTextDeltaEvent, TurnToolCallEvent, TurnToolResultEvent, @@ -437,6 +438,59 @@ describe("foldEvent — user-message (the turn's user prompt; backend CR-3)", () }); }); +describe("foldEvent — steering (mid-turn steering injection)", () => { + const steering = (text: string): TurnSteeringEvent => ({ + type: "steering", + conversationId: "c1", + turnId: "t1", + text, + }); + + it("appends a provisional user bubble + keeps generating", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, toolResult("t1", "tc1", "read", "output")); + s = foldEvent(s, steering("actually, use a different file")); + const chunks = selectChunks(s); + const last = chunks[chunks.length - 1]; + expect(last?.role).toBe("user"); + expect(last?.chunk).toEqual({ type: "text", text: "actually, use a different file" }); + expect(last?.provisional).toBe(true); + expect(s.generating).toBe(true); + }); + + it("does NOT dedup against the sender's queue (unlike user-message)", () => { + // The sender enqueued the message via `chat.queue` — the queue SURFACE + // showed it. The `steering` event places it in the transcript; the surface + // separately clears on drain. No de-dup here (the transcript never showed + // the queued message). 
+ let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, steering("steer once")); + s = foldEvent(s, steering("steer again")); + const users = selectChunks(s).filter((c) => c.role === "user"); + expect(users).toHaveLength(2); + }); + + it("ignores an empty steering event", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, steering("")); + expect(selectChunks(s)).toHaveLength(0); + expect(s.generating).toBe(true); // turn-start already set it + }); + + it("flushes an accumulating chunk before appending the steering bubble", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "partial response")); + s = foldEvent(s, steering("mid-turn correction")); + expect(s.accumulating).toBeNull(); + const roles = selectChunks(s).map((c) => c.role); + expect(roles).toEqual(["assistant", "user"]); + }); +}); + describe("applyHistory", () => { it("orders committed chunks by seq", () => { const s = initialState(); diff --git a/src/core/chunks/reducer.ts b/src/core/chunks/reducer.ts index 0a57839..035846c 100644 --- a/src/core/chunks/reducer.ts +++ b/src/core/chunks/reducer.ts @@ -83,6 +83,8 @@ export function applyHistory( * - `reasoning-delta` extends the current accumulating ThinkingChunk (or starts one). * - `tool-call` / `tool-result` / `error` finalize any accumulating chunk and * add a new provisional chunk. + * - `steering` appends a user bubble mid-turn (drained from the message queue + * at a tool-result boundary; the queue surface separately clears on drain). * - `usage` stores the latest Usage. * - `done` finalizes any accumulating chunk (turn still provisional). * - `turn-sealed` finalizes any accumulating chunk and sets sealedTurnId. 
@@ -239,6 +241,23 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript generating: false, }; } + + case "steering": { + // A steering message drained from the queue at a tool-result boundary + // (the model sees it alongside the tool results). Append a user bubble + // to the provisional transcript; the turn is still in flight. The queue + // surface clears separately on drain (a different channel) — no de-dup + // here (unlike `user-message`, steering is never optimistically echoed + // into the transcript by the sender). + if (event.text.length === 0) return state; + const provisional = flushAccumulating(state.provisional, state.accumulating); + return { + ...state, + provisional: [...provisional, { role: "user", chunk: { type: "text", text: event.text } }], + accumulating: null, + generating: true, + }; + } } } diff --git a/src/core/wire/conformance.test.ts b/src/core/wire/conformance.test.ts index a258873..2fdd3cb 100644 --- a/src/core/wire/conformance.test.ts +++ b/src/core/wire/conformance.test.ts @@ -75,6 +75,7 @@ describe("classifies every AgentEvent type", () => { { type: "error", conversationId: "c1", turnId: "t1", message: "oops" }, { type: "done", conversationId: "c1", turnId: "t1", reason: "complete" }, { type: "turn-sealed", conversationId: "c1", turnId: "t1" }, + { type: "steering", conversationId: "c1", turnId: "t1", text: "steer mid-turn" }, ]; it("returns a stable label for every AgentEvent.type variant", () => { @@ -93,11 +94,12 @@ describe("classifies every AgentEvent type", () => { "error", "done", "turn-sealed", + "steering", ]); }); - it("covers all 13 AgentEvent variants", () => { - expect(samples).toHaveLength(13); + it("covers all 14 AgentEvent variants", () => { + expect(samples).toHaveLength(14); }); }); @@ -152,6 +154,7 @@ describe("classifies every WsClientMessage type", () => { { type: "chat.send" as const, message: "hi" }, { type: "chat.subscribe" as const, conversationId: "c1" }, { type: 
"chat.unsubscribe" as const, conversationId: "c1" }, + { type: "chat.queue" as const, conversationId: "c1", text: "steer" }, ]; const labels = msgs.map(assertWsClientMessageExhaustive); expect(labels).toEqual([ @@ -161,6 +164,7 @@ describe("classifies every WsClientMessage type", () => { "chat.send", "chat.subscribe", "chat.unsubscribe", + "chat.queue", ]); }); }); diff --git a/src/core/wire/conformance.ts b/src/core/wire/conformance.ts index 13be78c..6e87e5c 100644 --- a/src/core/wire/conformance.ts +++ b/src/core/wire/conformance.ts @@ -34,6 +34,8 @@ export function assertAgentEventExhaustive(event: AgentEvent): string { return "turn-sealed"; case "step-complete": return "step-complete"; + case "steering": + return "steering"; default: return event satisfies never; } @@ -102,6 +104,8 @@ export function assertWsClientMessageExhaustive(msg: WsClientMessage): string { return "chat.subscribe"; case "chat.unsubscribe": return "chat.unsubscribe"; + case "chat.queue": + return "chat.queue"; default: return msg satisfies never; } diff --git a/src/features/chat/ports.ts b/src/features/chat/ports.ts index f8c665f..ffe2c94 100644 --- a/src/features/chat/ports.ts +++ b/src/features/chat/ports.ts @@ -1,12 +1,17 @@ import type { + ChatQueueMessage, ChatSendMessage, ConversationHistoryResponse, ConversationMetricsResponse, } from "@dispatch/transport-contract"; -/** Injected transport port — sends chat messages to the server. */ +/** + * Injected transport port — sends chat messages to the server. Accepts both + * `chat.send` (start a turn) and `chat.queue` (enqueue a steering message; + * auto-starts a turn if idle). 
+ */ export interface ChatTransport { - send(msg: ChatSendMessage): void; + send(msg: ChatSendMessage | ChatQueueMessage): void; } /** diff --git a/src/features/chat/store.svelte.ts b/src/features/chat/store.svelte.ts index e74980d..9beabfc 100644 --- a/src/features/chat/store.svelte.ts +++ b/src/features/chat/store.svelte.ts @@ -1,9 +1,10 @@ import type { ChatDeltaMessage, ChatErrorMessage, + ChatQueueMessage, ChatSendMessage, } from "@dispatch/transport-contract"; -import type { ChatMessage } from "@dispatch/wire"; +import type { ChatMessage, StoredChunk } from "@dispatch/wire"; import type { RenderedChunk, TranscriptState } from "../../core/chunks"; import { appendUserMessage, @@ -89,7 +90,29 @@ export interface ChatStore { readonly thinkingKeyBase: number; handleDelta(msg: ChatDeltaMessage | ChatErrorMessage): void; send(text: string): void; + /** + * Enqueue a steering message onto the conversation's queue (`chat.queue` + * WS op). While a turn is generating, the message is delivered mid-turn at + * the next tool-result boundary (a `steering` `AgentEvent` fires + the + * message-queue surface updates). When no turn is active, the server + * auto-starts a turn with the message as its opening prompt (equivalent to + * `chat.send`). No optimistic transcript echo — the queue SURFACE carries the + * pending message until drain; the `steering` event places it in the + * transcript. `text` must be non-empty (the server 400/errors otherwise). + */ + queueMessage(text: string): void; setModel(model: string): void; + /** + * Update the chat limit LIVE: re-normalizes, then adjusts the loaded window. + * Lowering it unloads older committed chunks (deferred via the gate while the + * reader is scrolled up, catching up on the next mutation). 
Raising it + * REFILLS older history (cache first, then CR-5 `?beforeSeq=`) up to the + * fresh-load window (`initialWindowSize` = 75% of the limit) — the same + * window a fresh `load()` would show — so upping the limit reveals more + * history instead of leaving a partial view. New deltas + loads use the new + * limit. The refill awaits, so a caller can preserve scroll over the prepend. + */ + setChatLimit(limit: number): Promise; load(): Promise; /** * Page one unload-unit (`ceil(limit/4)`) of earlier history back in — the @@ -117,7 +140,7 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore { let _model = $state(deps.model); let disposed = false; - const chatLimit = normalizeChatLimit(deps.chatLimit); + let chatLimit = normalizeChatLimit(deps.chatLimit); /** * Enforce the chat limit after a transcript mutation — unless the injected @@ -166,6 +189,52 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore { } } + /** + * Fetch up to `want` older chunks (seq < `oldest`) — cache first, then a + * CR-5 `?beforeSeq=&limit=` server backfill when the cache is too shallow, + * persisting it so the next read is local. Returns every locally-known + * chunk older than `oldest` (the caller — `restoreEarlier` — takes the + * newest `count` of them). Shared by `showEarlier` and the raise-refill. + */ + async function backfillOlder(oldest: number, want: number): Promise { + let earlier = (await deps.cache.load(deps.conversationId)).filter((c) => c.seq < oldest); + const oldestKnown = earlier[0]?.seq ?? 
oldest; + if (earlier.length < want && oldestKnown > 1) { + const res = await deps.historySync(deps.conversationId, 0, { + beforeSeq: oldestKnown, + limit: want - earlier.length, + }); + const merged = await deps.cache.commit(deps.conversationId, res.chunks); + earlier = merged.filter((c) => c.seq < oldest); + } + return earlier; + } + + /** + * Refill toward the fresh-load window after a limit RAISE: pull older + * history (cache first, then server) so the loaded set grows to match what a + * fresh `load()` would show at the new limit. No-op when already at the + * origin (seq 1) or already within the window. `restoreEarlier` re-derives + * the window start at apply time, so a delta landing during the await can't + * corrupt the merge. NOT gated (refilling prepends above the viewport; the + * caller preserves scroll position). + */ + async function refill(): Promise { + if (disposed) return; + const oldest = transcript.committed[0]?.seq ?? transcript.hiddenBeforeSeq; + if (oldest <= 1) return; + const want = initialWindowSize(chatLimit) - transcript.committed.length; + if (want <= 0) return; + try { + const earlier = await backfillOlder(oldest, want); + if (earlier.length === 0) return; + transcript = restoreEarlier(transcript, earlier, want); + _error = null; + } catch (err) { + _error = err instanceof Error ? 
err.message : String(err); + } + } + return { get messages(): readonly ChatMessage[] { return selectMessages(transcript); @@ -230,10 +299,31 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore { deps.transport.send(msg); }, + queueMessage(text: string): void { + const trimmed = text.trim(); + if (trimmed.length === 0) return; + const msg: ChatQueueMessage = { + type: "chat.queue", + conversationId: deps.conversationId, + text: trimmed, + }; + deps.transport.send(msg); + }, + setModel(model: string): void { _model = model; }, + async setChatLimit(limit: number): Promise { + const prev = chatLimit; + chatLimit = normalizeChatLimit(limit); + if (chatLimit < prev) { + maybeTrim(); + } else if (chatLimit > prev) { + await refill(); + } + }, + async load(): Promise { // Fresh load shows only the newest 75% of the limit — headroom before the // first trim. A warm cache is windowed locally (synchronously with its @@ -256,23 +346,7 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore { if (oldest <= 1) return; const want = unloadCount(chatLimit); try { - let earlier = (await deps.cache.load(deps.conversationId)).filter((c) => c.seq < oldest); - // The local cache may not reach far enough back (a server-windowed - // fresh load cached only the window): page the missing OLDER run in - // from the server (CR-5 `?beforeSeq=&limit=`) and persist it, so the - // next page-in is local. Seqs are gap-free, so the fetched run is - // contiguous with what we hold. NOTE: the backfill response's - // `latestSeq` is a window cursor — never fed to the tail cursor - // (ours derives from the cache's max seq). - const oldestKnown = earlier[0]?.seq ?? 
oldest; - if (earlier.length < want && oldestKnown > 1) { - const res = await deps.historySync(deps.conversationId, 0, { - beforeSeq: oldestKnown, - limit: want - earlier.length, - }); - const merged = await deps.cache.commit(deps.conversationId, res.chunks); - earlier = merged.filter((c) => c.seq < oldest); - } + const earlier = await backfillOlder(oldest, want); transcript = restoreEarlier(transcript, earlier, want); _error = null; } catch (err) { diff --git a/src/features/chat/store.test.ts b/src/features/chat/store.test.ts index 3232009..2d75139 100644 --- a/src/features/chat/store.test.ts +++ b/src/features/chat/store.test.ts @@ -144,6 +144,93 @@ describe("createChatStore", () => { store.dispose(); }); + describe("queueMessage (chat.queue — steering)", () => { + it("posts a chat.queue with conversationId + text", () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + }); + + store.queueMessage("steer left"); + + expect(transport.sent).toHaveLength(0); // chat.send stays empty + expect(transport.sentQueue).toHaveLength(1); + expect(transport.sentQueue[0]?.type).toBe("chat.queue"); + expect(transport.sentQueue[0]?.conversationId).toBe(CONV_ID); + expect(transport.sentQueue[0]?.text).toBe("steer left"); + + store.dispose(); + }); + + it("trims whitespace before sending", () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + }); + + store.queueMessage(" padded "); + 
+ expect(transport.sentQueue[0]?.text).toBe("padded"); + + store.dispose(); + }); + + it("does not send for empty/whitespace-only text", () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + }); + + store.queueMessage(" "); + store.queueMessage(""); + + expect(transport.sentQueue).toHaveLength(0); + + store.dispose(); + }); + + it("does NOT optimistically echo into the transcript (the surface carries the queue)", () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + }); + + store.queueMessage("queued steering message"); + + expect(store.chunks).toHaveLength(0); // no transcript echo + + store.dispose(); + }); + }); + it("chat.error sets error", () => { const transport = createFakeTransport(); const historySync = createFakeHistorySync(); @@ -1248,6 +1335,195 @@ describe("createChatStore", () => { store.dispose(); }); + it("setChatLimit: lowering the limit trims older committed chunks live", async () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + chatLimit: 100, + }); + + // Load 80 committed chunks (under the limit — no trim yet). 
+ historySync.returnChunks = Array.from({ length: 80 }, (_, i) => makeStoredChunk(i + 1)); + store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" })); + store.handleDelta(deltaEvent({ type: "turn-sealed", conversationId: CONV_ID, turnId: "t1" })); + await vi.waitFor(() => { + expect(store.chunks).toHaveLength(80); + }); + + // Lower the limit to 10: 80 → unload ceil(10/4)=3 per quarter, needs + // ceil((80-10)/3)=24 quarters → drop min(72, 80)=72 → 8 remain. + await store.setChatLimit(10); + expect(store.chunks).toHaveLength(8); + expect(store.chunks[0]?.seq).toBe(73); + expect(store.hasEarlier).toBe(true); + + store.dispose(); + }); + + it("setChatLimit: raising the limit refills older history up to the fresh-load window", async () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + // Cache holds 200 chunks; load at limit 100 → window 75 → seqs 126..200. + await cache.impl.commit( + CONV_ID, + Array.from({ length: 200 }, (_, i) => makeStoredChunk(i + 1)), + ); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + chatLimit: 100, + }); + await store.load(); + expect(store.chunks).toHaveLength(75); + expect(store.chunks[0]?.seq).toBe(126); + expect(store.hasEarlier).toBe(true); + + // Raise to 200 → window floor(0.75×200)=150 → refill 75 older chunks + // (seqs 51..125) from the cache. No server backfill (cache is deep enough). 
+ await store.setChatLimit(200); + expect(historySync.calls).toHaveLength(1); // the load-time tail sync only + expect(store.chunks).toHaveLength(150); + expect(store.chunks[0]?.seq).toBe(51); + expect(store.hasEarlier).toBe(true); // 51 > 1 + + store.dispose(); + }); + + it("setChatLimit: raising backfills from the server when the cache is too shallow", async () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + // Server holds 200; cold-cache load at limit 100 → window 75 → seqs 126..200. + historySync.returnChunks = Array.from({ length: 200 }, (_, i) => makeStoredChunk(i + 1)); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + chatLimit: 100, + }); + await store.load(); + expect(store.chunks[0]?.seq).toBe(126); + + // Raise to 200 → want 75 older. Cache only holds 126..200 → backfill + // seqs 51..125 from the server (CR-5 ?beforeSeq=126&limit=75). + await store.setChatLimit(200); + const backfill = historySync.calls[1]; + expect(backfill?.window).toEqual({ beforeSeq: 126, limit: 75 }); + expect(store.chunks).toHaveLength(150); + expect(store.chunks[0]?.seq).toBe(51); + + store.dispose(); + }); + + it("setChatLimit: raising refills all available older history (down to the origin)", async () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + chatLimit: 100, + }); + + // 101 chunks → one trim pass drops 25 → 76 remain (seqs 26..101). 
+ historySync.returnChunks = Array.from({ length: 101 }, (_, i) => makeStoredChunk(i + 1)); + store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" })); + store.handleDelta(deltaEvent({ type: "turn-sealed", conversationId: CONV_ID, turnId: "t1" })); + await vi.waitFor(() => { + expect(store.chunks).toHaveLength(76); + }); + expect(store.chunks[0]?.seq).toBe(26); + expect(store.hasEarlier).toBe(true); + + // Raise to 500 → window 375 → want 299 older. The cache holds only + // seqs 1..25 below the window (no more server-side) → restore all 25 → + // 101 loaded, reaching the origin. + await store.setChatLimit(500); + expect(store.chunks).toHaveLength(101); + expect(store.chunks[0]?.seq).toBe(1); + expect(store.hasEarlier).toBe(false); + + store.dispose(); + }); + + it("setChatLimit: raising is a no-op when the window already starts at the origin", async () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + await cache.impl.commit( + CONV_ID, + Array.from({ length: 50 }, (_, i) => makeStoredChunk(i + 1)), + ); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + chatLimit: 100, + }); + await store.load(); // only 50 chunks → all loaded, window starts at seq 1 + expect(store.chunks).toHaveLength(50); + expect(store.hasEarlier).toBe(false); + const callsAfterLoad = historySync.calls.length; + + await store.setChatLimit(500); // raise → refill no-ops (oldest = 1) + expect(store.chunks).toHaveLength(50); + expect(store.chunks[0]?.seq).toBe(1); + expect(historySync.calls).toHaveLength(callsAfterLoad); // no backfill + + store.dispose(); + }); + + it("setChatLimit: a nonsensical value is normalized (no crash, no trim)", async () => { + const transport = createFakeTransport(); + const 
historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + chatLimit: 100, + }); + + historySync.returnChunks = Array.from({ length: 50 }, (_, i) => makeStoredChunk(i + 1)); + store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" })); + store.handleDelta(deltaEvent({ type: "turn-sealed", conversationId: CONV_ID, turnId: "t1" })); + await vi.waitFor(() => { + expect(store.chunks).toHaveLength(50); + }); + + // NaN normalizes to the default (256). prev was 100 → raise → refill, + // but the loaded window already starts at seq 1 (origin) → no-op. + await store.setChatLimit(Number.NaN); + expect(store.chunks).toHaveLength(50); + + store.dispose(); + }); + it("resync is a no-op after dispose", async () => { const transport = createFakeTransport(); const historySync = createFakeHistorySync(); diff --git a/src/features/chat/test-helpers.ts b/src/features/chat/test-helpers.ts index 6bb98a1..100449f 100644 --- a/src/features/chat/test-helpers.ts +++ b/src/features/chat/test-helpers.ts @@ -1,19 +1,29 @@ +import type { ChatQueueMessage, ChatSendMessage } from "@dispatch/transport-contract"; import type { StoredChunk } from "@dispatch/wire"; import type { ConversationCache } from "../conversation-cache"; import type { ChatTransport, HistorySync, HistoryWindow, MetricsSync } from "./ports"; export interface FakeTransport { - readonly sent: import("@dispatch/transport-contract").ChatSendMessage[]; + /** All `chat.send` messages sent through the fake transport. */ + readonly sent: ChatSendMessage[]; + /** All `chat.queue` messages sent through the fake transport. 
*/ + readonly sentQueue: ChatQueueMessage[]; readonly impl: ChatTransport; } export function createFakeTransport(): FakeTransport { - const sent: import("@dispatch/transport-contract").ChatSendMessage[] = []; + const sent: ChatSendMessage[] = []; + const sentQueue: ChatQueueMessage[] = []; return { sent, + sentQueue, impl: { send(msg) { - sent.push(msg); + if (msg.type === "chat.queue") { + sentQueue.push(msg); + } else { + sent.push(msg); + } }, }, }; diff --git a/src/features/chat/ui/Composer.svelte b/src/features/chat/ui/Composer.svelte index 24c2c19..d519efc 100644 --- a/src/features/chat/ui/Composer.svelte +++ b/src/features/chat/ui/Composer.svelte @@ -8,10 +8,18 @@ let { onSend, + onQueue, contextSize = undefined, status = "idle", }: { onSend: (text: string) => void; + /** + * Enqueue a steering message (`chat.queue`). When provided AND the status + * is `running`, the send button becomes a "Queue" button that steers the + * in-flight turn instead of starting a new one. When absent, `onSend` is + * used regardless (tests / non-steering contexts). + */ + onQueue?: (text: string) => void; // Current context occupancy (latest turn's contextSize), or `undefined` // when unknown — the status bar then shows "— tokens", never 0%. contextSize?: number | undefined; @@ -26,6 +34,13 @@ const usage = $derived(computeContextUsage(contextSize, MAX_CONTEXT)); const hasUsage = $derived(contextSize !== undefined); + // While a turn is generating, the send button becomes a "Queue" button that + // enqueues a steering message (`chat.queue`) instead of starting a new turn + // (`chat.send`). Falls back to `onSend` when no `onQueue` is wired. + const steering = $derived(status === "running" && onQueue !== undefined); + const submitLabel = $derived(steering ? "Queue" : "Send"); + const placeholder = $derived(steering ? "Steer the conversation..." : "Type a message..."); + // As the window fills, escalate color: calm → warning → danger. 
function fillClass(pct: number): string { if (pct >= 90) return "progress-error"; @@ -58,7 +73,11 @@ function handleSubmit(): void { const trimmed = text.trim(); if (trimmed.length === 0) return; - onSend(trimmed); + if (steering) { + onQueue?.(trimmed); + } else { + onSend(trimmed); + } text = ""; } @@ -84,12 +103,12 @@ class="textarea textarea-bordered flex-1 resize-none leading-normal !min-h-0 h-auto" bind:value={text} onkeydown={handleKeydown} - placeholder="Type a message..." + placeholder={placeholder} rows="1" aria-label="Message input" > diff --git a/src/features/surface-host/logic/message-queue.test.ts b/src/features/surface-host/logic/message-queue.test.ts new file mode 100644 index 0000000..ce078d9 --- /dev/null +++ b/src/features/surface-host/logic/message-queue.test.ts @@ -0,0 +1,48 @@ +import type { QueuedMessage } from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { parseMessageQueuePayload } from "./message-queue"; + +const msg = (id: string, text: string, queuedAt = 1_700_000_000_000): QueuedMessage => ({ + id, + text, + queuedAt, +}); + +describe("parseMessageQueuePayload", () => { + it("parses a well-formed payload with messages", () => { + const data = parseMessageQueuePayload({ + messages: [msg("m1", "steer left"), msg("m2", "actually, go right")], + }); + expect(data).toEqual({ + messages: [msg("m1", "steer left"), msg("m2", "actually, go right")], + }); + }); + + it("parses an empty-messages payload (queue is empty)", () => { + expect(parseMessageQueuePayload({ messages: [] })).toEqual({ messages: [] }); + }); + + it("preserves message order", () => { + const data = parseMessageQueuePayload({ + messages: [msg("a", "first"), msg("b", "second"), msg("c", "third")], + }); + expect(data?.messages.map((m) => m.id)).toEqual(["a", "b", "c"]); + }); + + it.each([ + ["null", null], + ["a number", 7], + ["a string", "nope"], + ["missing messages key", { foo: [] }], + ["messages not an array", { messages: "x" }], + ["entry 
not an object", { messages: ["x"] }], + ["entry missing id", { messages: [{ text: "x", queuedAt: 1 }] }], + ["entry with non-string id", { messages: [{ id: 1, text: "x", queuedAt: 1 }] }], + ["entry missing text", { messages: [{ id: "m1", queuedAt: 1 }] }], + ["entry with non-string text", { messages: [{ id: "m1", text: 1, queuedAt: 1 }] }], + ["entry missing queuedAt", { messages: [{ id: "m1", text: "x" }] }], + ["entry with non-finite queuedAt", { messages: [msg("m1", "x", Number.NaN)] }], + ])("returns null for invalid payload: %s", (_label, payload) => { + expect(parseMessageQueuePayload(payload)).toBeNull(); + }); +}); diff --git a/src/features/surface-host/logic/message-queue.ts b/src/features/surface-host/logic/message-queue.ts new file mode 100644 index 0000000..a8e1567 --- /dev/null +++ b/src/features/surface-host/logic/message-queue.ts @@ -0,0 +1,45 @@ +import type { QueuedMessage } from "@dispatch/wire"; + +/** + * Pure parser for the `rendererId: "message-queue"` custom-field payload. + * + * The message-queue extension's per-conversation surface emits ONE `custom` + * field with `rendererId: "message-queue"` and `payload: QueuePayload` + * (`{ messages: QueuedMessage[] }` — the current queue snapshot). This parser + * validates the untyped `payload: unknown` at the network seam so a + * hostile/partial payload can never crash the renderer (graceful skip → null). + * + * Empty `messages` is a valid, parseable state (the queue is empty — nothing to + * render); the caller hides the panel. Null is returned only for a malformed + * payload shape. 
+ */ +export interface MessageQueueData { + readonly messages: readonly QueuedMessage[]; +} + +function isQueuedMessage(v: unknown): v is QueuedMessage { + if (typeof v !== "object" || v === null) return false; + const o = v as Record<string, unknown>; + return ( + typeof o.id === "string" && + typeof o.text === "string" && + typeof o.queuedAt === "number" && + Number.isFinite(o.queuedAt) + ); +} + +export function parseMessageQueuePayload(payload: unknown): MessageQueueData | null { + if (typeof payload !== "object" || payload === null) return null; + const obj = payload as Record<string, unknown>; + const raw = obj.messages; + if (!Array.isArray(raw)) return null; + const messages: QueuedMessage[] = []; + for (const entry of raw) { + if (!isQueuedMessage(entry)) return null; + messages.push(entry); + } + return { messages }; +} + +/** The `rendererId` the message-queue extension's `custom` surface field uses. */ +export const MESSAGE_QUEUE_RENDERER_ID = "message-queue"; diff --git a/src/features/surface-host/ui/MessageQueueList.svelte b/src/features/surface-host/ui/MessageQueueList.svelte new file mode 100644 index 0000000..12de970 --- /dev/null +++ b/src/features/surface-host/ui/MessageQueueList.svelte @@ -0,0 +1,22 @@ + + +{#if data !== null && data.messages.length > 0} +
    + {#each data.messages as msg (msg.id)} +
  • +

    {msg.text}

    + +
  • + {/each} +
+{/if} diff --git a/src/features/surface-host/ui/SurfaceView.svelte b/src/features/surface-host/ui/SurfaceView.svelte index 24be8b8..e5f807a 100644 --- a/src/features/surface-host/ui/SurfaceView.svelte +++ b/src/features/surface-host/ui/SurfaceView.svelte @@ -2,6 +2,7 @@ import type { InvokeMessage, SurfaceSpec } from "@dispatch/ui-contract"; import { groupRenderFields, planSurface } from "../logic/plan"; import Button from "./Button.svelte"; + import MessageQueueList from "./MessageQueueList.svelte"; import Number from "./Number.svelte"; import Progress from "./Progress.svelte"; import Selector from "./Selector.svelte"; @@ -40,6 +41,8 @@ unknown ids gracefully render nothing. --> {#if group.field.rendererId === "table"} + {:else if group.field.rendererId === "message-queue"} + {/if} {/if} {/each} -- cgit v1.2.3