From 5ef7cc2916c544a66d68805063b02290f24d9a25 Mon Sep 17 00:00:00 2001 From: Adam Malczewski Date: Fri, 12 Jun 2026 15:08:24 +0900 Subject: feat(chat): multi-client live view — watch in-flight turns + user prompt on stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - subscribe every open conversation on load + WS reconnect (resync), unsubscribe on tab close - derive a stream-based 'generating' state for watchers (Composer running indicator) - fold the user-message turn event so watchers render the prompt mid-turn (de-dup vs sender's optimistic echo) - re-pin wire@0.6.0 / transport-contract@0.8.0; re-mirror contracts; add user-message to the exhaustiveness guard --- src/core/chunks/index.ts | 10 ++- src/core/chunks/reducer.test.ts | 160 +++++++++++++++++++++++++++++++++++++- src/core/chunks/reducer.ts | 66 +++++++++++++++- src/core/chunks/selectors.ts | 9 +++ src/core/chunks/types.ts | 8 ++ src/core/wire/conformance.test.ts | 17 +++- src/core/wire/conformance.ts | 6 ++ 7 files changed, 266 insertions(+), 10 deletions(-) (limited to 'src/core') diff --git a/src/core/chunks/index.ts b/src/core/chunks/index.ts index 0718c0d..ecfee74 100644 --- a/src/core/chunks/index.ts +++ b/src/core/chunks/index.ts @@ -1,7 +1,13 @@ export type { RenderGroup, ToolBatchEntry } from "./groups"; export { groupRenderedChunks } from "./groups"; -export { appendUserMessage, applyHistory, foldEvent, initialState } from "./reducer"; -export { selectChunks, selectMessages } from "./selectors"; +export { + appendUserMessage, + applyHistory, + clearGenerating, + foldEvent, + initialState, +} from "./reducer"; +export { selectChunks, selectGenerating, selectMessages } from "./selectors"; export type { AccumulatingChunk, ProvisionalChunk, diff --git a/src/core/chunks/reducer.test.ts b/src/core/chunks/reducer.test.ts index f2f1b75..35a586c 100644 --- a/src/core/chunks/reducer.test.ts +++ b/src/core/chunks/reducer.test.ts @@ -3,6 +3,7 @@ import type { StoredChunk, TurnDoneEvent, TurnErrorEvent, + TurnInputEvent, TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, @@ -12,8 +13,14 @@ import type { TurnUsageEvent, } from "@dispatch/wire"; import { describe, expect, it } from "vitest"; -import { appendUserMessage, applyHistory, foldEvent, initialState } from "./reducer"; -import { selectChunks, selectMessages } from "./selectors"; +import { + appendUserMessage, + applyHistory, + clearGenerating, + foldEvent, + initialState, +} from "./reducer"; +import { selectChunks, selectGenerating, selectMessages } from "./selectors"; const turnStart = (turnId: string): TurnStartEvent => ({ type: "turn-start", @@ -112,6 +119,101 @@ describe("initialState", () => { expect(s.currentTurnId).toBeNull(); expect(s.latestUsage).toBeNull(); expect(s.sealedTurnId).toBeNull(); + expect(s.generating).toBe(false); + }); +}); + +describe("foldEvent — generating (turn-running state)", () => { + it("turn-start sets generating true", () => { + let s = initialState(); + expect(selectGenerating(s)).toBe(false); + s = foldEvent(s, turnStart("t1")); + expect(s.generating).toBe(true); + expect(selectGenerating(s)).toBe(true); + }); + + it("a content delta sets generating true (e.g. a late-joiner replay missing turn-start)", () => { + let s = initialState(); + s = foldEvent(s, textDelta("t1", "hi")); + expect(s.generating).toBe(true); + s = initialState(); + s = foldEvent(s, reasoningDelta("t1", "hmm")); + expect(s.generating).toBe(true); + s = initialState(); + s = foldEvent(s, toolCall("t1", "tc1", "bash", {})); + expect(s.generating).toBe(true); + }); + + it("stays generating across the turn's deltas", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "wor")); + s = foldEvent(s, textDelta("t1", "king")); + expect(s.generating).toBe(true); + }); + + it("done clears generating", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "answer")); + s = foldEvent(s, doneEvent("t1")); + expect(s.generating).toBe(false); + }); + + it("turn-sealed clears generating", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, turnSealed("t1")); + expect(s.generating).toBe(false); + }); + + it("error clears generating", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, errorEvent("t1", "boom")); + expect(s.generating).toBe(false); + }); + + it("a new turn re-asserts generating after the previous one finished", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, doneEvent("t1")); + s = foldEvent(s, turnSealed("t1")); + expect(s.generating).toBe(false); + s = foldEvent(s, turnStart("t2")); + expect(s.generating).toBe(true); + }); + + it("status does not change generating (free-form string, not inferred)", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + const next = foldEvent(s, { type: "status", conversationId: "c1", status: "idle" }); + expect(next.generating).toBe(true); + }); +}); + +describe("clearGenerating", () => { + it("clears a set generating flag", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + expect(s.generating).toBe(true); + const cleared = clearGenerating(s); + expect(cleared.generating).toBe(false); + }); + + it("returns the same object when already not generating (no-op)", () => { + const s = initialState(); + expect(clearGenerating(s)).toBe(s); + }); + + it("preserves transcript content while clearing generating", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "partial")); + const cleared = clearGenerating(s); + expect(cleared.generating).toBe(false); + expect(cleared.accumulating).toEqual({ kind: "text", text: "partial" }); + expect(cleared.currentTurnId).toBe("t1"); }); }); @@ -281,6 +383,60 @@ describe("foldEvent — status and tool-output", () => { }); }); +describe("foldEvent — user-message (the turn's user prompt; backend CR-3)", () => { + const userMessage = (text: string): TurnInputEvent => ({ + type: "user-message", + conversationId: "c1", + turnId: "t1", + text, + }); + + it("a watcher renders the prompt: appends a provisional user chunk + marks generating", () => { + let s = initialState(); + s = foldEvent(s, userMessage("what is 2+2?")); + const chunks = selectChunks(s); + expect(chunks).toHaveLength(1); + expect(chunks[0]?.role).toBe("user"); + expect(chunks[0]?.chunk).toEqual({ type: "text", text: "what is 2+2?" }); + expect(chunks[0]?.provisional).toBe(true); + expect(s.generating).toBe(true); + }); + + it("dedups the SENDER's optimistic echo (no duplicate user bubble)", () => { + let s = initialState(); + s = appendUserMessage(s, "hi"); // optimistic echo from the sender's send() + s = foldEvent(s, userMessage("hi")); // server echo for the same turn + const users = selectChunks(s).filter((c) => c.role === "user"); + expect(users).toHaveLength(1); + }); + + it("appends when the trailing provisional differs (no false dedup)", () => { + let s = initialState(); + s = appendUserMessage(s, "first"); + s = foldEvent(s, userMessage("second")); + const users = selectChunks(s).filter((c) => c.role === "user"); + expect(users).toHaveLength(2); + }); + + it("ignores an empty user-message", () => { + let s = initialState(); + s = foldEvent(s, userMessage("")); + expect(selectChunks(s)).toHaveLength(0); + expect(s.generating).toBe(false); + }); + + it("flushes an accumulating chunk before appending the prompt", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "partial")); + s = foldEvent(s, userMessage("new prompt")); + // the partial assistant text was flushed to provisional, then the user prompt appended + expect(s.accumulating).toBeNull(); + const roles = selectChunks(s).map((c) => c.role); + expect(roles).toEqual(["assistant", "user"]); + }); +}); + describe("applyHistory", () => { it("orders committed chunks by seq", () => { const s = initialState(); diff --git a/src/core/chunks/reducer.ts b/src/core/chunks/reducer.ts index 54b1922..7ce55ce 100644 --- a/src/core/chunks/reducer.ts +++ b/src/core/chunks/reducer.ts @@ -10,9 +10,22 @@ export function initialState(): TranscriptState { currentTurnId: null, latestUsage: null, sealedTurnId: null, + generating: false, }; } +/** + * Clear the `generating` flag without touching anything else. Used on a WS + * (re)connect: a turn may have sealed while we were disconnected, so the live + * `turn-sealed`/`done` that would have cleared `generating` was missed. The + * caller resets here, then re-subscribes — if the turn is still running the + * server's replay re-asserts `generating` via the replayed `turn-start`. + */ +export function clearGenerating(state: TranscriptState): TranscriptState { + if (!state.generating) return state; + return { ...state, generating: false }; +} + function flushAccumulating( provisional: readonly ProvisionalChunk[], acc: AccumulatingChunk | null, @@ -55,6 +68,8 @@ export function applyHistory( * Fold one live AgentEvent into the provisional state. * * - `turn-start` records the turnId. + * - `user-message` appends the turn's user prompt (de-duped vs the sender's + * optimistic echo) so a watcher renders it mid-turn. * - `text-delta` extends the current accumulating TextChunk (or starts one). * - `reasoning-delta` extends the current accumulating ThinkingChunk (or starts one). * - `tool-call` / `tool-result` / `error` finalize any accumulating chunk and @@ -63,6 +78,11 @@ export function applyHistory( * - `done` finalizes any accumulating chunk (turn still provisional). * - `turn-sealed` finalizes any accumulating chunk and sets sealedTurnId. * - `status` and `tool-output` are ignored (best-effort no-ops). + * + * `generating` is folded structurally: a `turn-start` or any content delta sets + * it true; `done` / `turn-sealed` / `error` clear it. This is what a watching + * (or reconnected) client renders as "generating…", with no dependence on the + * free-form `status` event string. */ export function foldEvent(state: TranscriptState, event: AgentEvent): TranscriptState { switch (event.type) { @@ -71,31 +91,66 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript return state; case "turn-start": - return { ...state, currentTurnId: event.turnId }; + return { ...state, currentTurnId: event.turnId, generating: true }; + + case "user-message": { + // The turn's USER prompt, surfaced on the event stream (backend CR-3) so a + // WATCHER/late-joiner renders it mid-turn instead of waiting for seal. The + // SENDER already echoed its own prompt optimistically (`appendUserMessage`), + // so DE-DUP: skip if the trailing provisional chunk is already an identical + // user text chunk. A pure watcher has no such echo → it appends and renders. + if (event.text.length === 0) return state; + const last = state.provisional[state.provisional.length - 1]; + if ( + last !== undefined && + last.role === "user" && + last.chunk.type === "text" && + last.chunk.text === event.text + ) { + return { ...state, generating: true }; + } + const provisional = flushAccumulating(state.provisional, state.accumulating); + return { + ...state, + provisional: [...provisional, { role: "user", chunk: { type: "text", text: event.text } }], + accumulating: null, + generating: true, + }; + } case "text-delta": { const acc = state.accumulating; if (acc !== null && acc.kind === "text") { - return { ...state, accumulating: { kind: "text", text: acc.text + event.delta } }; + return { + ...state, + accumulating: { kind: "text", text: acc.text + event.delta }, + generating: true, + }; } const provisional = flushAccumulating(state.provisional, acc); return { ...state, provisional, accumulating: { kind: "text", text: event.delta }, + generating: true, }; } case "reasoning-delta": { const acc = state.accumulating; if (acc !== null && acc.kind === "thinking") { - return { ...state, accumulating: { kind: "thinking", text: acc.text + event.delta } }; + return { + ...state, + accumulating: { kind: "thinking", text: acc.text + event.delta }, + generating: true, + }; } const provisional = flushAccumulating(state.provisional, acc); return { ...state, provisional, accumulating: { kind: "thinking", text: event.delta }, + generating: true, }; } @@ -112,6 +167,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript ...state, provisional: [...provisional, { role: "assistant", chunk }], accumulating: null, + generating: true, }; } @@ -129,6 +185,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript ...state, provisional: [...provisional, { role: "tool", chunk }], accumulating: null, + generating: true, }; } @@ -142,6 +199,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript ...state, provisional: [...provisional, { role: "assistant", chunk }], accumulating: null, + generating: false, }; } @@ -158,6 +216,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript ...state, provisional, accumulating: null, + generating: false, }; } @@ -168,6 +227,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript provisional, accumulating: null, sealedTurnId: event.turnId, + generating: false, }; } } diff --git a/src/core/chunks/selectors.ts b/src/core/chunks/selectors.ts index 839ba65..6929de2 100644 --- a/src/core/chunks/selectors.ts +++ b/src/core/chunks/selectors.ts @@ -23,6 +23,15 @@ export function selectChunks(state: TranscriptState): readonly RenderedChunk[] { return result; } +/** + * Whether a turn is currently generating (for a "generating…" indicator). True + * for ANY client watching the conversation — the sender, a second device, or a + * reconnected client whose in-flight turn was replayed. + */ +export function selectGenerating(state: TranscriptState): boolean { + return state.generating; +} + /** * Group consecutive same-role rendered chunks into ChatMessages. */ diff --git a/src/core/chunks/types.ts b/src/core/chunks/types.ts index e031ce3..faa0d3f 100644 --- a/src/core/chunks/types.ts +++ b/src/core/chunks/types.ts @@ -20,6 +20,14 @@ export interface TranscriptState { readonly currentTurnId: string | null; readonly latestUsage: Usage | null; readonly sealedTurnId: string | null; + /** + * True while a turn is generating on the server — derived STRUCTURALLY from the + * event stream: a `turn-start` (or any turn delta) with no matching `done` / + * `turn-sealed` / `error` yet. A late-joiner that subscribes mid-turn gets the + * in-flight turn replayed from its `turn-start`, so this lights up for any + * watching client. NOT inferred from the free-form `status` event string. + */ + readonly generating: boolean; } /** A chunk ready for rendering: either committed (with seq) or provisional. */ diff --git a/src/core/wire/conformance.test.ts b/src/core/wire/conformance.test.ts index 690ba4e..a258873 100644 --- a/src/core/wire/conformance.test.ts +++ b/src/core/wire/conformance.test.ts @@ -27,6 +27,7 @@ describe("classifies every AgentEvent type", () => { const samples: AgentEvent[] = [ { type: "status", conversationId: "c1", status: "idle" }, { type: "turn-start", conversationId: "c1", turnId: "t1" }, + { type: "user-message", conversationId: "c1", turnId: "t1", text: "hi" }, { type: "text-delta", conversationId: "c1", turnId: "t1", delta: "hi" }, { type: "reasoning-delta", conversationId: "c1", turnId: "t1", delta: "thinking" }, { @@ -81,6 +82,7 @@ describe("classifies every AgentEvent type", () => { expect(labels).toEqual([ "status", "turn-start", + "user-message", "text-delta", "reasoning-delta", "tool-call", @@ -94,8 +96,8 @@ describe("classifies every AgentEvent type", () => { ]); }); - it("covers all 12 AgentEvent variants", () => { - expect(samples).toHaveLength(12); + it("covers all 13 AgentEvent variants", () => { + expect(samples).toHaveLength(13); }); }); @@ -148,9 +150,18 @@ describe("classifies every WsClientMessage type", () => { { type: "unsubscribe" as const, surfaceId: "s" }, { type: "invoke" as const, surfaceId: "s", actionId: "a" }, { type: "chat.send" as const, message: "hi" }, + { type: "chat.subscribe" as const, conversationId: "c1" }, + { type: "chat.unsubscribe" as const, conversationId: "c1" }, ]; const labels = msgs.map(assertWsClientMessageExhaustive); - expect(labels).toEqual(["subscribe", "unsubscribe", "invoke", "chat.send"]); + expect(labels).toEqual([ + "subscribe", + "unsubscribe", + "invoke", + "chat.send", + "chat.subscribe", + "chat.unsubscribe", + ]); }); }); diff --git a/src/core/wire/conformance.ts b/src/core/wire/conformance.ts index d89772e..13be78c 100644 --- a/src/core/wire/conformance.ts +++ b/src/core/wire/conformance.ts @@ -12,6 +12,8 @@ export function assertAgentEventExhaustive(event: AgentEvent): string { return "status"; case "turn-start": return "turn-start"; + case "user-message": + return "user-message"; case "text-delta": return "text-delta"; case "reasoning-delta": @@ -96,6 +98,10 @@ export function assertWsClientMessageExhaustive(msg: WsClientMessage): string { return "invoke"; case "chat.send": return "chat.send"; + case "chat.subscribe": + return "chat.subscribe"; + case "chat.unsubscribe": + return "chat.unsubscribe"; default: return msg satisfies never; } -- cgit v1.2.3