diff options
| author | Adam Malczewski <[email protected]> | 2026-06-12 15:08:24 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-12 15:08:24 +0900 |
| commit | 5ef7cc2916c544a66d68805063b02290f24d9a25 (patch) | |
| tree | 51724187d01813bbbbaef513eb8cada2e1bda1a6 | |
| parent | fb37680bd013509ab5d72619f261713e8473e988 (diff) | |
| download | dispatch-web-5ef7cc2916c544a66d68805063b02290f24d9a25.tar.gz dispatch-web-5ef7cc2916c544a66d68805063b02290f24d9a25.zip | |
feat(chat): multi-client live view — watch in-flight turns + user prompt on stream
- subscribe every open conversation on load + WS reconnect (resync), unsubscribe on tab close
- derive a stream-based 'generating' state for watchers (Composer running indicator)
- fold the user-message turn event so watchers render the prompt mid-turn (de-dup vs sender's optimistic echo)
- re-pin [email protected] / [email protected]; re-mirror contracts; add user-message to the exhaustiveness guard
| -rw-r--r-- | .dispatch/transport-contract.reference.md | 59 | ||||
| -rw-r--r-- | .dispatch/wire.reference.md | 30 | ||||
| -rw-r--r-- | backend-handoff.md | 75 | ||||
| -rw-r--r-- | src/app/App.svelte | 6 | ||||
| -rw-r--r-- | src/app/store.svelte.ts | 31 | ||||
| -rw-r--r-- | src/app/store.test.ts | 153 | ||||
| -rw-r--r-- | src/core/chunks/index.ts | 10 | ||||
| -rw-r--r-- | src/core/chunks/reducer.test.ts | 160 | ||||
| -rw-r--r-- | src/core/chunks/reducer.ts | 66 | ||||
| -rw-r--r-- | src/core/chunks/selectors.ts | 9 | ||||
| -rw-r--r-- | src/core/chunks/types.ts | 8 | ||||
| -rw-r--r-- | src/core/wire/conformance.test.ts | 17 | ||||
| -rw-r--r-- | src/core/wire/conformance.ts | 6 | ||||
| -rw-r--r-- | src/features/chat/store.svelte.ts | 30 | ||||
| -rw-r--r-- | src/features/chat/store.test.ts | 110 |
15 files changed, 746 insertions, 24 deletions
diff --git a/.dispatch/transport-contract.reference.md b/.dispatch/transport-contract.reference.md index 40ced1e..86eac50 100644 --- a/.dispatch/transport-contract.reference.md +++ b/.dispatch/transport-contract.reference.md @@ -5,9 +5,31 @@ > hangs on a permission prompt). Your CODE still imports `@dispatch/transport-contract` normally — > this file is for READING only. > -> **Orchestrator:** SNAPSHOT of `[email protected]` (the metrics endpoint shipped + -> version-bumped + LIVE-VERIFIED). Depends on `@dispatch/[email protected]` (see `wire.reference.md`) + -> `@dispatch/[email protected]` (see `ui-contract.reference.md`). +> **Orchestrator:** SNAPSHOT of `[email protected]` (CR-3 user-message shipped). Depends on +> `@dispatch/[email protected]` (see `wire.reference.md`) + `@dispatch/[email protected]` (see +> `ui-contract.reference.md`). +> +> **2026-06-12 delta (CR-3 user-message handoff — package bumped `0.7.0` → `0.8.0`):** NO transport +> shape change — it re-exports `AgentEvent` (which `chat.delta` / `/chat` NDJSON carry), and that union +> gained the additive `TurnInputEvent` (`{ type: "user-message"; conversationId; turnId; text }`), the +> turn's user prompt, emitted as the FIRST event of every turn (before `turn-start`) and replayed to +> watchers/late-joiners. See the `wire.reference.md` CR-3 delta + `TurnInputEvent` for the definition. +> +> **2026-06-12 delta (turn-continuity handoff — package bumped `0.6.0` → `0.7.0`, ADDITIVE):** a turn +> is no longer bound to the WS connection — it runs to completion server-side regardless of any +> client, and any number of connections can watch the same conversation (incl. a late-joiner that +> connects mid-turn). Two new client→server WS messages: `ChatSubscribeMessage` +> (`{ type: "chat.subscribe"; conversationId }`) and `ChatUnsubscribeMessage` +> (`{ type: "chat.unsubscribe"; conversationId }`); `WsClientMessage` now unions both. Server→client +> is UNCHANGED (turn events still arrive as `chat.delta`, replayed AND live). Semantics: `chat.subscribe` +> registers the connection + immediately REPLAYS the in-flight turn's events so far (from its +> `turn-start`) then streams live (nothing replayed if idle); `chat.send` AUTO-subscribes the sending +> connection (a 2nd send while generating ⇒ `chat.error` + you stay subscribed to watch the running +> turn); `chat.unsubscribe`/socket-close drops the subscription but NEVER stops the turn; subscriptions +> persist across turns. FE consumes via the `chat` feature + app store (re-subscribe every open +> conversation on (re)connect + page load; derive a "running" state structurally from +> `turn-start`…no-`done`/`turn-sealed`-yet). OUT of scope: per-step crash-resume, concurrent-send +> arbitration, explicit stop. > > **2026-06-12 delta (context-size handoff — package bumped `0.5.0` → `0.6.0`, depends on > `[email protected]`):** no NEW transport shape — the optional `contextSize?: number` rides the @@ -293,8 +315,37 @@ export interface ChatErrorMessage { readonly message: string; } +/** + * Client → server: start WATCHING a conversation's live turn events WITHOUT sending. + * On subscribe the server REPLAYS the current in-flight turn so far (from its + * `turn-start`) as `chat.delta`, then streams live; nothing replayed if idle (rely + * on `GET /conversations/:id` history). Infer "generating" from a replayed + * `turn-start` with no matching `done`/`turn-sealed` yet. `chat.send` already + * auto-subscribes the sender, so this is for conversations you VIEW but didn't send to + * (a 2nd device, or a reloaded/reconnected client). Idempotent per (connection, id). + */ +export interface ChatSubscribeMessage { + readonly type: "chat.subscribe"; + readonly conversationId: string; +} + +/** + * Client → server: stop watching a conversation's turn events on this connection. + * Does NOT stop/affect the turn (it runs to completion regardless of subscribers). + * Socket close drops all of a connection's subscriptions the same way — again + * WITHOUT aborting any in-flight turn. + */ +export interface ChatUnsubscribeMessage { + readonly type: "chat.unsubscribe"; + readonly conversationId: string; +} + /** Every client → server WS message: surface ops + chat ops. Discriminate on `type`. */ -export type WsClientMessage = SurfaceClientMessage | ChatSendMessage; +export type WsClientMessage = + | SurfaceClientMessage + | ChatSendMessage + | ChatSubscribeMessage + | ChatUnsubscribeMessage; /** Every server → client WS message: surface ops + chat ops. Discriminate on `type`. */ export type WsServerMessage = SurfaceServerMessage | ChatDeltaMessage | ChatErrorMessage; diff --git a/.dispatch/wire.reference.md b/.dispatch/wire.reference.md index cf1410a..40f94cf 100644 --- a/.dispatch/wire.reference.md +++ b/.dispatch/wire.reference.md @@ -4,8 +4,16 @@ > types WITHOUT following the `file:` dep symlink out of this repo (which hangs on a permission > prompt). Your CODE still imports `@dispatch/wire` normally — this file is for READING only. > -> **Orchestrator:** SNAPSHOT of `[email protected]` (the metrics types below shipped + version-bumped). -> Regenerate whenever `@dispatch/wire` changes. +> **Orchestrator:** SNAPSHOT of `[email protected]` (the metrics types below + the `user-message` turn event +> shipped + version-bumped). Regenerate whenever `@dispatch/wire` changes. +> +> **2026-06-12 delta (CR-3 user-message handoff — package bumped `0.5.0` → `0.6.0`, ADDITIVE):** adds a +> new `AgentEvent` union member `TurnInputEvent` (`{ type: "user-message"; conversationId; turnId; text }`) +> that surfaces the turn's USER prompt INTO the outward event stream. Emitted ONCE as the FIRST event of +> every turn (before `turn-start`), so it is buffered + replayed to every subscriber — live AND late-join +> — and rides `chat.delta`/NDJSON like any other event. Fixes CR-3 (a pure watcher couldn't see the prompt +> until seal). The sender still echoes its own prompt optimistically, so consumers DE-DUP against that +> (by text); a pure watcher renders it directly. Persistence/metrics unchanged. See `TurnInputEvent` below. > > **2026-06-12 delta (context-size handoff — package bumped `0.4.0` → `0.5.0`):** adds an OPTIONAL > `contextSize?: number` to BOTH `TurnDoneEvent` (live `done`) and `TurnMetrics` (persisted) — the @@ -249,6 +257,7 @@ export interface TurnMetrics { export type AgentEvent = | StatusEvent | TurnStartEvent + | TurnInputEvent | TurnTextDeltaEvent | TurnReasoningDeltaEvent | TurnToolCallEvent @@ -274,6 +283,23 @@ export interface TurnStartEvent { readonly turnId: string; } +/** + * The user prompt that opened this turn, surfaced INTO the turn's outward event + * stream so a WATCHER (subscribed but not the sender) can render the prompt + * mid-turn — the user message is otherwise persisted only at seal. Emitted ONCE + * as the FIRST event of the turn (before `turn-start`); buffered + replayed to + * every subscriber (live + late-join). The sender echoes its own prompt + * optimistically, so DE-DUP against that (by text); a pure watcher renders it + * directly. Carries the raw `text` passed to the provider. (Turn-scoped: it + * carries `turnId`, so a multi-turn transcript attributes each prompt to its turn.) + */ +export interface TurnInputEvent { + readonly type: "user-message"; + readonly conversationId: string; + readonly turnId: string; + readonly text: string; +} + /** Incremental text content from the model during a turn. */ export interface TurnTextDeltaEvent { readonly type: "text-delta"; diff --git a/backend-handoff.md b/backend-handoff.md index 847282b..b784eef 100644 --- a/backend-handoff.md +++ b/backend-handoff.md @@ -5,15 +5,32 @@ > **From:** dispatch-web orchestrator · **To:** arch-rewrite orchestrator · **Courier:** the user. > `lsp` does NOT span the repos (ORCHESTRATOR §5) — every cross-repo ask flows through here. -_Last updated: 2026-06-12. **FE is current on `[email protected]` / `[email protected]`.** All handoffs +_Last updated: 2026-06-12. **FE is current on `[email protected]` / `[email protected]`.** All handoffs to date are consumed: surfaces + WS, conversation transcript/metrics, tabs + model selector, cache-warming (incl. authoritative timer + retention + cache-rate fix), **per-conversation cwd + LSP -status**, and **context size** (the `contextSize` field — `done` live + `TurnMetrics` persisted — -rendered as a current-usage readout above the composer). +status**, **context size** (the `contextSize` field — `done` live + `TurnMetrics` persisted — +rendered as a current-usage readout above the composer), and **turn continuity + multi-client live +view** (`chat.subscribe`/`chat.unsubscribe`; re-attach to a running turn on reconnect/reload/second +device; stream-derived "generating…" state). **Open asks:** CR-1 (Loaded Extensions as a real table) + CR-2 (optional catalog `scope` flag) below. +**CR-3 (watcher couldn't see the USER prompt until seal) → RESOLVED ✅** — backend shipped the +`user-message` turn event (`[email protected]` / `[email protected]`); FE re-pinned + consumption live. The cwd/LSP draft-path verification (`backend-handoff-cwd-lsp.md`) came back **all ✅ confirmed** by the backend (answers in their `frontend-lsp-cwd-handoff.md`) — see §2._ +**Turn-continuity handoff (`frontend-turn-continuity-handoff.md`) → CONSUMED ✅.** Re-pinned +`[email protected]→0.7.0` (additive; `wire` unchanged at `0.5.0`); re-mirrored +`.dispatch/transport-contract.reference.md` with `ChatSubscribeMessage`/`ChatUnsubscribeMessage` + the +widened `WsClientMessage` union. FE now: re-subscribes `chat.subscribe` for EVERY open conversation on +page load + on WS (re)connect (and on close sends `chat.unsubscribe`); `chat.send` still auto-subscribes +the sender, so the draft/promotion path adds none. A pure `generating` flag is folded structurally in +`core/chunks` (`turn-start`/deltas ⇒ true; `done`/`turn-sealed`/`error` ⇒ false; NOT inferred from the +free-form `status` string) and surfaced as `ChatStore.generating` → the Composer status icon now shows a +"running" spinner for any watching client. `ChatStore.resync()` (called from `onReopen`) clears a stale +spinner then pulls a turn that sealed while disconnected from history. 558 tests green. NO new backend +ask. NOT yet live-probed — needs the two-WS / second-device manual check from the handoff's "Quick +manual check" against a running backend. + **Context-size handoff (`frontend-context-size-handoff.md`) → CONSUMED ✅.** Re-pinned `[email protected]→0.5.0` + `[email protected]→0.6.0`; re-mirrored both `.dispatch/*.reference.md`; added "context size" + "context window" to FE `GLOSSARY.md`. `core/metrics` now threads `contextSize` through the `done` fold + @@ -26,7 +43,7 @@ backend ask — but the max-limit denominator is now a live FE need; see §3. ## 1. Pinned backend contracts (consumed by the FE) | Package | Used for | |---|---| @@ -37,7 +54,8 @@ Pinned as `file:` deps: **`[email protected]`; `[email protected]`; `transport-contract Endpoints in use (HTTP **24203**, WS **24205**, CORS `*` incl. `PUT`): `POST /chat` (NDJSON) · `GET /models` · `GET /conversations/:id?sinceSeq=<n>` · `GET /conversations/:id/metrics` · `GET`/`PUT /conversations/:id/cwd` · -`GET /conversations/:id/lsp` · `POST /chat/warm` · WS `chat.send`→`chat.delta`. +`GET /conversations/:id/lsp` · `POST /chat/warm` · WS `chat.send`→`chat.delta` · +WS `chat.subscribe`/`chat.unsubscribe` (watch a conversation's turns without sending; replay + live). Mirrored in-repo for headless agents: `.dispatch/{ui-contract,wire,transport-contract}.reference.md` (regenerate on any contract bump; all current as of `[email protected]` / `[email protected]`). @@ -108,6 +126,53 @@ empty would override the persisted draft cwd. Verified safe today: `chat/store.s `chat.send` with only `type`/`conversationId`/`message`/`model` — no `cwd` field. (The backend offered to harden `/chat` to treat blank as "not provided" if we ever want it — not needed while we omit the field.) +### CR-3 (BUG, multi-client) → **RESOLVED ✅** (Option B shipped; courier `frontend-cr3-user-message-handoff.md`) + +The backend implemented Option B + live-verified it: a new `AgentEvent` member `TurnInputEvent` +(`{ type: "user-message"; conversationId; turnId; text }`) is emitted as the FIRST event of every turn +(before `turn-start`), buffered + replayed to live AND late-join subscribers. `[email protected]→0.6.0`, +`[email protected]→0.8.0` (re-exports the union; no transport-shape change). **FE consumed:** +re-pinned both, re-mirrored `.dispatch/{wire,transport-contract}.reference.md`, promoted the staged +`core/chunks` fold to a typed `case "user-message"` (appends the prompt with a text de-dup vs the sender's +optimistic echo), and added `user-message` to the FE exhaustiveness guard. A pure watcher now shows the user +bubble the moment the turn starts. The original report follows for history. + +**Symptom (reproduced live):** open a conversation in two windows; window A sends a message. Window B +(`chat.subscribe`, a pure watcher) renders the streaming **reply** but NOT the user **prompt** that +triggered it — the user bubble only pops in after `turn-sealed`. + +**Root cause (backend):** the user prompt is never part of the turn's live/replayable stream, and isn't +persisted until the turn ends — so a watcher has no source for it mid-turn. +- The replay buffer holds only `AgentEvent`s (`session-orchestrator/src/orchestrator.ts` `ActiveTurn.buffer`, + pushed in `emitToHub`). `buildUserMessage(text)` (`pure.ts`) is passed straight to the provider and is + **never `emitToHub`'d** → not buffered, not replayed. +- The prompt is persisted only at turn end, atomically with the reply: `orchestrator.ts:244-245` + (`toPersist = [userMsg, ...result.messages]; conversationStore.append(...)`), just before `turn-sealed`. + So a mid-turn `GET /conversations/:id` returns nothing for it either. + +The sender looks fine only because the FE optimistically echoes its own prompt; a pure watcher never sent, +so it has nothing to show. **No FE-only fix is possible** — the prompt text simply isn't sent until seal. + +**Requested fix — Option B (preferred): emit the prompt into the turn's event stream.** +- **`@dispatch/wire` (additive):** add a `TurnInputEvent` to the `AgentEvent` union, e.g. + `{ type: "user-message"; conversationId: string; turnId: string; text: string }`. Bump `wire`. +- **`session-orchestrator`:** `emitToHub(conversationId, { type: "user-message", conversationId, turnId, text })` + at the very start of `runTurnDetached` (before `runTurn`), so it is the first buffered event → replayed to + every subscriber, live and late-join. (No `runTurn`/kernel change needed — the orchestrator already holds + `text` + `turnId` + the hub.) +- Emitting it (and only it) does not change persistence semantics; the existing seal-time append is unchanged. + +**FE side — already staged (inert until you ship it):** `core/chunks` folds a `user-message` event into a +provisional user chunk for watchers, with a content dedup so the sender's optimistic echo isn't duplicated +(`reducer.ts` forward-compat branch + tests). The moment the backend emits `user-message`, both windows show +the prompt immediately; nothing breaks before then. On the new `wire`, we'll re-pin + re-mirror + add it to +the FE exhaustiveness guard. + +**Alternative — Option A (no wire change):** persist the user message at turn START (append `[userMsg]` +before `runTurn`; append only `result.messages` at seal) — then watchers fetch it via history. We do NOT +prefer this: it needs an extra history round-trip per watched turn and changes persistence semantics (an +errored turn would leave a persisted prompt with no reply). + ## 3. Likely NEXT backend asks (heads-up, not yet requested) - **Model max context-window LIMIT** (the denominator for context size) — the context-size handoff diff --git a/src/app/App.svelte b/src/app/App.svelte index dbb346a..50f24e7 100644 --- a/src/app/App.svelte +++ b/src/app/App.svelte @@ -219,7 +219,11 @@ <Composer onSend={handleSend} contextSize={store.activeChat.currentContextSize} - status={store.activeChat.error ? "error" : "idle"} + status={store.activeChat.error + ? "error" + : store.activeChat.generating + ? "running" + : "idle"} /> </div> diff --git a/src/app/store.svelte.ts b/src/app/store.svelte.ts index 6991530..df92b31 100644 --- a/src/app/store.svelte.ts +++ b/src/app/store.svelte.ts @@ -239,6 +239,23 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore { } } + /** + * Start watching a conversation's live turn events (`chat.subscribe`). Sent for + * EVERY open conversation — not just the active one — so a backgrounded tab keeps + * streaming a running turn, and a reloaded/second client re-attaches to an + * in-flight turn (the server replays it from `turn-start`). Idempotent server-side; + * the socket queues it until the connection is open. NOT needed right after + * `chat.send` (that auto-subscribes the sending connection). + */ + function subscribeChat(conversationId: string): void { + socket?.send({ type: "chat.subscribe", conversationId }); + } + + /** Stop watching a conversation's turn events (`chat.unsubscribe`). Never stops the turn. */ + function unsubscribeChat(conversationId: string): void { + socket?.send({ type: "chat.unsubscribe", conversationId }); + } + /** The conversation the surfaces should scope to (undefined for a draft). */ function focusedConversationId(): string | undefined { return tabsStore.activeConversationId ?? undefined; @@ -307,6 +324,14 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore { : { type: "subscribe", surfaceId, conversationId: sub.conversationId }; socket?.send(msg); } + // Re-attach to every open conversation's turn stream. A turn that kept + // running while we were disconnected resumes streaming (server replays it + // from `turn-start`); one that sealed while we were gone is committed from + // history by `resync()` (which also clears a now-stale "generating"). + for (const tab of tabsStore.tabs) { + subscribeChat(tab.conversationId); + chatStores.get(tab.conversationId)?.resync(); + } }, }; if (opts?.socketFactory !== undefined) { @@ -341,6 +366,10 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore { const store = createChatFor(tab.conversationId, tab.model); chatStores.set(tab.conversationId, store); void store.load(); + // Watch each restored conversation's live turns: after a reload mid-turn the + // server replays the in-flight turn so we keep rendering it. Queued until the + // socket opens. + subscribeChat(tab.conversationId); } if (persistedState.activeConversationId !== null) { const activeTab = persistedState.tabs.find( @@ -460,6 +489,8 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore { closeTab(conversationId: string): void { tabsStore.closeTab(conversationId); + // Stop watching the closed conversation's turns (does NOT stop the turn). + unsubscribeChat(conversationId); const store = chatStores.get(conversationId); if (store !== undefined) { store.dispose(); diff --git a/src/app/store.test.ts b/src/app/store.test.ts index 19530e2..803d7dc 100644 --- a/src/app/store.test.ts +++ b/src/app/store.test.ts @@ -1,6 +1,6 @@ import type { ConversationHistoryResponse, WsServerMessage } from "@dispatch/transport-contract"; import type { SurfaceServerMessage } from "@dispatch/ui-contract"; -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import type { WebSocketLike } from "../adapters/ws"; import { createAppStore } from "./store.svelte"; @@ -51,6 +51,55 @@ function fakeSocket(): FakeSocket { return ws; } +/** + * A fake socket that supports the close→reconnect cycle (the base `fakeSocket` + * swallows `onclose`). The factory returns the SAME instance on every connect, so + * `sent` accumulates and `open()` can be driven again after `closeRemote()`. + */ +interface ReconnectableSocket extends WebSocketLike { + sent: string[]; + open(): void; + closeRemote(): void; +} + +function reconnectableSocket(): ReconnectableSocket { + let onopen: (() => void) | null = null; + let onmessage: ((ev: { data: string }) => void) | null = null; + let onclose: ((ev: { code: number; reason: string }) => void) | null = null; + const sent: string[] = []; + return { + send(data: string) { + sent.push(data); + }, + close() {}, + get onopen() { + return onopen; + }, + set onopen(fn) { + onopen = fn; + }, + get onmessage() { + return onmessage; + }, + set onmessage(fn) { + onmessage = fn; + }, + get onclose() { + return onclose; + }, + set onclose(fn) { + onclose = fn; + }, + sent, + open() { + onopen?.(); + }, + closeRemote() { + onclose?.({ code: 1006, reason: "" }); + }, + }; +} + interface FakeFetchOptions { models?: readonly string[]; history?: Record<string, ConversationHistoryResponse>; @@ -70,7 +119,7 @@ function fakeFetchImpl(opts?: FakeFetchOptions): typeof fetch { }; } -function parseSent(ws: FakeSocket): unknown[] { +function parseSent(ws: { sent: string[] }): unknown[] { return ws.sent.map((s) => JSON.parse(s)); } @@ -752,4 +801,104 @@ describe("createAppStore", () => { store.dispose(); }); + + it("subscribes to chat for each restored tab on page load", () => { + const storage = createFakeStorage(); + // First session: create a tab, then dispose. + const ws1 = fakeSocket(); + const store1 = createAppStore({ + socketFactory: () => ws1, + fetchImpl: fakeFetchImpl(), + localStorage: storage, + }); + ws1.resolveOpen(); + store1.send("persist me"); + const convId = store1.tabs[0]?.conversationId as string; + expect(convId).toBeDefined(); + store1.dispose(); + + // Second session: the restored tab must be re-subscribed for live turns. + const ws2 = fakeSocket(); + const store2 = createAppStore({ + socketFactory: () => ws2, + fetchImpl: fakeFetchImpl(), + localStorage: storage, + }); + ws2.resolveOpen(); // flush the queued chat.subscribe + + const subscribed = parseSent(ws2) + .filter((p) => (p as { type: string }).type === "chat.subscribe") + .map((p) => (p as { conversationId: string }).conversationId); + expect(subscribed).toContain(convId); + + store2.dispose(); + }); + + it("unsubscribes from chat when a tab is closed", () => { + const ws = fakeSocket(); + const store = createAppStore({ + socketFactory: () => ws, + fetchImpl: fakeFetchImpl(), + localStorage: createFakeStorage(), + }); + ws.resolveOpen(); + + store.send("first"); + const convId = activeConversationId(store); + + ws.sent.length = 0; + store.closeTab(convId); + + const unsubscribed = parseSent(ws) + .filter((p) => (p as { type: string }).type === "chat.unsubscribe") + .map((p) => (p as { conversationId: string }).conversationId); + expect(unsubscribed).toContain(convId); + + store.dispose(); + }); + + it("re-subscribes chat (and resyncs) for every open conversation on reconnect", async () => { + const fetchedUrls: string[] = []; + const fetchImpl: typeof fetch = async (input: string | URL | Request): Promise<Response> => { + const url = typeof input === "string" ? input : input instanceof URL ? input.href : input.url; + fetchedUrls.push(url); + if (url.endsWith("/models")) { + return new Response(JSON.stringify({ models: ["opencode/deepseek-v4-flash"] }), { + status: 200, + }); + } + return new Response(JSON.stringify({ chunks: [], latestSeq: 0 }), { status: 200 }); + }; + + const ws = reconnectableSocket(); + const store = createAppStore({ + socketFactory: () => ws, + fetchImpl, + httpUrl: "http://localhost:24203", + localStorage: createFakeStorage(), + }); + ws.open(); + + store.send("hi"); + const convId = activeConversationId(store); + + // Drop the connection, wait past the reconnect backoff, then re-open. + ws.sent.length = 0; + fetchedUrls.length = 0; + ws.closeRemote(); + await new Promise((r) => setTimeout(r, 800)); + ws.open(); // reconnect → onReopen + + const subscribed = parseSent(ws) + .filter((p) => (p as { type: string }).type === "chat.subscribe") + .map((p) => (p as { conversationId: string }).conversationId); + expect(subscribed).toContain(convId); + + // resync() pulled the tail from history for the reconnected conversation. + await vi.waitFor(() => { + expect(fetchedUrls.some((u) => u.includes(`/conversations/${convId}?sinceSeq=`))).toBe(true); + }); + + store.dispose(); + }); }); diff --git a/src/core/chunks/index.ts b/src/core/chunks/index.ts index 0718c0d..ecfee74 100644 --- a/src/core/chunks/index.ts +++ b/src/core/chunks/index.ts @@ -1,7 +1,13 @@ export type { RenderGroup, ToolBatchEntry } from "./groups"; export { groupRenderedChunks } from "./groups"; -export { appendUserMessage, applyHistory, foldEvent, initialState } from "./reducer"; -export { selectChunks, selectMessages } from "./selectors"; +export { + appendUserMessage, + applyHistory, + clearGenerating, + foldEvent, + initialState, +} from "./reducer"; +export { selectChunks, selectGenerating, selectMessages } from "./selectors"; export type { AccumulatingChunk, ProvisionalChunk, diff --git a/src/core/chunks/reducer.test.ts b/src/core/chunks/reducer.test.ts index f2f1b75..35a586c 100644 --- a/src/core/chunks/reducer.test.ts +++ b/src/core/chunks/reducer.test.ts @@ -3,6 +3,7 @@ import type { StoredChunk, TurnDoneEvent, TurnErrorEvent, + TurnInputEvent, TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, @@ -12,8 +13,14 @@ import type { TurnUsageEvent, } from "@dispatch/wire"; import { describe, expect, it } from "vitest"; -import { appendUserMessage, applyHistory, foldEvent, initialState } from "./reducer"; -import { selectChunks, selectMessages } from "./selectors"; +import { + appendUserMessage, + applyHistory, + clearGenerating, + foldEvent, + initialState, +} from "./reducer"; +import { selectChunks, selectGenerating, selectMessages } from "./selectors"; const turnStart = (turnId: string): TurnStartEvent => ({ type: "turn-start", @@ -112,6 +119,101 @@ describe("initialState", () => { expect(s.currentTurnId).toBeNull(); expect(s.latestUsage).toBeNull(); expect(s.sealedTurnId).toBeNull(); + expect(s.generating).toBe(false); + }); +}); + +describe("foldEvent — generating (turn-running state)", () => { + it("turn-start sets generating true", () => { + let s = initialState(); + expect(selectGenerating(s)).toBe(false); + s = foldEvent(s, turnStart("t1")); + expect(s.generating).toBe(true); + expect(selectGenerating(s)).toBe(true); + }); + + it("a content delta sets generating true (e.g. a late-joiner replay missing turn-start)", () => { + let s = initialState(); + s = foldEvent(s, textDelta("t1", "hi")); + expect(s.generating).toBe(true); + s = initialState(); + s = foldEvent(s, reasoningDelta("t1", "hmm")); + expect(s.generating).toBe(true); + s = initialState(); + s = foldEvent(s, toolCall("t1", "tc1", "bash", {})); + expect(s.generating).toBe(true); + }); + + it("stays generating across the turn's deltas", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "wor")); + s = foldEvent(s, textDelta("t1", "king")); + expect(s.generating).toBe(true); + }); + + it("done clears generating", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "answer")); + s = foldEvent(s, doneEvent("t1")); + expect(s.generating).toBe(false); + }); + + it("turn-sealed clears generating", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, turnSealed("t1")); + expect(s.generating).toBe(false); + }); + + it("error clears generating", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, errorEvent("t1", "boom")); + expect(s.generating).toBe(false); + }); + + it("a new turn re-asserts generating after the previous one finished", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, doneEvent("t1")); + s = foldEvent(s, turnSealed("t1")); + expect(s.generating).toBe(false); + s = foldEvent(s, turnStart("t2")); + expect(s.generating).toBe(true); + }); + + it("status does not change generating (free-form string, not inferred)", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + const next = foldEvent(s, { type: "status", conversationId: "c1", status: "idle" }); + expect(next.generating).toBe(true); + }); +}); + +describe("clearGenerating", () => { + it("clears a set generating flag", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + expect(s.generating).toBe(true); + const cleared = clearGenerating(s); + expect(cleared.generating).toBe(false); + }); + + it("returns the same object when already not generating (no-op)", () => { + const s = initialState(); + expect(clearGenerating(s)).toBe(s); + }); + + it("preserves transcript content while clearing generating", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "partial")); + const cleared = clearGenerating(s); + expect(cleared.generating).toBe(false); + expect(cleared.accumulating).toEqual({ kind: "text", text: "partial" }); + expect(cleared.currentTurnId).toBe("t1"); }); }); @@ -281,6 +383,60 @@ describe("foldEvent — status and tool-output", () => { }); }); +describe("foldEvent — user-message (the turn's user prompt; backend CR-3)", () => { + const userMessage = (text: string): TurnInputEvent => ({ + type: "user-message", + conversationId: "c1", + turnId: "t1", + text, + }); + + it("a watcher renders the prompt: appends a provisional user chunk + marks generating", () => { + let s = initialState(); + s = foldEvent(s, userMessage("what is 2+2?")); + const chunks = selectChunks(s); + expect(chunks).toHaveLength(1); + expect(chunks[0]?.role).toBe("user"); + expect(chunks[0]?.chunk).toEqual({ type: "text", text: "what is 2+2?" }); + expect(chunks[0]?.provisional).toBe(true); + expect(s.generating).toBe(true); + }); + + it("dedups the SENDER's optimistic echo (no duplicate user bubble)", () => { + let s = initialState(); + s = appendUserMessage(s, "hi"); // optimistic echo from the sender's send() + s = foldEvent(s, userMessage("hi")); // server echo for the same turn + const users = selectChunks(s).filter((c) => c.role === "user"); + expect(users).toHaveLength(1); + }); + + it("appends when the trailing provisional differs (no false dedup)", () => { + let s = initialState(); + s = appendUserMessage(s, "first"); + s = foldEvent(s, userMessage("second")); + const users = selectChunks(s).filter((c) => c.role === "user"); + expect(users).toHaveLength(2); + }); + + it("ignores an empty user-message", () => { + let s = initialState(); + s = foldEvent(s, userMessage("")); + expect(selectChunks(s)).toHaveLength(0); + expect(s.generating).toBe(false); + }); + + it("flushes an accumulating chunk before appending the prompt", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "partial")); + s = foldEvent(s, userMessage("new prompt")); + // the partial assistant text was flushed to provisional, then the user prompt appended + expect(s.accumulating).toBeNull(); + const roles = selectChunks(s).map((c) => c.role); + expect(roles).toEqual(["assistant", "user"]); + }); +}); + describe("applyHistory", () => { it("orders committed chunks by seq", () => { const s = initialState(); diff --git a/src/core/chunks/reducer.ts b/src/core/chunks/reducer.ts index 54b1922..7ce55ce 100644 --- a/src/core/chunks/reducer.ts +++ b/src/core/chunks/reducer.ts @@ -10,9 +10,22 @@ export function initialState(): TranscriptState { currentTurnId: null, latestUsage: null, sealedTurnId: null, + generating: false, }; } +/** + * Clear the `generating` flag without touching anything else. Used on a WS + * (re)connect: a turn may have sealed while we were disconnected, so the live + * `turn-sealed`/`done` that would have cleared `generating` was missed. The + * caller resets here, then re-subscribes — if the turn is still running the + * server's replay re-asserts `generating` via the replayed `turn-start`. + */ +export function clearGenerating(state: TranscriptState): TranscriptState { + if (!state.generating) return state; + return { ...state, generating: false }; +} + function flushAccumulating( provisional: readonly ProvisionalChunk[], acc: AccumulatingChunk | null, @@ -55,6 +68,8 @@ export function applyHistory( * Fold one live AgentEvent into the provisional state. * * - `turn-start` records the turnId. + * - `user-message` appends the turn's user prompt (de-duped vs the sender's + * optimistic echo) so a watcher renders it mid-turn. * - `text-delta` extends the current accumulating TextChunk (or starts one). * - `reasoning-delta` extends the current accumulating ThinkingChunk (or starts one). * - `tool-call` / `tool-result` / `error` finalize any accumulating chunk and @@ -63,6 +78,11 @@ export function applyHistory( * - `done` finalizes any accumulating chunk (turn still provisional). * - `turn-sealed` finalizes any accumulating chunk and sets sealedTurnId. * - `status` and `tool-output` are ignored (best-effort no-ops). + * + * `generating` is folded structurally: a `turn-start` or any content delta sets + * it true; `done` / `turn-sealed` / `error` clear it. This is what a watching + * (or reconnected) client renders as "generating…", with no dependence on the + * free-form `status` event string. */ export function foldEvent(state: TranscriptState, event: AgentEvent): TranscriptState { switch (event.type) { @@ -71,31 +91,66 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript return state; case "turn-start": - return { ...state, currentTurnId: event.turnId }; + return { ...state, currentTurnId: event.turnId, generating: true }; + + case "user-message": { + // The turn's USER prompt, surfaced on the event stream (backend CR-3) so a + // WATCHER/late-joiner renders it mid-turn instead of waiting for seal. The + // SENDER already echoed its own prompt optimistically (`appendUserMessage`), + // so DE-DUP: skip if the trailing provisional chunk is already an identical + // user text chunk. A pure watcher has no such echo → it appends and renders. + if (event.text.length === 0) return state; + const last = state.provisional[state.provisional.length - 1]; + if ( + last !== undefined && + last.role === "user" && + last.chunk.type === "text" && + last.chunk.text === event.text + ) { + return { ...state, generating: true }; + } + const provisional = flushAccumulating(state.provisional, state.accumulating); + return { + ...state, + provisional: [...provisional, { role: "user", chunk: { type: "text", text: event.text } }], + accumulating: null, + generating: true, + }; + } case "text-delta": { const acc = state.accumulating; if (acc !== null && acc.kind === "text") { - return { ...state, accumulating: { kind: "text", text: acc.text + event.delta } }; + return { + ...state, + accumulating: { kind: "text", text: acc.text + event.delta }, + generating: true, + }; } const provisional = flushAccumulating(state.provisional, acc); return { ...state, provisional, accumulating: { kind: "text", text: event.delta }, + generating: true, }; } case "reasoning-delta": { const acc = state.accumulating; if (acc !== null && acc.kind === "thinking") { - return { ...state, accumulating: { kind: "thinking", text: acc.text + event.delta } }; + return { + ...state, + accumulating: { kind: "thinking", text: acc.text + event.delta }, + generating: true, + }; } const provisional = flushAccumulating(state.provisional, acc); return { ...state, provisional, accumulating: { kind: "thinking", text: event.delta }, + generating: true, }; } @@ -112,6 +167,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript ...state, provisional: [...provisional, { role: "assistant", chunk }], accumulating: null, + generating: true, }; } @@ -129,6 +185,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript ...state, provisional: [...provisional, { role: "tool", chunk }], accumulating: null, + generating: true, }; } @@ -142,6 +199,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript ...state, provisional: [...provisional, { role: "assistant", chunk }], accumulating: null, + generating: false, }; } @@ -158,6 +216,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript ...state, provisional, accumulating: null, + generating: false, }; } @@ -168,6 +227,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript provisional, accumulating: null, sealedTurnId: event.turnId, + generating: false, }; } } diff --git a/src/core/chunks/selectors.ts b/src/core/chunks/selectors.ts index 839ba65..6929de2 100644 --- a/src/core/chunks/selectors.ts +++ b/src/core/chunks/selectors.ts @@ -24,6 +24,15 @@ export function selectChunks(state: TranscriptState): readonly RenderedChunk[] { } /** + * Whether a turn is currently generating (for a "generating…" indicator). True + * for ANY client watching the conversation — the sender, a second device, or a + * reconnected client whose in-flight turn was replayed. + */ +export function selectGenerating(state: TranscriptState): boolean { + return state.generating; +} + +/** * Group consecutive same-role rendered chunks into ChatMessages. */ export function selectMessages(state: TranscriptState): readonly ChatMessage[] { diff --git a/src/core/chunks/types.ts b/src/core/chunks/types.ts index e031ce3..faa0d3f 100644 --- a/src/core/chunks/types.ts +++ b/src/core/chunks/types.ts @@ -20,6 +20,14 @@ export interface TranscriptState { readonly currentTurnId: string | null; readonly latestUsage: Usage | null; readonly sealedTurnId: string | null; + /** + * True while a turn is generating on the server — derived STRUCTURALLY from the + * event stream: a `turn-start` (or any turn delta) with no matching `done` / + * `turn-sealed` / `error` yet. A late-joiner that subscribes mid-turn gets the + * in-flight turn replayed from its `turn-start`, so this lights up for any + * watching client. NOT inferred from the free-form `status` event string. + */ + readonly generating: boolean; } /** A chunk ready for rendering: either committed (with seq) or provisional. */ diff --git a/src/core/wire/conformance.test.ts b/src/core/wire/conformance.test.ts index 690ba4e..a258873 100644 --- a/src/core/wire/conformance.test.ts +++ b/src/core/wire/conformance.test.ts @@ -27,6 +27,7 @@ describe("classifies every AgentEvent type", () => { const samples: AgentEvent[] = [ { type: "status", conversationId: "c1", status: "idle" }, { type: "turn-start", conversationId: "c1", turnId: "t1" }, + { type: "user-message", conversationId: "c1", turnId: "t1", text: "hi" }, { type: "text-delta", conversationId: "c1", turnId: "t1", delta: "hi" }, { type: "reasoning-delta", conversationId: "c1", turnId: "t1", delta: "thinking" }, { @@ -81,6 +82,7 @@ describe("classifies every AgentEvent type", () => { expect(labels).toEqual([ "status", "turn-start", + "user-message", "text-delta", "reasoning-delta", "tool-call", @@ -94,8 +96,8 @@ describe("classifies every AgentEvent type", () => { ]); }); - it("covers all 12 AgentEvent variants", () => { - expect(samples).toHaveLength(12); + it("covers all 13 AgentEvent variants", () => { + expect(samples).toHaveLength(13); }); }); @@ -148,9 +150,18 @@ describe("classifies every WsClientMessage type", () => { { type: "unsubscribe" as const, surfaceId: "s" }, { type: "invoke" as const, surfaceId: "s", actionId: "a" }, { type: "chat.send" as const, message: "hi" }, + { type: "chat.subscribe" as const, conversationId: "c1" }, + { type: "chat.unsubscribe" as const, conversationId: "c1" }, ]; const labels = msgs.map(assertWsClientMessageExhaustive); - expect(labels).toEqual(["subscribe", "unsubscribe", "invoke", "chat.send"]); + expect(labels).toEqual([ + "subscribe", + "unsubscribe", + "invoke", + "chat.send", + "chat.subscribe", + "chat.unsubscribe", + ]); }); }); diff --git a/src/core/wire/conformance.ts b/src/core/wire/conformance.ts index d89772e..13be78c 100644 --- a/src/core/wire/conformance.ts +++ b/src/core/wire/conformance.ts @@ -12,6 +12,8 @@ export function assertAgentEventExhaustive(event: AgentEvent): string { return "status"; case "turn-start": return "turn-start"; + case "user-message": + return "user-message"; case "text-delta": return "text-delta"; case "reasoning-delta": @@ -96,6 +98,10 @@ export function assertWsClientMessageExhaustive(msg: WsClientMessage): string { return "invoke"; case "chat.send": return "chat.send"; + case "chat.subscribe": + return "chat.subscribe"; + case "chat.unsubscribe": + return "chat.unsubscribe"; default: return msg satisfies never; } diff --git a/src/features/chat/store.svelte.ts b/src/features/chat/store.svelte.ts index 6344aec..37049bf 100644 --- a/src/features/chat/store.svelte.ts +++ b/src/features/chat/store.svelte.ts @@ -8,9 +8,11 @@ import type { RenderedChunk, TranscriptState } from "../../core/chunks"; import { appendUserMessage, applyHistory, + clearGenerating, foldEvent, initialState, selectChunks, + selectGenerating, selectMessages, } from "../../core/chunks"; import type { MetricsState, TurnMetricsEntry } from "../../core/metrics"; @@ -43,6 +45,13 @@ export interface ChatStore { * known yet. Never `0` for the unknown case. */ readonly currentContextSize: number | undefined; + /** + * Whether a turn is currently generating server-side — derived from the event + * stream (`turn-start`…no-`done`/`turn-sealed`-yet). True for ANY watching + * client: the sender, a second device, or a reconnected client whose in-flight + * turn was replayed. Drives the composer's "generating…" indicator. + */ + readonly generating: boolean; readonly pendingSync: boolean; readonly error: string | null; readonly model: string | undefined; @@ -50,6 +59,14 @@ export interface ChatStore { send(text: string): void; setModel(model: string): void; load(): Promise<void>; + /** + * Re-sync after a WS (re)connect. Clears any stale `generating` (a turn may + * have sealed while disconnected — the live `turn-sealed` was missed), then + * pulls newly-sealed turns from history (+ metrics). If the turn is still + * running, the server's post-subscribe replay re-asserts `generating`. The + * app store pairs this with a `chat.subscribe` for the conversation. + */ + resync(): void; dispose(): void; } @@ -101,6 +118,9 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore { get currentContextSize(): number | undefined { return selectCurrentContextSize(metrics); }, + get generating(): boolean { + return selectGenerating(transcript); + }, get pendingSync(): boolean { return _pendingSync; }, @@ -154,6 +174,16 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore { await syncMetrics(); }, + resync(): void { + if (disposed) return; + // A turn may have sealed while we were disconnected (missed `turn-sealed`): + // clear the now-stale spinner BEFORE re-subscribing, so a finished turn + // doesn't spin forever. A still-running turn's replay re-asserts it. + transcript = clearGenerating(transcript); + void syncTail(); + void syncMetrics(); + }, + dispose(): void { disposed = true; }, diff --git a/src/features/chat/store.test.ts b/src/features/chat/store.test.ts index 1c99e7c..6507d69 100644 --- a/src/features/chat/store.test.ts +++ b/src/features/chat/store.test.ts @@ -802,4 +802,114 @@ describe("createChatStore", () => { store.dispose(); }); + + it("generating reflects the turn lifecycle (idle → running → idle)", () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + }); + + expect(store.generating).toBe(false); + + store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" })); + expect(store.generating).toBe(true); + + store.handleDelta( + deltaEvent({ type: "text-delta", conversationId: CONV_ID, turnId: "t1", delta: "hi" }), + ); + expect(store.generating).toBe(true); + + store.handleDelta( + deltaEvent({ type: "done", conversationId: CONV_ID, turnId: "t1", reason: "end-turn" }), + ); + expect(store.generating).toBe(false); + + store.dispose(); + }); + + it("generating lights up for a watcher whose turn was replayed (no send first)", () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + }); + + // A late-joiner receives the in-flight turn replayed from turn-start. + store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" })); + store.handleDelta( + deltaEvent({ type: "text-delta", conversationId: CONV_ID, turnId: "t1", delta: "partial" }), + ); + expect(store.generating).toBe(true); + expect(transport.sent).toHaveLength(0); // it never sent — it's just watching + + store.dispose(); + }); + + it("resync clears a stale generating flag and re-syncs history + metrics", async () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + }); + + // Disconnected mid-turn: turn-start seen, but the live done/turn-sealed was + // missed, so generating is stuck true. + store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" })); + expect(store.generating).toBe(true); + + // The turn actually sealed while we were gone — history now has the chunks. + historySync.returnChunks = [makeStoredChunk(1), makeStoredChunk(2)]; + + store.resync(); + + // Generating is cleared synchronously (a finished turn must not spin forever). + expect(store.generating).toBe(false); + + await vi.waitFor(() => { + expect(historySync.calls).toHaveLength(1); + expect(metricsSync.calls).toHaveLength(1); + }); + + store.dispose(); + }); + + it("resync is a no-op after dispose", async () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + }); + + store.dispose(); + store.resync(); + + await new Promise((r) => setTimeout(r, 10)); + expect(historySync.calls).toHaveLength(0); + expect(metricsSync.calls).toHaveLength(0); + }); }); |
