diff options
| author | Adam Malczewski <[email protected]> | 2026-06-12 15:08:24 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-12 15:08:24 +0900 |
| commit | 5ef7cc2916c544a66d68805063b02290f24d9a25 (patch) | |
| tree | 51724187d01813bbbbaef513eb8cada2e1bda1a6 /src/features | |
| parent | fb37680bd013509ab5d72619f261713e8473e988 (diff) | |
| download | dispatch-web-5ef7cc2916c544a66d68805063b02290f24d9a25.tar.gz dispatch-web-5ef7cc2916c544a66d68805063b02290f24d9a25.zip | |
feat(chat): multi-client live view — watch in-flight turns + user prompt on stream
- subscribe every open conversation on load + WS reconnect (resync), unsubscribe on tab close
- derive a stream-based 'generating' state for watchers (Composer running indicator)
- fold the user-message turn event so watchers render the prompt mid-turn (de-dup vs sender's optimistic echo)
- re-pin [email protected] / [email protected]; re-mirror contracts; add user-message to the exhaustiveness guard
Diffstat (limited to 'src/features')
| -rw-r--r-- | src/features/chat/store.svelte.ts | 30 | ||||
| -rw-r--r-- | src/features/chat/store.test.ts | 110 |
2 files changed, 140 insertions, 0 deletions
diff --git a/src/features/chat/store.svelte.ts b/src/features/chat/store.svelte.ts index 6344aec..37049bf 100644 --- a/src/features/chat/store.svelte.ts +++ b/src/features/chat/store.svelte.ts @@ -8,9 +8,11 @@ import type { RenderedChunk, TranscriptState } from "../../core/chunks"; import { appendUserMessage, applyHistory, + clearGenerating, foldEvent, initialState, selectChunks, + selectGenerating, selectMessages, } from "../../core/chunks"; import type { MetricsState, TurnMetricsEntry } from "../../core/metrics"; @@ -43,6 +45,13 @@ export interface ChatStore { * known yet. Never `0` for the unknown case. */ readonly currentContextSize: number | undefined; + /** + * Whether a turn is currently generating server-side — derived from the event + * stream (`turn-start`…no-`done`/`turn-sealed`-yet). True for ANY watching + * client: the sender, a second device, or a reconnected client whose in-flight + * turn was replayed. Drives the composer's "generating…" indicator. + */ + readonly generating: boolean; readonly pendingSync: boolean; readonly error: string | null; readonly model: string | undefined; @@ -50,6 +59,14 @@ export interface ChatStore { send(text: string): void; setModel(model: string): void; load(): Promise<void>; + /** + * Re-sync after a WS (re)connect. Clears any stale `generating` (a turn may + * have sealed while disconnected — the live `turn-sealed` was missed), then + * pulls newly-sealed turns from history (+ metrics). If the turn is still + * running, the server's post-subscribe replay re-asserts `generating`. The + * app store pairs this with a `chat.subscribe` for the conversation. + */ + resync(): void; dispose(): void; } @@ -101,6 +118,9 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore { get currentContextSize(): number | undefined { return selectCurrentContextSize(metrics); }, + get generating(): boolean { + return selectGenerating(transcript); + }, get pendingSync(): boolean { return _pendingSync; }, @@ -154,6 +174,16 @@ export function createChatStore(deps: ChatStoreDependencies): ChatStore { await syncMetrics(); }, + resync(): void { + if (disposed) return; + // A turn may have sealed while we were disconnected (missed `turn-sealed`): + // clear the now-stale spinner BEFORE re-subscribing, so a finished turn + // doesn't spin forever. A still-running turn's replay re-asserts it. + transcript = clearGenerating(transcript); + void syncTail(); + void syncMetrics(); + }, + dispose(): void { disposed = true; }, diff --git a/src/features/chat/store.test.ts b/src/features/chat/store.test.ts index 1c99e7c..6507d69 100644 --- a/src/features/chat/store.test.ts +++ b/src/features/chat/store.test.ts @@ -802,4 +802,114 @@ describe("createChatStore", () => { store.dispose(); }); + + it("generating reflects the turn lifecycle (idle → running → idle)", () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + }); + + expect(store.generating).toBe(false); + + store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" })); + expect(store.generating).toBe(true); + + store.handleDelta( + deltaEvent({ type: "text-delta", conversationId: CONV_ID, turnId: "t1", delta: "hi" }), + ); + expect(store.generating).toBe(true); + + store.handleDelta( + deltaEvent({ type: "done", conversationId: CONV_ID, turnId: "t1", reason: "end-turn" }), + ); + expect(store.generating).toBe(false); + + store.dispose(); + }); + + it("generating lights up for a watcher whose turn was replayed (no send first)", () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + }); + + // A late-joiner receives the in-flight turn replayed from turn-start. + store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" })); + store.handleDelta( + deltaEvent({ type: "text-delta", conversationId: CONV_ID, turnId: "t1", delta: "partial" }), + ); + expect(store.generating).toBe(true); + expect(transport.sent).toHaveLength(0); // it never sent — it's just watching + + store.dispose(); + }); + + it("resync clears a stale generating flag and re-syncs history + metrics", async () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + }); + + // Disconnected mid-turn: turn-start seen, but the live done/turn-sealed was + // missed, so generating is stuck true. + store.handleDelta(deltaEvent({ type: "turn-start", conversationId: CONV_ID, turnId: "t1" })); + expect(store.generating).toBe(true); + + // The turn actually sealed while we were gone — history now has the chunks. + historySync.returnChunks = [makeStoredChunk(1), makeStoredChunk(2)]; + + store.resync(); + + // Generating is cleared synchronously (a finished turn must not spin forever). + expect(store.generating).toBe(false); + + await vi.waitFor(() => { + expect(historySync.calls).toHaveLength(1); + expect(metricsSync.calls).toHaveLength(1); + }); + + store.dispose(); + }); + + it("resync is a no-op after dispose", async () => { + const transport = createFakeTransport(); + const historySync = createFakeHistorySync(); + const metricsSync = createFakeMetricsSync(); + const cache = createFakeCache(); + const store = createChatStore({ + conversationId: CONV_ID, + transport: transport.impl, + historySync: historySync.impl, + metricsSync: metricsSync.impl, + cache: cache.impl, + }); + + store.dispose(); + store.resync(); + + await new Promise((r) => setTimeout(r, 10)); + expect(historySync.calls).toHaveLength(0); + expect(metricsSync.calls).toHaveLength(0); + }); }); |
