summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-12 15:08:24 +0900
committerAdam Malczewski <[email protected]>2026-06-12 15:08:24 +0900
commit5ef7cc2916c544a66d68805063b02290f24d9a25 (patch)
tree51724187d01813bbbbaef513eb8cada2e1bda1a6
parentfb37680bd013509ab5d72619f261713e8473e988 (diff)
downloaddispatch-web-5ef7cc2916c544a66d68805063b02290f24d9a25.tar.gz
dispatch-web-5ef7cc2916c544a66d68805063b02290f24d9a25.zip
feat(chat): multi-client live view — watch in-flight turns + user prompt on stream
- subscribe every open conversation on load + WS reconnect (resync), unsubscribe on tab close - derive a stream-based 'generating' state for watchers (Composer running indicator) - fold the user-message turn event so watchers render the prompt mid-turn (de-dup vs sender's optimistic echo) - re-pin [email protected] / [email protected]; re-mirror contracts; add user-message to the exhaustiveness guard
-rw-r--r--.dispatch/transport-contract.reference.md59
-rw-r--r--.dispatch/wire.reference.md30
-rw-r--r--backend-handoff.md75
-rw-r--r--src/app/App.svelte6
-rw-r--r--src/app/store.svelte.ts31
-rw-r--r--src/app/store.test.ts153
-rw-r--r--src/core/chunks/index.ts10
-rw-r--r--src/core/chunks/reducer.test.ts160
-rw-r--r--src/core/chunks/reducer.ts66
-rw-r--r--src/core/chunks/selectors.ts9
-rw-r--r--src/core/chunks/types.ts8
-rw-r--r--src/core/wire/conformance.test.ts17
-rw-r--r--src/core/wire/conformance.ts6
-rw-r--r--src/features/chat/store.svelte.ts30
-rw-r--r--src/features/chat/store.test.ts110
15 files changed, 746 insertions, 24 deletions
diff --git a/.dispatch/transport-contract.reference.md b/.dispatch/transport-contract.reference.md
index 40ced1e..86eac50 100644
--- a/.dispatch/transport-contract.reference.md
+++ b/.dispatch/transport-contract.reference.md
@@ -5,9 +5,31 @@
> hangs on a permission prompt). Your CODE still imports `@dispatch/transport-contract` normally —
> this file is for READING only.
>
-> **Orchestrator:** SNAPSHOT of `[email protected]` (the metrics endpoint shipped +
-> version-bumped + LIVE-VERIFIED). Depends on `@dispatch/[email protected]` (see `wire.reference.md`) +
-> `@dispatch/[email protected]` (see `ui-contract.reference.md`).
+> **Orchestrator:** SNAPSHOT of `[email protected]` (CR-3 user-message shipped). Depends on
+> `@dispatch/[email protected]` (see `wire.reference.md`) + `@dispatch/[email protected]` (see
+> `ui-contract.reference.md`).
+>
+> **2026-06-12 delta (CR-3 user-message handoff — package bumped `0.7.0` → `0.8.0`):** NO transport
+> shape change — it re-exports `AgentEvent` (which `chat.delta` / `/chat` NDJSON carry), and that union
+> gained the additive `TurnInputEvent` (`{ type: "user-message"; conversationId; turnId; text }`), the
+> turn's user prompt, emitted as the FIRST event of every turn (before `turn-start`) and replayed to
+> watchers/late-joiners. See the `wire.reference.md` CR-3 delta + `TurnInputEvent` for the definition.
+>
+> **2026-06-12 delta (turn-continuity handoff — package bumped `0.6.0` → `0.7.0`, ADDITIVE):** a turn
+> is no longer bound to the WS connection — it runs to completion server-side regardless of any
+> client, and any number of connections can watch the same conversation (incl. a late-joiner that
+> connects mid-turn). Two new client→server WS messages: `ChatSubscribeMessage`
+> (`{ type: "chat.subscribe"; conversationId }`) and `ChatUnsubscribeMessage`
+> (`{ type: "chat.unsubscribe"; conversationId }`); `WsClientMessage` now unions both. Server→client
+> is UNCHANGED (turn events still arrive as `chat.delta`, replayed AND live). Semantics: `chat.subscribe`
+> registers the connection + immediately REPLAYS the in-flight turn's events so far (from its
+> `turn-start`) then streams live (nothing replayed if idle); `chat.send` AUTO-subscribes the sending
+> connection (a 2nd send while generating ⇒ `chat.error` + you stay subscribed to watch the running
+> turn); `chat.unsubscribe`/socket-close drops the subscription but NEVER stops the turn; subscriptions
+> persist across turns. FE consumes via the `chat` feature + app store (re-subscribe every open
+> conversation on (re)connect + page load; derive a "running" state structurally from
+> `turn-start`…no-`done`/`turn-sealed`-yet). OUT of scope: per-step crash-resume, concurrent-send
+> arbitration, explicit stop.
>
> **2026-06-12 delta (context-size handoff — package bumped `0.5.0` → `0.6.0`, depends on
> `[email protected]`):** no NEW transport shape — the optional `contextSize?: number` rides the
@@ -293,8 +315,37 @@ export interface ChatErrorMessage {
readonly message: string;
}
+/**
+ * Client → server: start WATCHING a conversation's live turn events WITHOUT sending.
+ * On subscribe the server REPLAYS the current in-flight turn so far (from its
+ * `turn-start`) as `chat.delta`, then streams live; nothing replayed if idle (rely
+ * on `GET /conversations/:id` history). Infer "generating" from a replayed
+ * `turn-start` with no matching `done`/`turn-sealed` yet. `chat.send` already
+ * auto-subscribes the sender, so this is for conversations you VIEW but didn't send to
+ * (a 2nd device, or a reloaded/reconnected client). Idempotent per (connection, id).
+ */
+export interface ChatSubscribeMessage {
+ readonly type: "chat.subscribe";
+ readonly conversationId: string;
+}
+
+/**
+ * Client → server: stop watching a conversation's turn events on this connection.
+ * Does NOT stop/affect the turn (it runs to completion regardless of subscribers).
+ * Socket close drops all of a connection's subscriptions the same way — again
+ * WITHOUT aborting any in-flight turn.
+ */
+export interface ChatUnsubscribeMessage {
+ readonly type: "chat.unsubscribe";
+ readonly conversationId: string;
+}
+
/** Every client → server WS message: surface ops + chat ops. Discriminate on `type`. */
-export type WsClientMessage = SurfaceClientMessage | ChatSendMessage;
+export type WsClientMessage =
+ | SurfaceClientMessage
+ | ChatSendMessage
+ | ChatSubscribeMessage
+ | ChatUnsubscribeMessage;
/** Every server → client WS message: surface ops + chat ops. Discriminate on `type`. */
export type WsServerMessage = SurfaceServerMessage | ChatDeltaMessage | ChatErrorMessage;
diff --git a/.dispatch/wire.reference.md b/.dispatch/wire.reference.md
index cf1410a..40f94cf 100644
--- a/.dispatch/wire.reference.md
+++ b/.dispatch/wire.reference.md
@@ -4,8 +4,16 @@
> types WITHOUT following the `file:` dep symlink out of this repo (which hangs on a permission
> prompt). Your CODE still imports `@dispatch/wire` normally — this file is for READING only.
>
-> **Orchestrator:** SNAPSHOT of `[email protected]` (the metrics types below shipped + version-bumped).
-> Regenerate whenever `@dispatch/wire` changes.
+> **Orchestrator:** SNAPSHOT of `[email protected]` (the metrics types below + the `user-message` turn event
+> shipped + version-bumped). Regenerate whenever `@dispatch/wire` changes.
+>
+> **2026-06-12 delta (CR-3 user-message handoff — package bumped `0.5.0` → `0.6.0`, ADDITIVE):** adds a
+> new `AgentEvent` union member `TurnInputEvent` (`{ type: "user-message"; conversationId; turnId; text }`)
+> that surfaces the turn's USER prompt INTO the outward event stream. Emitted ONCE as the FIRST event of
+> every turn (before `turn-start`), so it is buffered + replayed to every subscriber — live AND late-join
+> — and rides `chat.delta`/NDJSON like any other event. Fixes CR-3 (a pure watcher couldn't see the prompt
+> until seal). The sender still echoes its own prompt optimistically, so consumers DE-DUP against that
+> (by text); a pure watcher renders it directly. Persistence/metrics unchanged. See `TurnInputEvent` below.
>
> **2026-06-12 delta (context-size handoff — package bumped `0.4.0` → `0.5.0`):** adds an OPTIONAL
> `contextSize?: number` to BOTH `TurnDoneEvent` (live `done`) and `TurnMetrics` (persisted) — the
@@ -249,6 +257,7 @@ export interface TurnMetrics {
export type AgentEvent =
| StatusEvent
| TurnStartEvent
+ | TurnInputEvent
| TurnTextDeltaEvent
| TurnReasoningDeltaEvent
| TurnToolCallEvent
@@ -274,6 +283,23 @@ export interface TurnStartEvent {
readonly turnId: string;
}
+/**
+ * The user prompt that opened this turn, surfaced INTO the turn's outward event
+ * stream so a WATCHER (subscribed but not the sender) can render the prompt
+ * mid-turn — the user message is otherwise persisted only at seal. Emitted ONCE
+ * as the FIRST event of the turn (before `turn-start`); buffered + replayed to
+ * every subscriber (live + late-join). The sender echoes its own prompt
+ * optimistically, so DE-DUP against that (by text); a pure watcher renders it
+ * directly. Carries the raw `text` passed to the provider. (Turn-scoped: it
+ * carries `turnId`, so a multi-turn transcript attributes each prompt to its turn.)
+ */
+export interface TurnInputEvent {
+ readonly type: "user-message";
+ readonly conversationId: string;
+ readonly turnId: string;
+ readonly text: string;
+}
+
/** Incremental text content from the model during a turn. */
export interface TurnTextDeltaEvent {
readonly type: "text-delta";
diff --git a/backend-handoff.md b/backend-handoff.md
index 847282b..b784eef 100644
--- a/backend-handoff.md
+++ b/backend-handoff.md
@@ -5,15 +5,32 @@
> **From:** dispatch-web orchestrator · **To:** arch-rewrite orchestrator · **Courier:** the user.
> `lsp` does NOT span the repos (ORCHESTRATOR §5) — every cross-repo ask flows through here.
-_Last updated: 2026-06-12. **FE is current on `[email protected]` / `[email protected]`.** All handoffs
+_Last updated: 2026-06-12. **FE is current on `[email protected]` / `[email protected]`.** All handoffs
to date are consumed: surfaces + WS, conversation transcript/metrics, tabs + model selector,
cache-warming (incl. authoritative timer + retention + cache-rate fix), **per-conversation cwd + LSP
-status**, and **context size** (the `contextSize` field — `done` live + `TurnMetrics` persisted —
-rendered as a current-usage readout above the composer).
+status**, **context size** (the `contextSize` field — `done` live + `TurnMetrics` persisted —
+rendered as a current-usage readout above the composer), and **turn continuity + multi-client live
+view** (`chat.subscribe`/`chat.unsubscribe`; re-attach to a running turn on reconnect/reload/second
+device; stream-derived "generating…" state).
**Open asks:** CR-1 (Loaded Extensions as a real table) + CR-2 (optional catalog `scope` flag) below.
+**CR-3 (watcher couldn't see the USER prompt until seal) → RESOLVED ✅** — backend shipped the
+`user-message` turn event (`[email protected]` / `[email protected]`); FE re-pinned + consumption live.
The cwd/LSP draft-path verification (`backend-handoff-cwd-lsp.md`) came back **all ✅ confirmed** by the
backend (answers in their `frontend-lsp-cwd-handoff.md`) — see §2._
+**Turn-continuity handoff (`frontend-turn-continuity-handoff.md`) → CONSUMED ✅.** Re-pinned
+`[email protected]→0.7.0` (additive; `wire` unchanged at `0.5.0`); re-mirrored
+`.dispatch/transport-contract.reference.md` with `ChatSubscribeMessage`/`ChatUnsubscribeMessage` + the
+widened `WsClientMessage` union. FE now: re-subscribes `chat.subscribe` for EVERY open conversation on
+page load + on WS (re)connect (and on close sends `chat.unsubscribe`); `chat.send` still auto-subscribes
+the sender, so the draft/promotion path adds none. A pure `generating` flag is folded structurally in
+`core/chunks` (`turn-start`/deltas ⇒ true; `done`/`turn-sealed`/`error` ⇒ false; NOT inferred from the
+free-form `status` string) and surfaced as `ChatStore.generating` → the Composer status icon now shows a
+"running" spinner for any watching client. `ChatStore.resync()` (called from `onReopen`) clears a stale
+spinner then pulls a turn that sealed while disconnected from history. 558 tests green. NO new backend
+ask. NOT yet live-probed — needs the two-WS / second-device manual check from the handoff's "Quick
+manual check" against a running backend.
+
**Context-size handoff (`frontend-context-size-handoff.md`) → CONSUMED ✅.** Re-pinned `[email protected]→0.5.0`
+ `[email protected]→0.6.0`; re-mirrored both `.dispatch/*.reference.md`; added "context size" +
"context window" to FE `GLOSSARY.md`. `core/metrics` now threads `contextSize` through the `done` fold +
@@ -26,7 +43,7 @@ backend ask — but the max-limit denominator is now a live FE need; see §3.
## 1. Pinned backend contracts (consumed by the FE)
-Pinned as `file:` deps: **`[email protected]`; `[email protected]`; `[email protected]`**.
+Pinned as `file:` deps: **`[email protected]`; `[email protected]`; `[email protected]`**.
| Package | Used for |
|---|---|
@@ -37,7 +54,8 @@ Pinned as `file:` deps: **`[email protected]`; `[email protected]`; `transport-contract
Endpoints in use (HTTP **24203**, WS **24205**, CORS `*` incl. `PUT`):
`POST /chat` (NDJSON) · `GET /models` · `GET /conversations/:id?sinceSeq=<n>` ·
`GET /conversations/:id/metrics` · `GET`/`PUT /conversations/:id/cwd` ·
-`GET /conversations/:id/lsp` · `POST /chat/warm` · WS `chat.send`→`chat.delta`.
+`GET /conversations/:id/lsp` · `POST /chat/warm` · WS `chat.send`→`chat.delta` ·
+WS `chat.subscribe`/`chat.unsubscribe` (watch a conversation's turns without sending; replay + live).
Mirrored in-repo for headless agents: `.dispatch/{ui-contract,wire,transport-contract}.reference.md`
(regenerate on any contract bump; all current as of `[email protected]` / `[email protected]`).
@@ -108,6 +126,53 @@ empty would override the persisted draft cwd. Verified safe today: `chat/store.s
`chat.send` with only `type`/`conversationId`/`message`/`model` — no `cwd` field. (The backend offered to
harden `/chat` to treat blank as "not provided" if we ever want it — not needed while we omit the field.)
+### CR-3 (BUG, multi-client) → **RESOLVED ✅** (Option B shipped; courier `frontend-cr3-user-message-handoff.md`)
+
+The backend implemented Option B + live-verified it: a new `AgentEvent` member `TurnInputEvent`
+(`{ type: "user-message"; conversationId; turnId; text }`) is emitted as the FIRST event of every turn
+(before `turn-start`), buffered + replayed to live AND late-join subscribers. `[email protected]→0.6.0`,
+`[email protected]→0.8.0` (re-exports the union; no transport-shape change). **FE consumed:**
+re-pinned both, re-mirrored `.dispatch/{wire,transport-contract}.reference.md`, promoted the staged
+`core/chunks` fold to a typed `case "user-message"` (appends the prompt with a text de-dup vs the sender's
+optimistic echo), and added `user-message` to the FE exhaustiveness guard. A pure watcher now shows the user
+bubble the moment the turn starts. The original report follows for history.
+
+**Symptom (reproduced live):** open a conversation in two windows; window A sends a message. Window B
+(`chat.subscribe`, a pure watcher) renders the streaming **reply** but NOT the user **prompt** that
+triggered it — the user bubble only pops in after `turn-sealed`.
+
+**Root cause (backend):** the user prompt is never part of the turn's live/replayable stream, and isn't
+persisted until the turn ends — so a watcher has no source for it mid-turn.
+- The replay buffer holds only `AgentEvent`s (`session-orchestrator/src/orchestrator.ts` `ActiveTurn.buffer`,
+ pushed in `emitToHub`). `buildUserMessage(text)` (`pure.ts`) is passed straight to the provider and is
+ **never `emitToHub`'d** → not buffered, not replayed.
+- The prompt is persisted only at turn end, atomically with the reply: `orchestrator.ts:244-245`
+ (`toPersist = [userMsg, ...result.messages]; conversationStore.append(...)`), just before `turn-sealed`.
+ So a mid-turn `GET /conversations/:id` returns nothing for it either.
+
+The sender looks fine only because the FE optimistically echoes its own prompt; a pure watcher never sent,
+so it has nothing to show. **No FE-only fix is possible** — the prompt text simply isn't sent until seal.
+
+**Requested fix — Option B (preferred): emit the prompt into the turn's event stream.**
+- **`@dispatch/wire` (additive):** add a `TurnInputEvent` to the `AgentEvent` union, e.g.
+ `{ type: "user-message"; conversationId: string; turnId: string; text: string }`. Bump `wire`.
+- **`session-orchestrator`:** `emitToHub(conversationId, { type: "user-message", conversationId, turnId, text })`
+ at the very start of `runTurnDetached` (before `runTurn`), so it is the first buffered event → replayed to
+ every subscriber, live and late-join. (No `runTurn`/kernel change needed — the orchestrator already holds
+ `text` + `turnId` + the hub.)
+- Emitting it (and only it) does not change persistence semantics; the existing seal-time append is unchanged.
+
+**FE side — already staged (inert until you ship it):** `core/chunks` folds a `user-message` event into a
+provisional user chunk for watchers, with a content dedup so the sender's optimistic echo isn't duplicated
+(`reducer.ts` forward-compat branch + tests). The moment the backend emits `user-message`, both windows show
+the prompt immediately; nothing breaks before then. On the new `wire`, we'll re-pin + re-mirror + add it to
+the FE exhaustiveness guard.
+
+**Alternative — Option A (no wire change):** persist the user message at turn START (append `[userMsg]`
+before `runTurn`; append only `result.messages` at seal) — then watchers fetch it via history. We do NOT
+prefer this: it needs an extra history round-trip per watched turn and changes persistence semantics (an
+errored turn would leave a persisted prompt with no reply).
+
## 3. Likely NEXT backend asks (heads-up, not yet requested)
- **Model max context-window LIMIT** (the denominator for context size) — the context-size handoff
diff --git a/src/app/App.svelte b/src/app/App.svelte
index dbb346a..50f24e7 100644
--- a/src/app/App.svelte
+++ b/src/app/App.svelte
@@ -219,7 +219,11 @@
<Composer
onSend={handleSend}
contextSize={store.activeChat.currentContextSize}
- status={store.activeChat.error ? "error" : "idle"}
+ status={store.activeChat.error
+ ? "error"
+ : store.activeChat.generating
+ ? "running"
+ : "idle"}
/>
</div>
diff --git a/src/app/store.svelte.ts b/src/app/store.svelte.ts
index 6991530..df92b31 100644
--- a/src/app/store.svelte.ts
+++ b/src/app/store.svelte.ts
@@ -239,6 +239,23 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore {
}
}
+ /**
+ * Start watching a conversation's live turn events (`chat.subscribe`). Sent for
+ * EVERY open conversation — not just the active one — so a backgrounded tab keeps
+ * streaming a running turn, and a reloaded/second client re-attaches to an
+ * in-flight turn (the server replays it from `turn-start`). Idempotent server-side;
+ * the socket queues it until the connection is open. NOT needed right after
+ * `chat.send` (that auto-subscribes the sending connection).
+ */
+ function subscribeChat(conversationId: string): void {
+ socket?.send({ type: "chat.subscribe", conversationId });
+ }
+
+ /** Stop watching a conversation's turn events (`chat.unsubscribe`). Never stops the turn. */
+ function unsubscribeChat(conversationId: string): void {
+ socket?.send({ type: "chat.unsubscribe", conversationId });
+ }
+
/** The conversation the surfaces should scope to (undefined for a draft). */
function focusedConversationId(): string | undefined {
return tabsStore.activeConversationId ?? undefined;
@@ -307,6 +324,14 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore {
: { type: "subscribe", surfaceId, conversationId: sub.conversationId };
socket?.send(msg);
}
+ // Re-attach to every open conversation's turn stream. A turn that kept
+ // running while we were disconnected resumes streaming (server replays it
+ // from `turn-start`); one that sealed while we were gone is committed from
+ // history by `resync()` (which also clears a now-stale "generating").
+ for (const tab of tabsStore.tabs) {
+ subscribeChat(tab.conversationId);
+ chatStores.get(tab.conversationId)?.resync();
+ }
},
};
if (opts?.socketFactory !== undefined) {
@@ -341,6 +366,10 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore {
const store = createChatFor(tab.conversationId, tab.model);
chatStores.set(tab.conversationId, store);
void store.load();
+ // Watch each restored conversation's live turns: after a reload mid-turn the
+ // server replays the in-flight turn so we keep rendering it. Queued until the
+ // socket opens.
+ subscribeChat(tab.conversationId);
}
if (persistedState.activeConversationId !== null) {
const activeTab = persistedState.tabs.find(
@@ -460,6 +489,8 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore {
closeTab(conversationId: string): void {
tabsStore.closeTab(conversationId);
+ // Stop watching the closed conversation's turns (does NOT stop the turn).
+ unsubscribeChat(conversationId);
const store = chatStores.get(conversationId);
if (store !== undefined) {
store.dispose();
diff --git a/src/app/store.test.ts b/src/app/store.test.ts
index 19530e2..803d7dc 100644
--- a/src/app/store.test.ts
+++ b/src/app/store.test.ts
@@ -1,6 +1,6 @@
import type { ConversationHistoryResponse, WsServerMessage } from "@dispatch/transport-contract";
import type { SurfaceServerMessage } from "@dispatch/ui-contract";
-import { describe, expect, it } from "vitest";
+import { describe, expect, it, vi } from "vitest";
import type { WebSocketLike } from "../adapters/ws";
import { createAppStore } from "./store.svelte";
@@ -51,6 +51,55 @@ function fakeSocket(): FakeSocket {
return ws;
}
+/**
+ * A fake socket that supports the close→reconnect cycle (the base `fakeSocket`
+ * swallows `onclose`). The factory returns the SAME instance on every connect, so
+ * `sent` accumulates and `open()` can be driven again after `closeRemote()`.
+ */
+interface ReconnectableSocket extends WebSocketLike {
+ sent: string[];
+ open(): void;
+ closeRemote(): void;
+}
+
+function reconnectableSocket(): ReconnectableSocket {
+ let onopen: (() => void) | null = null;
+ let onmessage: ((ev: { data: string }) => void) | null = null;
+ let onclose: ((ev: { code: number; reason: string }) => void) | null = null;
+ const sent: string[] = [];
+ return {
+ send(data: string) {
+ sent.push(data);
+ },
+ close() {},
+ get onopen() {
+ return onopen;
+ },
+ set onopen(fn) {
+ onopen = fn;
+ },
+ get onmessage() {
+ return onmessage;
+ },
+ set onmessage(fn) {
+ onmessage = fn;
+ },
+ get onclose() {
+ return onclose;
+ },
+ set onclose(fn) {
+ onclose = fn;
+ },
+ sent,
+ open() {
+ onopen?.();
+ },
+ closeRemote() {
+ onclose?.({ code: 1006, reason: "" });
+ },
+ };
+}
+
interface FakeFetchOptions {
models?: readonly string[];
history?: Record<string, ConversationHistoryResponse>;
@@ -70,7 +119,7 @@ function fakeFetchImpl(opts?: FakeFetchOptions): typeof fetch {
};
}
-function parseSent(ws: FakeSocket): unknown[] {
+function parseSent(ws: { sent: string[] }): unknown[] {
return ws.sent.map((s) => JSON.parse(s));
}
@@ -752,4 +801,104 @@ describe("createAppStore", () => {
store.dispose();
});
+
+ it("subscribes to chat for each restored tab on page load", () => {
+ const storage = createFakeStorage();
+ // First session: create a tab, then dispose.
+ const ws1 = fakeSocket();
+ const store1 = createAppStore({
+ socketFactory: () => ws1,
+ fetchImpl: fakeFetchImpl(),
+ localStorage: storage,
+ });
+ ws1.resolveOpen();
+ store1.send("persist me");
+ const convId = store1.tabs[0]?.conversationId as string;
+ expect(convId).toBeDefined();
+ store1.dispose();
+
+ // Second session: the restored tab must be re-subscribed for live turns.
+ const ws2 = fakeSocket();
+ const store2 = createAppStore({
+ socketFactory: () => ws2,
+ fetchImpl: fakeFetchImpl(),
+ localStorage: storage,
+ });
+ ws2.resolveOpen(); // flush the queued chat.subscribe
+
+ const subscribed = parseSent(ws2)
+ .filter((p) => (p as { type: string }).type === "chat.subscribe")
+ .map((p) => (p as { conversationId: string }).conversationId);
+ expect(subscribed).toContain(convId);
+
+ store2.dispose();
+ });
+
+ it("unsubscribes from chat when a tab is closed", () => {
+ const ws = fakeSocket();
+ const store = createAppStore({
+ socketFactory: () => ws,
+ fetchImpl: fakeFetchImpl(),
+ localStorage: createFakeStorage(),
+ });
+ ws.resolveOpen();
+
+ store.send("first");
+ const convId = activeConversationId(store);
+
+ ws.sent.length = 0;
+ store.closeTab(convId);
+
+ const unsubscribed = parseSent(ws)
+ .filter((p) => (p as { type: string }).type === "chat.unsubscribe")
+ .map((p) => (p as { conversationId: string }).conversationId);
+ expect(unsubscribed).toContain(convId);
+
+ store.dispose();
+ });
+
+ it("re-subscribes chat (and resyncs) for every open conversation on reconnect", async () => {
+ const fetchedUrls: string[] = [];
+ const fetchImpl: typeof fetch = async (input: string | URL | Request): Promise<Response> => {
+ const url = typeof input === "string" ? input : input instanceof URL ? input.href : input.url;
+ fetchedUrls.push(url);
+ if (url.endsWith("/models")) {
+ return new Response(JSON.stringify({ models: ["opencode/deepseek-v4-flash"] }), {
+ status: 200,
+ });
+ }
+ return new Response(JSON.stringify({ chunks: [], latestSeq: 0 }), { status: 200 });
+ };
+
+ const ws = reconnectableSocket();
+ const store = createAppStore({
+ socketFactory: () => ws,
+ fetchImpl,
+ httpUrl: "http://localhost:24203",
+ localStorage: createFakeStorage(),
+ });
+ ws.open();
+
+ store.send("hi");
+ const convId = activeConversationId(store);
+
+ // Drop the connection, wait past the reconnect backoff, then re-open.
+ ws.sent.length = 0;
+ fetchedUrls.length = 0;
+ ws.closeRemote();
+ await new Promise((r) => setTimeout(r, 800));
+ ws.open(); // reconnect → onReopen
+
+ const subscribed = parseSent(ws)
+ .filter((p) => (p as { type: string }).type === "chat.subscribe")
+ .map((p) => (p as { conversationId: string }).conversationId);
+ expect(subscribed).toContain(convId);
+
+ // resync() pulled the tail from history for the reconnected conversation.
+ await vi.waitFor(() => {
+ expect(fetchedUrls.some((u) => u.includes(`/conversations/${convId}?sinceSeq=`))).toBe(true);
+ });
+
+ store.dispose();
+ });
});
diff --git a/src/core/chunks/index.ts b/src/core/chunks/index.ts
index 0718c0d..ecfee74 100644
--- a/src/core/chunks/index.ts
+++ b/src/core/chunks/index.ts
@@ -1,7 +1,13 @@
export type { RenderGroup, ToolBatchEntry } from "./groups";
export { groupRenderedChunks } from "./groups";
-export { appendUserMessage, applyHistory, foldEvent, initialState } from "./reducer";
-export { selectChunks, selectMessages } from "./selectors";
+export {
+ appendUserMessage,
+ applyHistory,
+ clearGenerating,
+ foldEvent,
+ initialState,
+} from "./reducer";
+export { selectChunks, selectGenerating, selectMessages } from "./selectors";
export type {
AccumulatingChunk,
ProvisionalChunk,
diff --git a/src/core/chunks/reducer.test.ts b/src/core/chunks/reducer.test.ts
index f2f1b75..35a586c 100644
--- a/src/core/chunks/reducer.test.ts
+++ b/src/core/chunks/reducer.test.ts
@@ -3,6 +3,7 @@ import type {
StoredChunk,
TurnDoneEvent,
TurnErrorEvent,
+ TurnInputEvent,
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
@@ -12,8 +13,14 @@ import type {
TurnUsageEvent,
} from "@dispatch/wire";
import { describe, expect, it } from "vitest";
-import { appendUserMessage, applyHistory, foldEvent, initialState } from "./reducer";
-import { selectChunks, selectMessages } from "./selectors";
+import {
+ appendUserMessage,
+ applyHistory,
+ clearGenerating,
+ foldEvent,
+ initialState,
+} from "./reducer";
+import { selectChunks, selectGenerating, selectMessages } from "./selectors";
const turnStart = (turnId: string): TurnStartEvent => ({
type: "turn-start",
@@ -112,6 +119,101 @@ describe("initialState", () => {
expect(s.currentTurnId).toBeNull();
expect(s.latestUsage).toBeNull();
expect(s.sealedTurnId).toBeNull();
+ expect(s.generating).toBe(false);
+ });
+});
+
+describe("foldEvent — generating (turn-running state)", () => {
+ it("turn-start sets generating true", () => {
+ let s = initialState();
+ expect(selectGenerating(s)).toBe(false);
+ s = foldEvent(s, turnStart("t1"));
+ expect(s.generating).toBe(true);
+ expect(selectGenerating(s)).toBe(true);
+ });
+
+ it("a content delta sets generating true (e.g. a late-joiner replay missing turn-start)", () => {
+ let s = initialState();
+ s = foldEvent(s, textDelta("t1", "hi"));
+ expect(s.generating).toBe(true);
+ s = initialState();
+ s = foldEvent(s, reasoningDelta("t1", "hmm"));
+ expect(s.generating).toBe(true);
+ s = initialState();
+ s = foldEvent(s, toolCall("t1", "tc1", "bash", {}));
+ expect(s.generating).toBe(true);
+ });
+
+ it("stays generating across the turn's deltas", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "wor"));
+ s = foldEvent(s, textDelta("t1", "king"));
+ expect(s.generating).toBe(true);
+ });
+
+ it("done clears generating", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "answer"));
+ s = foldEvent(s, doneEvent("t1"));
+ expect(s.generating).toBe(false);
+ });
+
+ it("turn-sealed clears generating", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, turnSealed("t1"));
+ expect(s.generating).toBe(false);
+ });
+
+ it("error clears generating", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, errorEvent("t1", "boom"));
+ expect(s.generating).toBe(false);
+ });
+
+ it("a new turn re-asserts generating after the previous one finished", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, doneEvent("t1"));
+ s = foldEvent(s, turnSealed("t1"));
+ expect(s.generating).toBe(false);
+ s = foldEvent(s, turnStart("t2"));
+ expect(s.generating).toBe(true);
+ });
+
+ it("status does not change generating (free-form string, not inferred)", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ const next = foldEvent(s, { type: "status", conversationId: "c1", status: "idle" });
+ expect(next.generating).toBe(true);
+ });
+});
+
+describe("clearGenerating", () => {
+ it("clears a set generating flag", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ expect(s.generating).toBe(true);
+ const cleared = clearGenerating(s);
+ expect(cleared.generating).toBe(false);
+ });
+
+ it("returns the same object when already not generating (no-op)", () => {
+ const s = initialState();
+ expect(clearGenerating(s)).toBe(s);
+ });
+
+ it("preserves transcript content while clearing generating", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "partial"));
+ const cleared = clearGenerating(s);
+ expect(cleared.generating).toBe(false);
+ expect(cleared.accumulating).toEqual({ kind: "text", text: "partial" });
+ expect(cleared.currentTurnId).toBe("t1");
});
});
@@ -281,6 +383,60 @@ describe("foldEvent — status and tool-output", () => {
});
});
+describe("foldEvent — user-message (the turn's user prompt; backend CR-3)", () => {
+ const userMessage = (text: string): TurnInputEvent => ({
+ type: "user-message",
+ conversationId: "c1",
+ turnId: "t1",
+ text,
+ });
+
+ it("a watcher renders the prompt: appends a provisional user chunk + marks generating", () => {
+ let s = initialState();
+ s = foldEvent(s, userMessage("what is 2+2?"));
+ const chunks = selectChunks(s);
+ expect(chunks).toHaveLength(1);
+ expect(chunks[0]?.role).toBe("user");
+ expect(chunks[0]?.chunk).toEqual({ type: "text", text: "what is 2+2?" });
+ expect(chunks[0]?.provisional).toBe(true);
+ expect(s.generating).toBe(true);
+ });
+
+ it("dedups the SENDER's optimistic echo (no duplicate user bubble)", () => {
+ let s = initialState();
+ s = appendUserMessage(s, "hi"); // optimistic echo from the sender's send()
+ s = foldEvent(s, userMessage("hi")); // server echo for the same turn
+ const users = selectChunks(s).filter((c) => c.role === "user");
+ expect(users).toHaveLength(1);
+ });
+
+ it("appends when the trailing provisional differs (no false dedup)", () => {
+ let s = initialState();
+ s = appendUserMessage(s, "first");
+ s = foldEvent(s, userMessage("second"));
+ const users = selectChunks(s).filter((c) => c.role === "user");
+ expect(users).toHaveLength(2);
+ });
+
+ it("ignores an empty user-message", () => {
+ let s = initialState();
+ s = foldEvent(s, userMessage(""));
+ expect(selectChunks(s)).toHaveLength(0);
+ expect(s.generating).toBe(false);
+ });
+
+ it("flushes an accumulating chunk before appending the prompt", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "partial"));
+ s = foldEvent(s, userMessage("new prompt"));
+ // the partial assistant text was flushed to provisional, then the user prompt appended
+ expect(s.accumulating).toBeNull();
+ const roles = selectChunks(s).map((c) => c.role);
+ expect(roles).toEqual(["assistant", "user"]);
+ });
+});
+
describe("applyHistory", () => {
it("orders committed chunks by seq", () => {
const s = initialState();
diff --git a/src/core/chunks/reducer.ts b/src/core/chunks/reducer.ts
index 54b1922..7ce55ce 100644
--- a/src/core/chunks/reducer.ts
+++ b/src/core/chunks/reducer.ts
@@ -10,9 +10,22 @@ export function initialState(): TranscriptState {
currentTurnId: null,
latestUsage: null,
sealedTurnId: null,
+ generating: false,
};
}
+/**
+ * Clear the `generating` flag without touching anything else. Used on a WS
+ * (re)connect: a turn may have sealed while we were disconnected, so the live
+ * `turn-sealed`/`done` that would have cleared `generating` was missed. The
+ * caller resets here, then re-subscribes — if the turn is still running the
+ * server's replay re-asserts `generating` via the replayed `turn-start`.
+ */
+export function clearGenerating(state: TranscriptState): TranscriptState {
+ if (!state.generating) return state;
+ return { ...state, generating: false };
+}
+
function flushAccumulating(
provisional: readonly ProvisionalChunk[],
acc: AccumulatingChunk | null,
@@ -55,6 +68,8 @@ export function applyHistory(
* Fold one live AgentEvent into the provisional state.
*
* - `turn-start` records the turnId.
+ * - `user-message` appends the turn's user prompt (de-duped vs the sender's
+ * optimistic echo) so a watcher renders it mid-turn.
* - `text-delta` extends the current accumulating TextChunk (or starts one).
* - `reasoning-delta` extends the current accumulating ThinkingChunk (or starts one).
* - `tool-call` / `tool-result` / `error` finalize any accumulating chunk and
@@ -63,6 +78,11 @@ export function applyHistory(
* - `done` finalizes any accumulating chunk (turn still provisional).
* - `turn-sealed` finalizes any accumulating chunk and sets sealedTurnId.
* - `status` and `tool-output` are ignored (best-effort no-ops).
+ *
+ * `generating` is folded structurally: a `turn-start` or any content delta sets
+ * it true; `done` / `turn-sealed` / `error` clear it. This is what a watching
+ * (or reconnected) client renders as "generating…", with no dependence on the
+ * free-form `status` event string.
*/
export function foldEvent(state: TranscriptState, event: AgentEvent): TranscriptState {
switch (event.type) {
@@ -71,31 +91,66 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript
return state;
case "turn-start":
- return { ...state, currentTurnId: event.turnId };
+ return { ...state, currentTurnId: event.turnId, generating: true };
+
+ case "user-message": {
+ // The turn's USER prompt, surfaced on the event stream (backend CR-3) so a
+ // WATCHER/late-joiner renders it mid-turn instead of waiting for seal. The
+ // SENDER already echoed its own prompt optimistically (`appendUserMessage`),
+ // so DE-DUP: skip if the trailing provisional chunk is already an identical
+ // user text chunk. A pure watcher has no such echo → it appends and renders.
+ if (event.text.length === 0) return state;
+ const last = state.provisional[state.provisional.length - 1];
+ if (
+ last !== undefined &&
+ last.role === "user" &&
+ last.chunk.type === "text" &&
+ last.chunk.text === event.text
+ ) {
+ return { ...state, generating: true };
+ }
+ const provisional = flushAccumulating(state.provisional, state.accumulating);
+ return {
+ ...state,
+ provisional: [...provisional, { role: "user", chunk: { type: "text", text: event.text } }],
+ accumulating: null,
+ generating: true,
+ };
+ }
case "text-delta": {
const acc = state.accumulating;
if (acc !== null && acc.kind === "text") {
- return { ...state, accumulating: { kind: "text", text: acc.text + event.delta } };
+ return {
+ ...state,
+ accumulating: { kind: "text", text: acc.text + event.delta },
+ generating: true,
+ };
}
const provisional = flushAccumulating(state.provisional, acc);
return {
...state,
provisional,
accumulating: { kind: "text", text: event.delta },
+ generating: true,
};
}
case "reasoning-delta": {
const acc = state.accumulating;
if (acc !== null && acc.kind === "thinking") {
- return { ...state, accumulating: { kind: "thinking", text: acc.text + event.delta } };
+ return {
+ ...state,
+ accumulating: { kind: "thinking", text: acc.text + event.delta },
+ generating: true,
+ };
}
const provisional = flushAccumulating(state.provisional, acc);
return {
...state,
provisional,
accumulating: { kind: "thinking", text: event.delta },
+ generating: true,
};
}
@@ -112,6 +167,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript
...state,
provisional: [...provisional, { role: "assistant", chunk }],
accumulating: null,
+ generating: true,
};
}
@@ -129,6 +185,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript
...state,
provisional: [...provisional, { role: "tool", chunk }],
accumulating: null,
+ generating: true,
};
}
@@ -142,6 +199,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript
...state,
provisional: [...provisional, { role: "assistant", chunk }],
accumulating: null,
+ generating: false,
};
}
@@ -158,6 +216,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript
...state,
provisional,
accumulating: null,
+ generating: false,
};
}
@@ -168,6 +227,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript
provisional,
accumulating: null,
sealedTurnId: event.turnId,
+ generating: false,
};
}
}
diff --git a/src/core/chunks/selectors.ts b/src/core/chunks/selectors.ts
index 839ba65..6929de2 100644
--- a/src/core/chunks/selectors.ts
+++ b/src/core/chunks/selectors.ts
@@ -24,6 +24,15 @@ export function selectChunks(state: TranscriptState): readonly RenderedChunk[] {
}
/**
+ * Whether a turn is currently generating (for a "generating…" indicator). True
+ * for ANY client watching the conversation — the sender, a second device, or a
+ * reconnected client whose in-flight turn was replayed.
+ */
+export function selectGenerating(state: TranscriptState): boolean {
+ return state.generating;
+}
+
+/**
* Group consecutive same-role rendered chunks into ChatMessages.
*/
export function selectMessages(state: TranscriptState): readonly ChatMessage[] {
diff --git a/src/core/chunks/types.ts b/src/core/chunks/types.ts
index e031ce3..faa0d3f 100644
--- a/src/core/chunks/types.ts
+++ b/src/core/chunks/types.ts
@@ -20,6 +20,14 @@ export interface TranscriptState {
readonly currentTurnId: string | null;
readonly latestUsage: Usage | null;
readonly sealedTurnId: string | null;
+ /**
+ * True while a turn is generating on the server — derived STRUCTURALLY from the
+ * event stream: a `turn-start` (or any turn delta) with no matching `done` /
+ * `turn-sealed` / `error` yet. A late-joiner that subscribes mid-turn gets the
+ * in-flight turn replayed from its `turn-start`, so this lights up for any
+ * watching client. NOT inferred from the free-form `status` event string.
+ */
+ readonly generating: boolean;
}
/** A chunk ready for rendering: either committed (with seq) or provisional. */
diff --git a/src/core/wire/conformance.test.ts b/src/core/wire/conformance.test.ts
index 690ba4e..a258873 100644
--- a/src/core/wire/conformance.test.ts
+++ b/src/core/wire/conformance.test.ts
@@ -27,6 +27,7 @@ describe("classifies every AgentEvent type", () => {
const samples: AgentEvent[] = [
{ type: "status", conversationId: "c1", status: "idle" },
{ type: "turn-start", conversationId: "c1", turnId: "t1" },
+ { type: "user-message", conversationId: "c1", turnId: "t1", text: "hi" },
{ type: "text-delta", conversationId: "c1", turnId: "t1", delta: "hi" },
{ type: "reasoning-delta", conversationId: "c1", turnId: "t1", delta: "thinking" },
{
@@ -81,6 +82,7 @@ describe("classifies every AgentEvent type", () => {
expect(labels).toEqual([
"status",
"turn-start",
+ "user-message",
"text-delta",
"reasoning-delta",
"tool-call",
@@ -94,8 +96,8 @@ describe("classifies every AgentEvent type", () => {
]);
});
- it("covers all 12 AgentEvent variants", () => {
- expect(samples).toHaveLength(12);
+ it("covers all 13 AgentEvent variants", () => {
+ expect(samples).toHaveLength(13);
});
});
@@ -148,9 +150,18 @@ describe("classifies every WsClientMessage type", () => {
{ type: "unsubscribe" as const, surfaceId: "s" },
{ type: "invoke" as const, surfaceId: "s", actionId: "a" },
{ type: "chat.send" as const, message: "hi" },
+ { type: "chat.subscribe" as const, conversationId: "c1" },
+ { type: "chat.unsubscribe" as const, conversationId: "c1" },
];
const labels = msgs.map(assertWsClientMessageExhaustive);
- expect(labels).toEqual(["subscribe", "unsubscribe", "invoke", "chat.send"]);
+ expect(labels).toEqual([
+ "subscribe",
+ "unsubscribe",
+ "invoke",
+ "chat.send",
+ "chat.subscribe",
+ "chat.unsubscribe",
+ ]);
});
});
diff --git a/src/core/wire/conformance.ts b/src/core/wire/conformance.ts
index d89772e..13be78c 100644
--- a/src/core/wire/conformance.ts
+++ b/src/core/wire/conformance.ts
@@ -12,6 +12,8 @@ export function assertAgentEventExhaustive(event: AgentEvent): string {
return "status";
case "turn-start":
return "turn-start";
+ case "user-message":
+ return "user-message";
case "text-delta":
return "text-delta";
case "reasoning-delta":
@@ -96,6 +98,10 @@ export function assertWsClientMessageExhaustive(msg: WsClientMessage): string {
return "invoke";
case "chat.send":
return "chat.send";
+ case "chat.subscribe":
+ return "chat.subscribe";
+ case "chat.unsubscribe":
+ return "chat.unsubscribe";
default:
return msg satisfies never;
}
diff --git a/src/features/chat/store.svelte.ts b/src/features/chat/store.svelte.ts
index 6344aec..37049bf 100644
--- a/src/features/chat/store.svelte.ts
+++ b/src/features/chat/store.svelte.ts
@@ -8,9 +8,11 @@ import type { RenderedChunk, TranscriptState } from "../../core/chunks";
import {
appendUserMessage,
applyHistory,
+ clearGenerating,
foldEvent,
initialState,
selectChunks,
+ selectGenerating,
selectMessages,
} from "../../core/chunks";
import type { MetricsState, TurnMetricsEntry } from "../../core/metrics";
@@ -43,6 +45,13 @@ export interface ChatStore {
* known yet. Never `0` for the unknown case.
*/
readonly currentContextSize: number | undefined;
+ /**
+ * Whether a turn is currently generating server-side — derived from the event
+ * stream (`turn-start`…no-`done`/`turn-sealed`-yet). True for ANY watching
+ * client: the sender, a second device, or a reconnected client whose in-flight
+ * turn was replayed. Drives the composer's "generating…" indicator.
+ */
+ readonly generating: boolean;
readonly pendingSync: boolean;
readonly error: string | null;
readonly model: string | undefined;
@@ -50,6 +59,14 @@ export interface ChatStore {
send(text: string): void;
setModel(model: string): void;
load(): Promise<void>;
+ /**
+ * Re-sync after a WS (re)connect. Clears any stale `generating` (a turn may
+ * have sealed while disconnected — the live `turn-sealed` was missed), then
+ * pulls newly-sealed turns from history (+ metrics). If the turn is still
+ * running, the server's post-subscribe replay re-asserts `generating`. The
+ * app store pairs this with a `chat.subscribe` for the conversation.
+ */
+ resync(): void;
dispose(): void;
}
@@ -101,6 +118,9 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore {
get currentContextSize(): number | undefined {
return selectCurrentContextSize(metrics);
},
+ get generating(): boolean {
+ return selectGenerating(transcript);
+ },
get pendingSync(): boolean {
return _pendingSync;
},
@@ -154,6 +174,16 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore {
await syncMetrics();
},
+ resync(): void {
+ if (disposed) return;
+ // A turn may have sealed while we were disconnected (missed `turn-sealed`):
+ // clear the now-stale spinner BEFORE re-subscribing, so a finished turn
+ // doesn't spin forever. A still-running turn's replay re-asserts it.
+ transcript = clearGenerating(transcript);
+ void syncTail();
+ void syncMetrics();
+ },
+
dispose(): void {
disposed = true;
},
diff --git a/src/features/chat/store.test.ts b/src/features/chat/store.test.ts
index 1c99e7c..6507d69 100644
--- a/src/features/chat/store.test.ts
+++ b/src/features/chat/store.test.ts
@@ -802,4 +802,114 @@ describe("createChatStore", () => {
store.dispose();
});
+
+ it("generating reflects the turn lifecycle (idle → running → idle)", () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ });
+
+ expect(store.generating).toBe(false);
+
+ store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" }));
+ expect(store.generating).toBe(true);
+
+ store.handleDelta(
+ deltaEvent({ type: "text-delta", conversationId: CONV_ID, turnId: "t1", delta: "hi" }),
+ );
+ expect(store.generating).toBe(true);
+
+ store.handleDelta(
+ deltaEvent({ type: "done", conversationId: CONV_ID, turnId: "t1", reason: "end-turn" }),
+ );
+ expect(store.generating).toBe(false);
+
+ store.dispose();
+ });
+
+ it("generating lights up for a watcher whose turn was replayed (no send first)", () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ });
+
+ // A late-joiner receives the in-flight turn replayed from turn-start.
+ store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" }));
+ store.handleDelta(
+ deltaEvent({ type: "text-delta", conversationId: CONV_ID, turnId: "t1", delta: "partial" }),
+ );
+ expect(store.generating).toBe(true);
+ expect(transport.sent).toHaveLength(0); // it never sent — it's just watching
+
+ store.dispose();
+ });
+
+ it("resync clears a stale generating flag and re-syncs history + metrics", async () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ });
+
+ // Disconnected mid-turn: turn-start seen, but the live done/turn-sealed was
+ // missed, so generating is stuck true.
+ store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" }));
+ expect(store.generating).toBe(true);
+
+ // The turn actually sealed while we were gone — history now has the chunks.
+ historySync.returnChunks = [makeStoredChunk(1), makeStoredChunk(2)];
+
+ store.resync();
+
+ // Generating is cleared synchronously (a finished turn must not spin forever).
+ expect(store.generating).toBe(false);
+
+ await vi.waitFor(() => {
+ expect(historySync.calls).toHaveLength(1);
+ expect(metricsSync.calls).toHaveLength(1);
+ });
+
+ store.dispose();
+ });
+
+ it("resync is a no-op after dispose", async () => {
+ const transport = createFakeTransport();
+ const historySync = createFakeHistorySync();
+ const metricsSync = createFakeMetricsSync();
+ const cache = createFakeCache();
+ const store = createChatStore({
+ conversationId: CONV_ID,
+ transport: transport.impl,
+ historySync: historySync.impl,
+ metricsSync: metricsSync.impl,
+ cache: cache.impl,
+ });
+
+ store.dispose();
+ store.resync();
+
+ await new Promise((r) => setTimeout(r, 10));
+ expect(historySync.calls).toHaveLength(0);
+ expect(metricsSync.calls).toHaveLength(0);
+ });
});