summaryrefslogtreecommitdiffhomepage
path: root/src/features
diff options
context:
space:
mode:
Diffstat (limited to 'src/features')
-rw-r--r--src/features/chat/ports.ts9
-rw-r--r--src/features/chat/store.svelte.ts112
-rw-r--r--src/features/chat/store.test.ts276
-rw-r--r--src/features/chat/test-helpers.ts16
-rw-r--r--src/features/chat/ui/Composer.svelte25
-rw-r--r--src/features/surface-host/logic/message-queue.test.ts48
-rw-r--r--src/features/surface-host/logic/message-queue.ts45
-rw-r--r--src/features/surface-host/ui/MessageQueueList.svelte22
-rw-r--r--src/features/surface-host/ui/SurfaceView.svelte3
9 files changed, 529 insertions, 27 deletions
diff --git a/src/features/chat/ports.ts b/src/features/chat/ports.ts
index f8c665f..ffe2c94 100644
--- a/src/features/chat/ports.ts
+++ b/src/features/chat/ports.ts
@@ -1,12 +1,17 @@
import type {
+ ChatQueueMessage,
ChatSendMessage,
ConversationHistoryResponse,
ConversationMetricsResponse,
} from "@dispatch/transport-contract";
-/** Injected transport port — sends chat messages to the server. */
+/**
+ * Injected transport port — sends chat messages to the server. Accepts both
+ * `chat.send` (start a turn) and `chat.queue` (enqueue a steering message;
+ * auto-starts a turn if idle).
+ */
export interface ChatTransport {
- send(msg: ChatSendMessage): void;
+ send(msg: ChatSendMessage | ChatQueueMessage): void;
}
/**
diff --git a/src/features/chat/store.svelte.ts b/src/features/chat/store.svelte.ts
index e74980d..9beabfc 100644
--- a/src/features/chat/store.svelte.ts
+++ b/src/features/chat/store.svelte.ts
@@ -1,9 +1,10 @@
import type {
ChatDeltaMessage,
ChatErrorMessage,
+ ChatQueueMessage,
ChatSendMessage,
} from "@dispatch/transport-contract";
-import type { ChatMessage } from "@dispatch/wire";
+import type { ChatMessage, StoredChunk } from "@dispatch/wire";
import type { RenderedChunk, TranscriptState } from "../../core/chunks";
import {
appendUserMessage,
@@ -89,7 +90,29 @@ export interface ChatStore {
readonly thinkingKeyBase: number;
handleDelta(msg: ChatDeltaMessage | ChatErrorMessage): void;
send(text: string): void;
+ /**
+ * Enqueue a steering message onto the conversation's queue (`chat.queue`
+ * WS op). While a turn is generating, the message is delivered mid-turn at
+ * the next tool-result boundary (a `steering` `AgentEvent` fires + the
+ * message-queue surface updates). When no turn is active, the server
+ * auto-starts a turn with the message as its opening prompt (equivalent to
+ * `chat.send`). No optimistic transcript echo — the queue SURFACE carries the
+ * pending message until drain; the `steering` event places it in the
+ * transcript. `text` must be non-empty (the server 400/errors otherwise).
+ */
+ queueMessage(text: string): void;
setModel(model: string): void;
+ /**
+ * Update the chat limit LIVE: re-normalizes, then adjusts the loaded window.
+ * Lowering it unloads older committed chunks (deferred via the gate while the
+ * reader is scrolled up, catching up on the next mutation). Raising it
+ * REFILLS older history (cache first, then CR-5 `?beforeSeq=`) up to the
+ * fresh-load window (`initialWindowSize` = 75% of the limit) — the same
+ * window a fresh `load()` would show — so upping the limit reveals more
+ * history instead of leaving a partial view. New deltas + loads use the new
+ * limit. The refill awaits, so a caller can preserve scroll over the prepend.
+ */
+ setChatLimit(limit: number): Promise<void>;
load(): Promise<void>;
/**
* Page one unload-unit (`ceil(limit/4)`) of earlier history back in — the
@@ -117,7 +140,7 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore {
let _model = $state<string | undefined>(deps.model);
let disposed = false;
- const chatLimit = normalizeChatLimit(deps.chatLimit);
+ let chatLimit = normalizeChatLimit(deps.chatLimit);
/**
* Enforce the chat limit after a transcript mutation — unless the injected
@@ -166,6 +189,52 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore {
}
}
+ /**
+ * Fetch up to `want` older chunks (seq < `oldest`) — cache first, then a
+ * CR-5 `?beforeSeq=&limit=` server backfill when the cache is too shallow,
+ * persisting it so the next read is local. Returns every locally-known
+ * chunk older than `oldest` (the caller — `restoreEarlier` — takes the
+ * newest `count` of them). Shared by `showEarlier` and the raise-refill.
+ */
+ async function backfillOlder(oldest: number, want: number): Promise<readonly StoredChunk[]> {
+ let earlier = (await deps.cache.load(deps.conversationId)).filter((c) => c.seq < oldest);
+ const oldestKnown = earlier[0]?.seq ?? oldest;
+ if (earlier.length < want && oldestKnown > 1) {
+ const res = await deps.historySync(deps.conversationId, 0, {
+ beforeSeq: oldestKnown,
+ limit: want - earlier.length,
+ });
+ const merged = await deps.cache.commit(deps.conversationId, res.chunks);
+ earlier = merged.filter((c) => c.seq < oldest);
+ }
+ return earlier;
+ }
+
+ /**
+ * Refill toward the fresh-load window after a limit RAISE: pull older
+ * history (cache first, then server) so the loaded set grows to match what a
+ * fresh `load()` would show at the new limit. No-op when already at the
+ * origin (seq 1) or already within the window. `restoreEarlier` re-derives
+ * the window start at apply time, so a delta landing during the await can't
+ * corrupt the merge. NOT gated (refilling prepends above the viewport; the
+ * caller preserves scroll position).
+ */
+ async function refill(): Promise<void> {
+ if (disposed) return;
+ const oldest = transcript.committed[0]?.seq ?? transcript.hiddenBeforeSeq;
+ if (oldest <= 1) return;
+ const want = initialWindowSize(chatLimit) - transcript.committed.length;
+ if (want <= 0) return;
+ try {
+ const earlier = await backfillOlder(oldest, want);
+ if (earlier.length === 0) return;
+ transcript = restoreEarlier(transcript, earlier, want);
+ _error = null;
+ } catch (err) {
+ _error = err instanceof Error ? err.message : String(err);
+ }
+ }
+
return {
get messages(): readonly ChatMessage[] {
return selectMessages(transcript);
@@ -230,10 +299,31 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore {
deps.transport.send(msg);
},
+ queueMessage(text: string): void {
+ const trimmed = text.trim();
+ if (trimmed.length === 0) return;
+ const msg: ChatQueueMessage = {
+ type: "chat.queue",
+ conversationId: deps.conversationId,
+ text: trimmed,
+ };
+ deps.transport.send(msg);
+ },
+
setModel(model: string): void {
_model = model;
},
+ async setChatLimit(limit: number): Promise<void> {
+ const prev = chatLimit;
+ chatLimit = normalizeChatLimit(limit);
+ if (chatLimit < prev) {
+ maybeTrim();
+ } else if (chatLimit > prev) {
+ await refill();
+ }
+ },
+
async load(): Promise<void> {
// Fresh load shows only the newest 75% of the limit — headroom before the
// first trim. A warm cache is windowed locally (synchronously with its
@@ -256,23 +346,7 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore {
if (oldest <= 1) return;
const want = unloadCount(chatLimit);
try {
- let earlier = (await deps.cache.load(deps.conversationId)).filter((c) => c.seq < oldest);
- // The local cache may not reach far enough back (a server-windowed
- // fresh load cached only the window): page the missing OLDER run in
- // from the server (CR-5 `?beforeSeq=&limit=`) and persist it, so the
- // next page-in is local. Seqs are gap-free, so the fetched run is
- // contiguous with what we hold. NOTE: the backfill response's
- // `latestSeq` is a window cursor — never fed to the tail cursor
- // (ours derives from the cache's max seq).
- const oldestKnown = earlier[0]?.seq ?? oldest;
- if (earlier.length < want && oldestKnown > 1) {
- const res = await deps.historySync(deps.conversationId, 0, {
- beforeSeq: oldestKnown,
- limit: want - earlier.length,
- });
- const merged = await deps.cache.commit(deps.conversationId, res.chunks);
- earlier = merged.filter((c) => c.seq < oldest);
- }
+ const earlier = await backfillOlder(oldest, want);
transcript = restoreEarlier(transcript, earlier, want);
_error = null;
} catch (err) {
diff --git a/src/features/chat/store.test.ts b/src/features/chat/store.test.ts
index 3232009..2d75139 100644
--- a/src/features/chat/store.test.ts
+++ b/src/features/chat/store.test.ts
@@ -144,6 +144,93 @@ describe("createChatStore", () => {
store.dispose();
});
+ describe("queueMessage (chat.queue — steering)", () => {
+ it("posts a chat.queue with conversationId + text", () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ });
+
+ store.queueMessage("steer left");
+
+ expect(transport.sent).toHaveLength(0); // chat.send stays empty
+ expect(transport.sentQueue).toHaveLength(1);
+ expect(transport.sentQueue[0]?.type).toBe("chat.queue");
+ expect(transport.sentQueue[0]?.conversationId).toBe(CONV_ID);
+ expect(transport.sentQueue[0]?.text).toBe("steer left");
+
+ store.dispose();
+ });
+
+ it("trims whitespace before sending", () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ });
+
+ store.queueMessage(" padded ");
+
+ expect(transport.sentQueue[0]?.text).toBe("padded");
+
+ store.dispose();
+ });
+
+ it("does not send for empty/whitespace-only text", () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ });
+
+ store.queueMessage(" ");
+ store.queueMessage("");
+
+ expect(transport.sentQueue).toHaveLength(0);
+
+ store.dispose();
+ });
+
+ it("does NOT optimistically echo into the transcript (the surface carries the queue)", () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ });
+
+ store.queueMessage("queued steering message");
+
+ expect(store.chunks).toHaveLength(0); // no transcript echo
+
+ store.dispose();
+ });
+ });
+
it("chat.error sets error", () => {
const transport = createFakeTransport();
const historySync = createFakeHistorySync();
@@ -1248,6 +1335,195 @@ describe("createChatStore", () => {
store.dispose();
});
+ it("setChatLimit: lowering the limit trims older committed chunks live", async () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ chatLimit: 100,
+ });
+
+ // Load 80 committed chunks (under the limit — no trim yet).
+ historySync.returnChunks = Array.from({ length: 80 }, (_, i) => makeStoredChunk(i + 1));
+ store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" }));
+ store.handleDelta(deltaEvent({ type: "turn-sealed", conversationId: CONV_ID, turnId: "t1" }));
+ await vi.waitFor(() => {
+ expect(store.chunks).toHaveLength(80);
+ });
+
+ // Lower the limit to 10: 80 → unload ceil(10/4)=3 per quarter, needs
+ // ceil((80-10)/3)=24 quarters → drop min(72, 80)=72 → 8 remain.
+ await store.setChatLimit(10);
+ expect(store.chunks).toHaveLength(8);
+ expect(store.chunks[0]?.seq).toBe(73);
+ expect(store.hasEarlier).toBe(true);
+
+ store.dispose();
+ });
+
+ it("setChatLimit: raising the limit refills older history up to the fresh-load window", async () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ // Cache holds 200 chunks; load at limit 100 → window 75 → seqs 126..200.
+ await cache.impl.commit(
+ CONV_ID,
+ Array.from({ length: 200 }, (_, i) => makeStoredChunk(i + 1)),
+ );
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ chatLimit: 100,
+ });
+ await store.load();
+ expect(store.chunks).toHaveLength(75);
+ expect(store.chunks[0]?.seq).toBe(126);
+ expect(store.hasEarlier).toBe(true);
+
+ // Raise to 200 → window floor(0.75×200)=150 → refill 75 older chunks
+ // (seqs 51..125) from the cache. No server backfill (cache is deep enough).
+ await store.setChatLimit(200);
+ expect(historySync.calls).toHaveLength(1); // the load-time tail sync only
+ expect(store.chunks).toHaveLength(150);
+ expect(store.chunks[0]?.seq).toBe(51);
+ expect(store.hasEarlier).toBe(true); // 51 > 1
+
+ store.dispose();
+ });
+
+ it("setChatLimit: raising backfills from the server when the cache is too shallow", async () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ // Server holds 200; cold-cache load at limit 100 → window 75 → seqs 126..200.
+ historySync.returnChunks = Array.from({ length: 200 }, (_, i) => makeStoredChunk(i + 1));
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ chatLimit: 100,
+ });
+ await store.load();
+ expect(store.chunks[0]?.seq).toBe(126);
+
+ // Raise to 200 → want 75 older. Cache only holds 126..200 → backfill
+ // seqs 51..125 from the server (CR-5 ?beforeSeq=126&limit=75).
+ await store.setChatLimit(200);
+ const backfill = historySync.calls[1];
+ expect(backfill?.window).toEqual({ beforeSeq: 126, limit: 75 });
+ expect(store.chunks).toHaveLength(150);
+ expect(store.chunks[0]?.seq).toBe(51);
+
+ store.dispose();
+ });
+
+ it("setChatLimit: raising refills all available older history (down to the origin)", async () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ chatLimit: 100,
+ });
+
+ // 101 chunks → one trim pass drops 25 → 76 remain (seqs 26..101).
+ historySync.returnChunks = Array.from({ length: 101 }, (_, i) => makeStoredChunk(i + 1));
+ store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" }));
+ store.handleDelta(deltaEvent({ type: "turn-sealed", conversationId: CONV_ID, turnId: "t1" }));
+ await vi.waitFor(() => {
+ expect(store.chunks).toHaveLength(76);
+ });
+ expect(store.chunks[0]?.seq).toBe(26);
+ expect(store.hasEarlier).toBe(true);
+
+ // Raise to 500 → window 375 → want 299 older. The cache holds only
+ // seqs 1..25 below the window (no more server-side) → restore all 25 →
+ // 101 loaded, reaching the origin.
+ await store.setChatLimit(500);
+ expect(store.chunks).toHaveLength(101);
+ expect(store.chunks[0]?.seq).toBe(1);
+ expect(store.hasEarlier).toBe(false);
+
+ store.dispose();
+ });
+
+ it("setChatLimit: raising is a no-op when the window already starts at the origin", async () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ await cache.impl.commit(
+ CONV_ID,
+ Array.from({ length: 50 }, (_, i) => makeStoredChunk(i + 1)),
+ );
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ chatLimit: 100,
+ });
+ await store.load(); // only 50 chunks → all loaded, window starts at seq 1
+ expect(store.chunks).toHaveLength(50);
+ expect(store.hasEarlier).toBe(false);
+ const callsAfterLoad = historySync.calls.length;
+
+ await store.setChatLimit(500); // raise → refill no-ops (oldest = 1)
+ expect(store.chunks).toHaveLength(50);
+ expect(store.chunks[0]?.seq).toBe(1);
+ expect(historySync.calls).toHaveLength(callsAfterLoad); // no backfill
+
+ store.dispose();
+ });
+
+ it("setChatLimit: a nonsensical value is normalized (no crash, no trim)", async () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ chatLimit: 100,
+ });
+
+ historySync.returnChunks = Array.from({ length: 50 }, (_, i) => makeStoredChunk(i + 1));
+ store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" }));
+ store.handleDelta(deltaEvent({ type: "turn-sealed", conversationId: CONV_ID, turnId: "t1" }));
+ await vi.waitFor(() => {
+ expect(store.chunks).toHaveLength(50);
+ });
+
+ // NaN normalizes to the default (256). prev was 100 → raise → refill,
+ // but the loaded window already starts at seq 1 (origin) → no-op.
+ await store.setChatLimit(Number.NaN);
+ expect(store.chunks).toHaveLength(50);
+
+ store.dispose();
+ });
+
it("resync is a no-op after dispose", async () => {
const transport = createFakeTransport();
const historySync = createFakeHistorySync();
diff --git a/src/features/chat/test-helpers.ts b/src/features/chat/test-helpers.ts
index 6bb98a1..100449f 100644
--- a/src/features/chat/test-helpers.ts
+++ b/src/features/chat/test-helpers.ts
@@ -1,19 +1,29 @@
+import type { ChatQueueMessage, ChatSendMessage } from "@dispatch/transport-contract";
import type { StoredChunk } from "@dispatch/wire";
import type { ConversationCache } from "../conversation-cache";
import type { ChatTransport, HistorySync, HistoryWindow, MetricsSync } from "./ports";
export interface FakeTransport {
- readonly sent: import("@dispatch/transport-contract").ChatSendMessage[];
+ /** All `chat.send` messages sent through the fake transport. */
+ readonly sent: ChatSendMessage[];
+ /** All `chat.queue` messages sent through the fake transport. */
+ readonly sentQueue: ChatQueueMessage[];
readonly impl: ChatTransport;
}
export function createFakeTransport(): FakeTransport {
- const sent: import("@dispatch/transport-contract").ChatSendMessage[] = [];
+ const sent: ChatSendMessage[] = [];
+ const sentQueue: ChatQueueMessage[] = [];
return {
sent,
+ sentQueue,
impl: {
send(msg) {
- sent.push(msg);
+ if (msg.type === "chat.queue") {
+ sentQueue.push(msg);
+ } else {
+ sent.push(msg);
+ }
},
},
};
diff --git a/src/features/chat/ui/Composer.svelte b/src/features/chat/ui/Composer.svelte
index 24c2c19..d519efc 100644
--- a/src/features/chat/ui/Composer.svelte
+++ b/src/features/chat/ui/Composer.svelte
@@ -8,10 +8,18 @@
let {
onSend,
+ onQueue,
contextSize = undefined,
status = "idle",
}: {
onSend: (text: string) => void;
+ /**
+ * Enqueue a steering message (`chat.queue`). When provided AND the status
+ * is `running`, the send button becomes a "Queue" button that steers the
+ * in-flight turn instead of starting a new one. When absent, `onSend` is
+ * used regardless (tests / non-steering contexts).
+ */
+ onQueue?: (text: string) => void;
// Current context occupancy (latest turn's contextSize), or `undefined`
// when unknown — the status bar then shows "— tokens", never 0%.
contextSize?: number | undefined;
@@ -26,6 +34,13 @@
const usage = $derived(computeContextUsage(contextSize, MAX_CONTEXT));
const hasUsage = $derived(contextSize !== undefined);
+ // While a turn is generating, the send button becomes a "Queue" button that
+ // enqueues a steering message (`chat.queue`) instead of starting a new turn
+ // (`chat.send`). Falls back to `onSend` when no `onQueue` is wired.
+ const steering = $derived(status === "running" && onQueue !== undefined);
+ const submitLabel = $derived(steering ? "Queue" : "Send");
+ const placeholder = $derived(steering ? "Steer the conversation..." : "Type a message...");
+
// As the window fills, escalate color: calm → warning → danger.
function fillClass(pct: number): string {
if (pct >= 90) return "progress-error";
@@ -58,7 +73,11 @@
function handleSubmit(): void {
const trimmed = text.trim();
if (trimmed.length === 0) return;
- onSend(trimmed);
+ if (steering) {
+ onQueue?.(trimmed);
+ } else {
+ onSend(trimmed);
+ }
text = "";
}
@@ -84,12 +103,12 @@
class="textarea textarea-bordered flex-1 resize-none leading-normal !min-h-0 h-auto"
bind:value={text}
onkeydown={handleKeydown}
- placeholder="Type a message..."
+ placeholder={placeholder}
rows="1"
aria-label="Message input"
></textarea>
<button class="btn btn-primary w-20 shrink-0" type="submit" disabled={!hasText}>
- Send
+ {submitLabel}
</button>
</div>
diff --git a/src/features/surface-host/logic/message-queue.test.ts b/src/features/surface-host/logic/message-queue.test.ts
new file mode 100644
index 0000000..ce078d9
--- /dev/null
+++ b/src/features/surface-host/logic/message-queue.test.ts
@@ -0,0 +1,48 @@
+import type { QueuedMessage } from "@dispatch/wire";
+import { describe, expect, it } from "vitest";
+import { parseMessageQueuePayload } from "./message-queue";
+
+const msg = (id: string, text: string, queuedAt = 1_700_000_000_000): QueuedMessage => ({
+ id,
+ text,
+ queuedAt,
+});
+
+describe("parseMessageQueuePayload", () => {
+ it("parses a well-formed payload with messages", () => {
+ const data = parseMessageQueuePayload({
+ messages: [msg("m1", "steer left"), msg("m2", "actually, go right")],
+ });
+ expect(data).toEqual({
+ messages: [msg("m1", "steer left"), msg("m2", "actually, go right")],
+ });
+ });
+
+ it("parses an empty-messages payload (queue is empty)", () => {
+ expect(parseMessageQueuePayload({ messages: [] })).toEqual({ messages: [] });
+ });
+
+ it("preserves message order", () => {
+ const data = parseMessageQueuePayload({
+ messages: [msg("a", "first"), msg("b", "second"), msg("c", "third")],
+ });
+ expect(data?.messages.map((m) => m.id)).toEqual(["a", "b", "c"]);
+ });
+
+ it.each([
+ ["null", null],
+ ["a number", 7],
+ ["a string", "nope"],
+ ["missing messages key", { foo: [] }],
+ ["messages not an array", { messages: "x" }],
+ ["entry not an object", { messages: ["x"] }],
+ ["entry missing id", { messages: [{ text: "x", queuedAt: 1 }] }],
+ ["entry with non-string id", { messages: [{ id: 1, text: "x", queuedAt: 1 }] }],
+ ["entry missing text", { messages: [{ id: "m1", queuedAt: 1 }] }],
+ ["entry with non-string text", { messages: [{ id: "m1", text: 1, queuedAt: 1 }] }],
+ ["entry missing queuedAt", { messages: [{ id: "m1", text: "x" }] }],
+ ["entry with non-finite queuedAt", { messages: [msg("m1", "x", Number.NaN)] }],
+ ])("returns null for invalid payload: %s", (_label, payload) => {
+ expect(parseMessageQueuePayload(payload)).toBeNull();
+ });
+});
diff --git a/src/features/surface-host/logic/message-queue.ts b/src/features/surface-host/logic/message-queue.ts
new file mode 100644
index 0000000..a8e1567
--- /dev/null
+++ b/src/features/surface-host/logic/message-queue.ts
@@ -0,0 +1,45 @@
+import type { QueuedMessage } from "@dispatch/wire";
+
+/**
+ * Pure parser for the `rendererId: "message-queue"` custom-field payload.
+ *
+ * The message-queue extension's per-conversation surface emits ONE `custom`
+ * field with `rendererId: "message-queue"` and `payload: QueuePayload`
+ * (`{ messages: QueuedMessage[] }` — the current queue snapshot). This parser
+ * validates the untyped `payload: unknown` at the network seam so a
+ * hostile/partial payload can never crash the renderer (graceful skip → null).
+ *
+ * Empty `messages` is a valid, parseable state (the queue is empty — nothing to
+ * render); the caller hides the panel. Null is returned only for a malformed
+ * payload shape.
+ */
+export interface MessageQueueData {
+ readonly messages: readonly QueuedMessage[];
+}
+
+function isQueuedMessage(v: unknown): v is QueuedMessage {
+ if (typeof v !== "object" || v === null) return false;
+ const o = v as Record<string, unknown>;
+ return (
+ typeof o.id === "string" &&
+ typeof o.text === "string" &&
+ typeof o.queuedAt === "number" &&
+ Number.isFinite(o.queuedAt)
+ );
+}
+
+export function parseMessageQueuePayload(payload: unknown): MessageQueueData | null {
+ if (typeof payload !== "object" || payload === null) return null;
+ const obj = payload as Record<string, unknown>;
+ const raw = obj.messages;
+ if (!Array.isArray(raw)) return null;
+ const messages: QueuedMessage[] = [];
+ for (const entry of raw) {
+ if (!isQueuedMessage(entry)) return null;
+ messages.push(entry);
+ }
+ return { messages };
+}
+
+/** The `rendererId` the message-queue extension's `custom` surface field uses. */
+export const MESSAGE_QUEUE_RENDERER_ID = "message-queue";
diff --git a/src/features/surface-host/ui/MessageQueueList.svelte b/src/features/surface-host/ui/MessageQueueList.svelte
new file mode 100644
index 0000000..12de970
--- /dev/null
+++ b/src/features/surface-host/ui/MessageQueueList.svelte
@@ -0,0 +1,22 @@
+<script lang="ts">
+ import { parseMessageQueuePayload } from "../logic/message-queue";
+
+ let { payload }: { readonly payload: unknown } = $props();
+
+ // Parse defensively; an unparseable payload yields null → render nothing
+ // (graceful skip, per the custom-field contract).
+ const data = $derived(parseMessageQueuePayload(payload));
+</script>
+
+{#if data !== null && data.messages.length > 0}
+ <ul class="flex flex-col gap-1 text-sm">
+ {#each data.messages as msg (msg.id)}
+ <li class="rounded-box bg-base-200 px-3 py-2">
+ <p class="whitespace-pre-wrap">{msg.text}</p>
+ <time class="text-xs opacity-50" datetime={new Date(msg.queuedAt).toISOString()}>
+ {new Date(msg.queuedAt).toLocaleTimeString()}
+ </time>
+ </li>
+ {/each}
+ </ul>
+{/if}
diff --git a/src/features/surface-host/ui/SurfaceView.svelte b/src/features/surface-host/ui/SurfaceView.svelte
index 24be8b8..e5f807a 100644
--- a/src/features/surface-host/ui/SurfaceView.svelte
+++ b/src/features/surface-host/ui/SurfaceView.svelte
@@ -2,6 +2,7 @@
import type { InvokeMessage, SurfaceSpec } from "@dispatch/ui-contract";
import { groupRenderFields, planSurface } from "../logic/plan";
import Button from "./Button.svelte";
+ import MessageQueueList from "./MessageQueueList.svelte";
import Number from "./Number.svelte";
import Progress from "./Progress.svelte";
import Selector from "./Selector.svelte";
@@ -40,6 +41,8 @@
unknown ids gracefully render nothing. -->
{#if group.field.rendererId === "table"}
<SurfaceTable payload={group.field.payload} />
+ {:else if group.field.rendererId === "message-queue"}
+ <MessageQueueList payload={group.field.payload} />
{/if}
{/if}
{/each}