diff options
| author | Adam Malczewski <[email protected]> | 2026-06-12 15:08:24 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-12 15:08:24 +0900 |
| commit | 5ef7cc2916c544a66d68805063b02290f24d9a25 (patch) | |
| tree | 51724187d01813bbbbaef513eb8cada2e1bda1a6 /src/app | |
| parent | fb37680bd013509ab5d72619f261713e8473e988 (diff) | |
| download | dispatch-web-5ef7cc2916c544a66d68805063b02290f24d9a25.tar.gz dispatch-web-5ef7cc2916c544a66d68805063b02290f24d9a25.zip | |
feat(chat): multi-client live view — watch in-flight turns + user prompt on stream
- subscribe every open conversation on load + WS reconnect (resync), unsubscribe on tab close
- derive a stream-based 'generating' state for watchers (Composer running indicator)
- fold the user-message turn event so watchers render the prompt mid-turn (de-dup vs sender's optimistic echo)
- re-pin [email protected] / [email protected]; re-mirror contracts; add user-message to the exhaustiveness guard
Diffstat (limited to 'src/app')
| -rw-r--r-- | src/app/App.svelte | 6 | ||||
| -rw-r--r-- | src/app/store.svelte.ts | 31 | ||||
| -rw-r--r-- | src/app/store.test.ts | 153 |
3 files changed, 187 insertions, 3 deletions
diff --git a/src/app/App.svelte b/src/app/App.svelte index dbb346a..50f24e7 100644 --- a/src/app/App.svelte +++ b/src/app/App.svelte @@ -219,7 +219,11 @@ <Composer onSend={handleSend} contextSize={store.activeChat.currentContextSize} - status={store.activeChat.error ? "error" : "idle"} + status={store.activeChat.error + ? "error" + : store.activeChat.generating + ? "running" + : "idle"} /> </div> diff --git a/src/app/store.svelte.ts b/src/app/store.svelte.ts index 6991530..df92b31 100644 --- a/src/app/store.svelte.ts +++ b/src/app/store.svelte.ts @@ -239,6 +239,23 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore { } } + /** + * Start watching a conversation's live turn events (`chat.subscribe`). Sent for + * EVERY open conversation — not just the active one — so a backgrounded tab keeps + * streaming a running turn, and a reloaded/second client re-attaches to an + * in-flight turn (the server replays it from `turn-start`). Idempotent server-side; + * the socket queues it until the connection is open. NOT needed right after + * `chat.send` (that auto-subscribes the sending connection). + */ + function subscribeChat(conversationId: string): void { + socket?.send({ type: "chat.subscribe", conversationId }); + } + + /** Stop watching a conversation's turn events (`chat.unsubscribe`). Never stops the turn. */ + function unsubscribeChat(conversationId: string): void { + socket?.send({ type: "chat.unsubscribe", conversationId }); + } + /** The conversation the surfaces should scope to (undefined for a draft). */ function focusedConversationId(): string | undefined { return tabsStore.activeConversationId ?? undefined; @@ -307,6 +324,14 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore { : { type: "subscribe", surfaceId, conversationId: sub.conversationId }; socket?.send(msg); } + // Re-attach to every open conversation's turn stream. A turn that kept + // running while we were disconnected resumes streaming (server replays it + // from `turn-start`); one that sealed while we were gone is committed from + // history by `resync()` (which also clears a now-stale "generating"). + for (const tab of tabsStore.tabs) { + subscribeChat(tab.conversationId); + chatStores.get(tab.conversationId)?.resync(); + } }, }; if (opts?.socketFactory !== undefined) { @@ -341,6 +366,10 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore { const store = createChatFor(tab.conversationId, tab.model); chatStores.set(tab.conversationId, store); void store.load(); + // Watch each restored conversation's live turns: after a reload mid-turn the + // server replays the in-flight turn so we keep rendering it. Queued until the + // socket opens. + subscribeChat(tab.conversationId); } if (persistedState.activeConversationId !== null) { const activeTab = persistedState.tabs.find( @@ -460,6 +489,8 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore { closeTab(conversationId: string): void { tabsStore.closeTab(conversationId); + // Stop watching the closed conversation's turns (does NOT stop the turn). + unsubscribeChat(conversationId); const store = chatStores.get(conversationId); if (store !== undefined) { store.dispose(); diff --git a/src/app/store.test.ts b/src/app/store.test.ts index 19530e2..803d7dc 100644 --- a/src/app/store.test.ts +++ b/src/app/store.test.ts @@ -1,6 +1,6 @@ import type { ConversationHistoryResponse, WsServerMessage } from "@dispatch/transport-contract"; import type { SurfaceServerMessage } from "@dispatch/ui-contract"; -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import type { WebSocketLike } from "../adapters/ws"; import { createAppStore } from "./store.svelte"; @@ -51,6 +51,55 @@ function fakeSocket(): FakeSocket { return ws; } +/** + * A fake socket that supports the close→reconnect cycle (the base `fakeSocket` + * swallows `onclose`). The factory returns the SAME instance on every connect, so + * `sent` accumulates and `open()` can be driven again after `closeRemote()`. + */ +interface ReconnectableSocket extends WebSocketLike { + sent: string[]; + open(): void; + closeRemote(): void; +} + +function reconnectableSocket(): ReconnectableSocket { + let onopen: (() => void) | null = null; + let onmessage: ((ev: { data: string }) => void) | null = null; + let onclose: ((ev: { code: number; reason: string }) => void) | null = null; + const sent: string[] = []; + return { + send(data: string) { + sent.push(data); + }, + close() {}, + get onopen() { + return onopen; + }, + set onopen(fn) { + onopen = fn; + }, + get onmessage() { + return onmessage; + }, + set onmessage(fn) { + onmessage = fn; + }, + get onclose() { + return onclose; + }, + set onclose(fn) { + onclose = fn; + }, + sent, + open() { + onopen?.(); + }, + closeRemote() { + onclose?.({ code: 1006, reason: "" }); + }, + }; +} + interface FakeFetchOptions { models?: readonly string[]; history?: Record<string, ConversationHistoryResponse>; @@ -70,7 +119,7 @@ function fakeFetchImpl(opts?: FakeFetchOptions): typeof fetch { }; } -function parseSent(ws: FakeSocket): unknown[] { +function parseSent(ws: { sent: string[] }): unknown[] { return ws.sent.map((s) => JSON.parse(s)); } @@ -752,4 +801,104 @@ describe("createAppStore", () => { store.dispose(); }); + + it("subscribes to chat for each restored tab on page load", () => { + const storage = createFakeStorage(); + // First session: create a tab, then dispose. + const ws1 = fakeSocket(); + const store1 = createAppStore({ + socketFactory: () => ws1, + fetchImpl: fakeFetchImpl(), + localStorage: storage, + }); + ws1.resolveOpen(); + store1.send("persist me"); + const convId = store1.tabs[0]?.conversationId as string; + expect(convId).toBeDefined(); + store1.dispose(); + + // Second session: the restored tab must be re-subscribed for live turns. + const ws2 = fakeSocket(); + const store2 = createAppStore({ + socketFactory: () => ws2, + fetchImpl: fakeFetchImpl(), + localStorage: storage, + }); + ws2.resolveOpen(); // flush the queued chat.subscribe + + const subscribed = parseSent(ws2) + .filter((p) => (p as { type: string }).type === "chat.subscribe") + .map((p) => (p as { conversationId: string }).conversationId); + expect(subscribed).toContain(convId); + + store2.dispose(); + }); + + it("unsubscribes from chat when a tab is closed", () => { + const ws = fakeSocket(); + const store = createAppStore({ + socketFactory: () => ws, + fetchImpl: fakeFetchImpl(), + localStorage: createFakeStorage(), + }); + ws.resolveOpen(); + + store.send("first"); + const convId = activeConversationId(store); + + ws.sent.length = 0; + store.closeTab(convId); + + const unsubscribed = parseSent(ws) + .filter((p) => (p as { type: string }).type === "chat.unsubscribe") + .map((p) => (p as { conversationId: string }).conversationId); + expect(unsubscribed).toContain(convId); + + store.dispose(); + }); + + it("re-subscribes chat (and resyncs) for every open conversation on reconnect", async () => { + const fetchedUrls: string[] = []; + const fetchImpl: typeof fetch = async (input: string | URL | Request): Promise<Response> => { + const url = typeof input === "string" ? input : input instanceof URL ? input.href : input.url; + fetchedUrls.push(url); + if (url.endsWith("/models")) { + return new Response(JSON.stringify({ models: ["opencode/deepseek-v4-flash"] }), { + status: 200, + }); + } + return new Response(JSON.stringify({ chunks: [], latestSeq: 0 }), { status: 200 }); + }; + + const ws = reconnectableSocket(); + const store = createAppStore({ + socketFactory: () => ws, + fetchImpl, + httpUrl: "http://localhost:24203", + localStorage: createFakeStorage(), + }); + ws.open(); + + store.send("hi"); + const convId = activeConversationId(store); + + // Drop the connection, wait past the reconnect backoff, then re-open. + ws.sent.length = 0; + fetchedUrls.length = 0; + ws.closeRemote(); + await new Promise((r) => setTimeout(r, 800)); + ws.open(); // reconnect → onReopen + + const subscribed = parseSent(ws) + .filter((p) => (p as { type: string }).type === "chat.subscribe") + .map((p) => (p as { conversationId: string }).conversationId); + expect(subscribed).toContain(convId); + + // resync() pulled the tail from history for the reconnected conversation. + await vi.waitFor(() => { + expect(fetchedUrls.some((u) => u.includes(`/conversations/${convId}?sinceSeq=`))).toBe(true); + }); + + store.dispose(); + }); }); |
