summaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/chunks/index.ts10
-rw-r--r--src/core/chunks/reducer.test.ts160
-rw-r--r--src/core/chunks/reducer.ts66
-rw-r--r--src/core/chunks/selectors.ts9
-rw-r--r--src/core/chunks/types.ts8
-rw-r--r--src/core/wire/conformance.test.ts17
-rw-r--r--src/core/wire/conformance.ts6
7 files changed, 266 insertions, 10 deletions
diff --git a/src/core/chunks/index.ts b/src/core/chunks/index.ts
index 0718c0d..ecfee74 100644
--- a/src/core/chunks/index.ts
+++ b/src/core/chunks/index.ts
@@ -1,7 +1,13 @@
export type { RenderGroup, ToolBatchEntry } from "./groups";
export { groupRenderedChunks } from "./groups";
-export { appendUserMessage, applyHistory, foldEvent, initialState } from "./reducer";
-export { selectChunks, selectMessages } from "./selectors";
+export {
+ appendUserMessage,
+ applyHistory,
+ clearGenerating,
+ foldEvent,
+ initialState,
+} from "./reducer";
+export { selectChunks, selectGenerating, selectMessages } from "./selectors";
export type {
AccumulatingChunk,
ProvisionalChunk,
diff --git a/src/core/chunks/reducer.test.ts b/src/core/chunks/reducer.test.ts
index f2f1b75..35a586c 100644
--- a/src/core/chunks/reducer.test.ts
+++ b/src/core/chunks/reducer.test.ts
@@ -3,6 +3,7 @@ import type {
StoredChunk,
TurnDoneEvent,
TurnErrorEvent,
+ TurnInputEvent,
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
@@ -12,8 +13,14 @@ import type {
TurnUsageEvent,
} from "@dispatch/wire";
import { describe, expect, it } from "vitest";
-import { appendUserMessage, applyHistory, foldEvent, initialState } from "./reducer";
-import { selectChunks, selectMessages } from "./selectors";
+import {
+ appendUserMessage,
+ applyHistory,
+ clearGenerating,
+ foldEvent,
+ initialState,
+} from "./reducer";
+import { selectChunks, selectGenerating, selectMessages } from "./selectors";
const turnStart = (turnId: string): TurnStartEvent => ({
type: "turn-start",
@@ -112,6 +119,101 @@ describe("initialState", () => {
expect(s.currentTurnId).toBeNull();
expect(s.latestUsage).toBeNull();
expect(s.sealedTurnId).toBeNull();
+ expect(s.generating).toBe(false);
+ });
+});
+
+describe("foldEvent — generating (turn-running state)", () => {
+ it("turn-start sets generating true", () => {
+ let s = initialState();
+ expect(selectGenerating(s)).toBe(false);
+ s = foldEvent(s, turnStart("t1"));
+ expect(s.generating).toBe(true);
+ expect(selectGenerating(s)).toBe(true);
+ });
+
+ it("a content delta sets generating true (e.g. a late-joiner replay missing turn-start)", () => {
+ let s = initialState();
+ s = foldEvent(s, textDelta("t1", "hi"));
+ expect(s.generating).toBe(true);
+ s = initialState();
+ s = foldEvent(s, reasoningDelta("t1", "hmm"));
+ expect(s.generating).toBe(true);
+ s = initialState();
+ s = foldEvent(s, toolCall("t1", "tc1", "bash", {}));
+ expect(s.generating).toBe(true);
+ });
+
+ it("stays generating across the turn's deltas", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "wor"));
+ s = foldEvent(s, textDelta("t1", "king"));
+ expect(s.generating).toBe(true);
+ });
+
+ it("done clears generating", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "answer"));
+ s = foldEvent(s, doneEvent("t1"));
+ expect(s.generating).toBe(false);
+ });
+
+ it("turn-sealed clears generating", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, turnSealed("t1"));
+ expect(s.generating).toBe(false);
+ });
+
+ it("error clears generating", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, errorEvent("t1", "boom"));
+ expect(s.generating).toBe(false);
+ });
+
+ it("a new turn re-asserts generating after the previous one finished", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, doneEvent("t1"));
+ s = foldEvent(s, turnSealed("t1"));
+ expect(s.generating).toBe(false);
+ s = foldEvent(s, turnStart("t2"));
+ expect(s.generating).toBe(true);
+ });
+
+ it("status does not change generating (free-form string, not inferred)", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ const next = foldEvent(s, { type: "status", conversationId: "c1", status: "idle" });
+ expect(next.generating).toBe(true);
+ });
+});
+
+describe("clearGenerating", () => {
+ it("clears a set generating flag", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ expect(s.generating).toBe(true);
+ const cleared = clearGenerating(s);
+ expect(cleared.generating).toBe(false);
+ });
+
+ it("returns the same object when already not generating (no-op)", () => {
+ const s = initialState();
+ expect(clearGenerating(s)).toBe(s);
+ });
+
+ it("preserves transcript content while clearing generating", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "partial"));
+ const cleared = clearGenerating(s);
+ expect(cleared.generating).toBe(false);
+ expect(cleared.accumulating).toEqual({ kind: "text", text: "partial" });
+ expect(cleared.currentTurnId).toBe("t1");
});
});
@@ -281,6 +383,60 @@ describe("foldEvent — status and tool-output", () => {
});
});
+describe("foldEvent — user-message (the turn's user prompt; backend CR-3)", () => {
+ const userMessage = (text: string): TurnInputEvent => ({
+ type: "user-message",
+ conversationId: "c1",
+ turnId: "t1",
+ text,
+ });
+
+ it("a watcher renders the prompt: appends a provisional user chunk + marks generating", () => {
+ let s = initialState();
+ s = foldEvent(s, userMessage("what is 2+2?"));
+ const chunks = selectChunks(s);
+ expect(chunks).toHaveLength(1);
+ expect(chunks[0]?.role).toBe("user");
+ expect(chunks[0]?.chunk).toEqual({ type: "text", text: "what is 2+2?" });
+ expect(chunks[0]?.provisional).toBe(true);
+ expect(s.generating).toBe(true);
+ });
+
+ it("dedups the SENDER's optimistic echo (no duplicate user bubble)", () => {
+ let s = initialState();
+ s = appendUserMessage(s, "hi"); // optimistic echo from the sender's send()
+ s = foldEvent(s, userMessage("hi")); // server echo for the same turn
+ const users = selectChunks(s).filter((c) => c.role === "user");
+ expect(users).toHaveLength(1);
+ });
+
+ it("appends when the trailing provisional differs (no false dedup)", () => {
+ let s = initialState();
+ s = appendUserMessage(s, "first");
+ s = foldEvent(s, userMessage("second"));
+ const users = selectChunks(s).filter((c) => c.role === "user");
+ expect(users).toHaveLength(2);
+ });
+
+ it("ignores an empty user-message", () => {
+ let s = initialState();
+ s = foldEvent(s, userMessage(""));
+ expect(selectChunks(s)).toHaveLength(0);
+ expect(s.generating).toBe(false);
+ });
+
+ it("flushes an accumulating chunk before appending the prompt", () => {
+ let s = initialState();
+ s = foldEvent(s, turnStart("t1"));
+ s = foldEvent(s, textDelta("t1", "partial"));
+ s = foldEvent(s, userMessage("new prompt"));
+ // the partial assistant text was flushed to provisional, then the user prompt appended
+ expect(s.accumulating).toBeNull();
+ const roles = selectChunks(s).map((c) => c.role);
+ expect(roles).toEqual(["assistant", "user"]);
+ });
+});
+
describe("applyHistory", () => {
it("orders committed chunks by seq", () => {
const s = initialState();
diff --git a/src/core/chunks/reducer.ts b/src/core/chunks/reducer.ts
index 54b1922..7ce55ce 100644
--- a/src/core/chunks/reducer.ts
+++ b/src/core/chunks/reducer.ts
@@ -10,9 +10,22 @@ export function initialState(): TranscriptState {
currentTurnId: null,
latestUsage: null,
sealedTurnId: null,
+ generating: false,
};
}
+/**
+ * Clear the `generating` flag without touching anything else. Used on a WS
+ * (re)connect: a turn may have sealed while we were disconnected, so the live
+ * `turn-sealed`/`done` that would have cleared `generating` was missed. The
+ * caller resets here, then re-subscribes — if the turn is still running the
+ * server's replay re-asserts `generating` via the replayed `turn-start`.
+ */
+export function clearGenerating(state: TranscriptState): TranscriptState {
+ if (!state.generating) return state;
+ return { ...state, generating: false };
+}
+
function flushAccumulating(
provisional: readonly ProvisionalChunk[],
acc: AccumulatingChunk | null,
@@ -55,6 +68,8 @@ export function applyHistory(
* Fold one live AgentEvent into the provisional state.
*
* - `turn-start` records the turnId.
+ * - `user-message` appends the turn's user prompt (de-duped vs the sender's
+ * optimistic echo) so a watcher renders it mid-turn.
* - `text-delta` extends the current accumulating TextChunk (or starts one).
* - `reasoning-delta` extends the current accumulating ThinkingChunk (or starts one).
* - `tool-call` / `tool-result` / `error` finalize any accumulating chunk and
@@ -63,6 +78,11 @@ export function applyHistory(
* - `done` finalizes any accumulating chunk (turn still provisional).
* - `turn-sealed` finalizes any accumulating chunk and sets sealedTurnId.
* - `status` and `tool-output` are ignored (best-effort no-ops).
+ *
+ * `generating` is folded structurally: a `turn-start` or any content delta sets
+ * it true; `done` / `turn-sealed` / `error` clear it. This is what a watching
+ * (or reconnected) client renders as "generating…", with no dependence on the
+ * free-form `status` event string.
*/
export function foldEvent(state: TranscriptState, event: AgentEvent): TranscriptState {
switch (event.type) {
@@ -71,31 +91,66 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript
return state;
case "turn-start":
- return { ...state, currentTurnId: event.turnId };
+ return { ...state, currentTurnId: event.turnId, generating: true };
+
+ case "user-message": {
+ // The turn's USER prompt, surfaced on the event stream (backend CR-3) so a
+ // WATCHER/late-joiner renders it mid-turn instead of waiting for seal. The
+ // SENDER already echoed its own prompt optimistically (`appendUserMessage`),
+ // so DE-DUP: skip if the trailing provisional chunk is already an identical
+ // user text chunk. A pure watcher has no such echo → it appends and renders.
+ if (event.text.length === 0) return state;
+ const last = state.provisional[state.provisional.length - 1];
+ if (
+ last !== undefined &&
+ last.role === "user" &&
+ last.chunk.type === "text" &&
+ last.chunk.text === event.text
+ ) {
+ return { ...state, generating: true };
+ }
+ const provisional = flushAccumulating(state.provisional, state.accumulating);
+ return {
+ ...state,
+ provisional: [...provisional, { role: "user", chunk: { type: "text", text: event.text } }],
+ accumulating: null,
+ generating: true,
+ };
+ }
case "text-delta": {
const acc = state.accumulating;
if (acc !== null && acc.kind === "text") {
- return { ...state, accumulating: { kind: "text", text: acc.text + event.delta } };
+ return {
+ ...state,
+ accumulating: { kind: "text", text: acc.text + event.delta },
+ generating: true,
+ };
}
const provisional = flushAccumulating(state.provisional, acc);
return {
...state,
provisional,
accumulating: { kind: "text", text: event.delta },
+ generating: true,
};
}
case "reasoning-delta": {
const acc = state.accumulating;
if (acc !== null && acc.kind === "thinking") {
- return { ...state, accumulating: { kind: "thinking", text: acc.text + event.delta } };
+ return {
+ ...state,
+ accumulating: { kind: "thinking", text: acc.text + event.delta },
+ generating: true,
+ };
}
const provisional = flushAccumulating(state.provisional, acc);
return {
...state,
provisional,
accumulating: { kind: "thinking", text: event.delta },
+ generating: true,
};
}
@@ -112,6 +167,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript
...state,
provisional: [...provisional, { role: "assistant", chunk }],
accumulating: null,
+ generating: true,
};
}
@@ -129,6 +185,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript
...state,
provisional: [...provisional, { role: "tool", chunk }],
accumulating: null,
+ generating: true,
};
}
@@ -142,6 +199,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript
...state,
provisional: [...provisional, { role: "assistant", chunk }],
accumulating: null,
+ generating: false,
};
}
@@ -158,6 +216,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript
...state,
provisional,
accumulating: null,
+ generating: false,
};
}
@@ -168,6 +227,7 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript
provisional,
accumulating: null,
sealedTurnId: event.turnId,
+ generating: false,
};
}
}
diff --git a/src/core/chunks/selectors.ts b/src/core/chunks/selectors.ts
index 839ba65..6929de2 100644
--- a/src/core/chunks/selectors.ts
+++ b/src/core/chunks/selectors.ts
@@ -24,6 +24,15 @@ export function selectChunks(state: TranscriptState): readonly RenderedChunk[] {
}
/**
+ * Whether a turn is currently generating (for a "generating…" indicator). True
+ * for ANY client watching the conversation — the sender, a second device, or a
+ * reconnected client whose in-flight turn was replayed.
+ */
+export function selectGenerating(state: TranscriptState): boolean {
+ return state.generating;
+}
+
+/**
* Group consecutive same-role rendered chunks into ChatMessages.
*/
export function selectMessages(state: TranscriptState): readonly ChatMessage[] {
diff --git a/src/core/chunks/types.ts b/src/core/chunks/types.ts
index e031ce3..faa0d3f 100644
--- a/src/core/chunks/types.ts
+++ b/src/core/chunks/types.ts
@@ -20,6 +20,14 @@ export interface TranscriptState {
readonly currentTurnId: string | null;
readonly latestUsage: Usage | null;
readonly sealedTurnId: string | null;
+ /**
+ * True while a turn is generating on the server — derived STRUCTURALLY from the
+ * event stream: a `turn-start` (or any turn delta) with no matching `done` /
+ * `turn-sealed` / `error` yet. A late-joiner that subscribes mid-turn gets the
+ * in-flight turn replayed from its `turn-start`, so this lights up for any
+ * watching client. NOT inferred from the free-form `status` event string.
+ */
+ readonly generating: boolean;
}
/** A chunk ready for rendering: either committed (with seq) or provisional. */
diff --git a/src/core/wire/conformance.test.ts b/src/core/wire/conformance.test.ts
index 690ba4e..a258873 100644
--- a/src/core/wire/conformance.test.ts
+++ b/src/core/wire/conformance.test.ts
@@ -27,6 +27,7 @@ describe("classifies every AgentEvent type", () => {
const samples: AgentEvent[] = [
{ type: "status", conversationId: "c1", status: "idle" },
{ type: "turn-start", conversationId: "c1", turnId: "t1" },
+ { type: "user-message", conversationId: "c1", turnId: "t1", text: "hi" },
{ type: "text-delta", conversationId: "c1", turnId: "t1", delta: "hi" },
{ type: "reasoning-delta", conversationId: "c1", turnId: "t1", delta: "thinking" },
{
@@ -81,6 +82,7 @@ describe("classifies every AgentEvent type", () => {
expect(labels).toEqual([
"status",
"turn-start",
+ "user-message",
"text-delta",
"reasoning-delta",
"tool-call",
@@ -94,8 +96,8 @@ describe("classifies every AgentEvent type", () => {
]);
});
- it("covers all 12 AgentEvent variants", () => {
- expect(samples).toHaveLength(12);
+ it("covers all 13 AgentEvent variants", () => {
+ expect(samples).toHaveLength(13);
});
});
@@ -148,9 +150,18 @@ describe("classifies every WsClientMessage type", () => {
{ type: "unsubscribe" as const, surfaceId: "s" },
{ type: "invoke" as const, surfaceId: "s", actionId: "a" },
{ type: "chat.send" as const, message: "hi" },
+ { type: "chat.subscribe" as const, conversationId: "c1" },
+ { type: "chat.unsubscribe" as const, conversationId: "c1" },
];
const labels = msgs.map(assertWsClientMessageExhaustive);
- expect(labels).toEqual(["subscribe", "unsubscribe", "invoke", "chat.send"]);
+ expect(labels).toEqual([
+ "subscribe",
+ "unsubscribe",
+ "invoke",
+ "chat.send",
+ "chat.subscribe",
+ "chat.unsubscribe",
+ ]);
});
});
diff --git a/src/core/wire/conformance.ts b/src/core/wire/conformance.ts
index d89772e..13be78c 100644
--- a/src/core/wire/conformance.ts
+++ b/src/core/wire/conformance.ts
@@ -12,6 +12,8 @@ export function assertAgentEventExhaustive(event: AgentEvent): string {
return "status";
case "turn-start":
return "turn-start";
+ case "user-message":
+ return "user-message";
case "text-delta":
return "text-delta";
case "reasoning-delta":
@@ -96,6 +98,10 @@ export function assertWsClientMessageExhaustive(msg: WsClientMessage): string {
return "invoke";
case "chat.send":
return "chat.send";
+ case "chat.subscribe":
+ return "chat.subscribe";
+ case "chat.unsubscribe":
+ return "chat.unsubscribe";
default:
return msg satisfies never;
}