summaryrefslogtreecommitdiffhomepage
path: root/src/app
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-12 15:08:24 +0900
committerAdam Malczewski <[email protected]>2026-06-12 15:08:24 +0900
commit5ef7cc2916c544a66d68805063b02290f24d9a25 (patch)
tree51724187d01813bbbbaef513eb8cada2e1bda1a6 /src/app
parentfb37680bd013509ab5d72619f261713e8473e988 (diff)
downloaddispatch-web-5ef7cc2916c544a66d68805063b02290f24d9a25.tar.gz
dispatch-web-5ef7cc2916c544a66d68805063b02290f24d9a25.zip
feat(chat): multi-client live view — watch in-flight turns + user prompt on stream
- subscribe every open conversation on load + WS reconnect (resync), unsubscribe on tab close - derive a stream-based 'generating' state for watchers (Composer running indicator) - fold the user-message turn event so watchers render the prompt mid-turn (de-dup vs sender's optimistic echo) - re-pin [email protected] / [email protected]; re-mirror contracts; add user-message to the exhaustiveness guard
Diffstat (limited to 'src/app')
-rw-r--r--src/app/App.svelte6
-rw-r--r--src/app/store.svelte.ts31
-rw-r--r--src/app/store.test.ts153
3 files changed, 187 insertions, 3 deletions
diff --git a/src/app/App.svelte b/src/app/App.svelte
index dbb346a..50f24e7 100644
--- a/src/app/App.svelte
+++ b/src/app/App.svelte
@@ -219,7 +219,11 @@
<Composer
onSend={handleSend}
contextSize={store.activeChat.currentContextSize}
- status={store.activeChat.error ? "error" : "idle"}
+ status={store.activeChat.error
+ ? "error"
+ : store.activeChat.generating
+ ? "running"
+ : "idle"}
/>
</div>
diff --git a/src/app/store.svelte.ts b/src/app/store.svelte.ts
index 6991530..df92b31 100644
--- a/src/app/store.svelte.ts
+++ b/src/app/store.svelte.ts
@@ -239,6 +239,23 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore {
}
}
+ /**
+ * Start watching a conversation's live turn events (`chat.subscribe`). Sent for
+ * EVERY open conversation — not just the active one — so a backgrounded tab keeps
+ * streaming a running turn, and a reloaded/second client re-attaches to an
+ * in-flight turn (the server replays it from `turn-start`). Idempotent server-side;
+ * the socket queues it until the connection is open. NOT needed right after
+ * `chat.send` (that auto-subscribes the sending connection).
+ */
+ function subscribeChat(conversationId: string): void {
+ socket?.send({ type: "chat.subscribe", conversationId });
+ }
+
+ /** Stop watching a conversation's turn events (`chat.unsubscribe`). Never stops the turn. */
+ function unsubscribeChat(conversationId: string): void {
+ socket?.send({ type: "chat.unsubscribe", conversationId });
+ }
+
/** The conversation the surfaces should scope to (undefined for a draft). */
function focusedConversationId(): string | undefined {
return tabsStore.activeConversationId ?? undefined;
@@ -307,6 +324,14 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore {
: { type: "subscribe", surfaceId, conversationId: sub.conversationId };
socket?.send(msg);
}
+ // Re-attach to every open conversation's turn stream. A turn that kept
+ // running while we were disconnected resumes streaming (server replays it
+ // from `turn-start`); one that sealed while we were gone is committed from
+ // history by `resync()` (which also clears a now-stale "generating").
+ for (const tab of tabsStore.tabs) {
+ subscribeChat(tab.conversationId);
+ chatStores.get(tab.conversationId)?.resync();
+ }
},
};
if (opts?.socketFactory !== undefined) {
@@ -341,6 +366,10 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore {
const store = createChatFor(tab.conversationId, tab.model);
chatStores.set(tab.conversationId, store);
void store.load();
+ // Watch each restored conversation's live turns: after a reload mid-turn the
+ // server replays the in-flight turn so we keep rendering it. Queued until the
+ // socket opens.
+ subscribeChat(tab.conversationId);
}
if (persistedState.activeConversationId !== null) {
const activeTab = persistedState.tabs.find(
@@ -460,6 +489,8 @@ export function createAppStore(opts?: CreateAppStoreOptions): AppStore {
closeTab(conversationId: string): void {
tabsStore.closeTab(conversationId);
+ // Stop watching the closed conversation's turns (does NOT stop the turn).
+ unsubscribeChat(conversationId);
const store = chatStores.get(conversationId);
if (store !== undefined) {
store.dispose();
diff --git a/src/app/store.test.ts b/src/app/store.test.ts
index 19530e2..803d7dc 100644
--- a/src/app/store.test.ts
+++ b/src/app/store.test.ts
@@ -1,6 +1,6 @@
import type { ConversationHistoryResponse, WsServerMessage } from "@dispatch/transport-contract";
import type { SurfaceServerMessage } from "@dispatch/ui-contract";
-import { describe, expect, it } from "vitest";
+import { describe, expect, it, vi } from "vitest";
import type { WebSocketLike } from "../adapters/ws";
import { createAppStore } from "./store.svelte";
@@ -51,6 +51,55 @@ function fakeSocket(): FakeSocket {
return ws;
}
+/**
+ * A fake socket that supports the close→reconnect cycle (the base `fakeSocket`
+ * swallows `onclose`). The factory returns the SAME instance on every connect, so
+ * `sent` accumulates and `open()` can be driven again after `closeRemote()`.
+ */
+interface ReconnectableSocket extends WebSocketLike {
+ sent: string[];
+ open(): void;
+ closeRemote(): void;
+}
+
+function reconnectableSocket(): ReconnectableSocket {
+ let onopen: (() => void) | null = null;
+ let onmessage: ((ev: { data: string }) => void) | null = null;
+ let onclose: ((ev: { code: number; reason: string }) => void) | null = null;
+ const sent: string[] = [];
+ return {
+ send(data: string) {
+ sent.push(data);
+ },
+ close() {},
+ get onopen() {
+ return onopen;
+ },
+ set onopen(fn) {
+ onopen = fn;
+ },
+ get onmessage() {
+ return onmessage;
+ },
+ set onmessage(fn) {
+ onmessage = fn;
+ },
+ get onclose() {
+ return onclose;
+ },
+ set onclose(fn) {
+ onclose = fn;
+ },
+ sent,
+ open() {
+ onopen?.();
+ },
+ closeRemote() {
+ onclose?.({ code: 1006, reason: "" });
+ },
+ };
+}
+
interface FakeFetchOptions {
models?: readonly string[];
history?: Record<string, ConversationHistoryResponse>;
@@ -70,7 +119,7 @@ function fakeFetchImpl(opts?: FakeFetchOptions): typeof fetch {
};
}
-function parseSent(ws: FakeSocket): unknown[] {
+function parseSent(ws: { sent: string[] }): unknown[] {
return ws.sent.map((s) => JSON.parse(s));
}
@@ -752,4 +801,104 @@ describe("createAppStore", () => {
store.dispose();
});
+
+ it("subscribes to chat for each restored tab on page load", () => {
+ const storage = createFakeStorage();
+ // First session: create a tab, then dispose.
+ const ws1 = fakeSocket();
+ const store1 = createAppStore({
+ socketFactory: () => ws1,
+ fetchImpl: fakeFetchImpl(),
+ localStorage: storage,
+ });
+ ws1.resolveOpen();
+ store1.send("persist me");
+ const convId = store1.tabs[0]?.conversationId as string;
+ expect(convId).toBeDefined();
+ store1.dispose();
+
+ // Second session: the restored tab must be re-subscribed for live turns.
+ const ws2 = fakeSocket();
+ const store2 = createAppStore({
+ socketFactory: () => ws2,
+ fetchImpl: fakeFetchImpl(),
+ localStorage: storage,
+ });
+ ws2.resolveOpen(); // flush the queued chat.subscribe
+
+ const subscribed = parseSent(ws2)
+ .filter((p) => (p as { type: string }).type === "chat.subscribe")
+ .map((p) => (p as { conversationId: string }).conversationId);
+ expect(subscribed).toContain(convId);
+
+ store2.dispose();
+ });
+
+ it("unsubscribes from chat when a tab is closed", () => {
+ const ws = fakeSocket();
+ const store = createAppStore({
+ socketFactory: () => ws,
+ fetchImpl: fakeFetchImpl(),
+ localStorage: createFakeStorage(),
+ });
+ ws.resolveOpen();
+
+ store.send("first");
+ const convId = activeConversationId(store);
+
+ ws.sent.length = 0;
+ store.closeTab(convId);
+
+ const unsubscribed = parseSent(ws)
+ .filter((p) => (p as { type: string }).type === "chat.unsubscribe")
+ .map((p) => (p as { conversationId: string }).conversationId);
+ expect(unsubscribed).toContain(convId);
+
+ store.dispose();
+ });
+
+ it("re-subscribes chat (and resyncs) for every open conversation on reconnect", async () => {
+ const fetchedUrls: string[] = [];
+ const fetchImpl: typeof fetch = async (input: string | URL | Request): Promise<Response> => {
+ const url = typeof input === "string" ? input : input instanceof URL ? input.href : input.url;
+ fetchedUrls.push(url);
+ if (url.endsWith("/models")) {
+ return new Response(JSON.stringify({ models: ["opencode/deepseek-v4-flash"] }), {
+ status: 200,
+ });
+ }
+ return new Response(JSON.stringify({ chunks: [], latestSeq: 0 }), { status: 200 });
+ };
+
+ const ws = reconnectableSocket();
+ const store = createAppStore({
+ socketFactory: () => ws,
+ fetchImpl,
+ httpUrl: "http://localhost:24203",
+ localStorage: createFakeStorage(),
+ });
+ ws.open();
+
+ store.send("hi");
+ const convId = activeConversationId(store);
+
+ // Drop the connection, wait past the reconnect backoff, then re-open.
+ ws.sent.length = 0;
+ fetchedUrls.length = 0;
+ ws.closeRemote();
+ await new Promise((r) => setTimeout(r, 800));
+ ws.open(); // reconnect → onReopen
+
+ const subscribed = parseSent(ws)
+ .filter((p) => (p as { type: string }).type === "chat.subscribe")
+ .map((p) => (p as { conversationId: string }).conversationId);
+ expect(subscribed).toContain(convId);
+
+ // resync() pulled the tail from history for the reconnected conversation.
+ await vi.waitFor(() => {
+ expect(fetchedUrls.some((u) => u.includes(`/conversations/${convId}?sinceSeq=`))).toBe(true);
+ });
+
+ store.dispose();
+ });
});