summaryrefslogtreecommitdiffhomepage
path: root/src/core/chunks/reducer.ts
blob: 0783c22ad7e02b6c0dffa4371b070539933c8829 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
import type { AgentEvent, Chunk, StoredChunk } from "@dispatch/wire";
import type { AccumulatingChunk, ProvisionalChunk, TranscriptState } from "./types";

/** The initial empty transcript state. */
export function initialState(): TranscriptState {
	return {
		committed: [],
		provisional: [],
		accumulating: null,
		currentTurnId: null,
		latestUsage: null,
		sealedTurnId: null,
		hiddenBeforeSeq: 0,
		hiddenThinkingCount: 0,
		generating: false,
	};
}

/**
 * Clear the `generating` flag without touching anything else. Used on a WS
 * (re)connect: a turn may have sealed while we were disconnected, so the live
 * `turn-sealed`/`done` that would have cleared `generating` was missed. The
 * caller resets here, then re-subscribes — if the turn is still running the
 * server's replay re-asserts `generating` via the replayed `turn-start`.
 */
export function clearGenerating(state: TranscriptState): TranscriptState {
	if (!state.generating) return state;
	return { ...state, generating: false };
}

function flushAccumulating(
	provisional: readonly ProvisionalChunk[],
	acc: AccumulatingChunk | null,
): readonly ProvisionalChunk[] {
	if (acc === null) return provisional;
	const chunk: Chunk =
		acc.kind === "text" ? { type: "text", text: acc.text } : { type: "thinking", text: acc.text };
	return [...provisional, { role: "assistant", chunk }];
}

/**
 * Merge authoritative seq-keyed chunks into the committed history.
 * Dedupes by seq (new wins), keeps seq-monotonic order, idempotent.
 * When sealedTurnId is set, drops all provisional chunks (now superseded)
 * and clears sealedTurnId.
 *
 * Chunks below the chat-limit unload watermark (`hiddenBeforeSeq`) are
 * REJECTED: a full-cache or tail merge must not resurrect what the trim
 * unloaded. Restoring earlier history goes through `restoreEarlier` instead.
 */
export function applyHistory(
	state: TranscriptState,
	chunks: readonly StoredChunk[],
): TranscriptState {
	const seqMap = new Map<number, StoredChunk>();
	for (const c of state.committed) seqMap.set(c.seq, c);
	let addedNew = false;
	for (const c of chunks) {
		if (c.seq < state.hiddenBeforeSeq) continue;
		if (!seqMap.has(c.seq)) addedNew = true;
		seqMap.set(c.seq, c);
	}
	const committed = Array.from(seqMap.values()).sort((a, b) => a.seq - b.seq);

	if (state.sealedTurnId !== null) {
		return {
			...state,
			committed,
			provisional: [],
			accumulating: null,
			sealedTurnId: null,
		};
	}

	// During generation: if new committed chunks arrived, the provisional
	// array may contain duplicates — the optimistic echo from `appendUserMessage`
	// is now backed by a committed chunk (CR-6: user message persisted at turn
	// start). Remove provisional chunks that match the last committed chunk
	// (role + chunk content), keeping only the accumulating (streaming) chunk.
	if (addedNew && state.generating && state.provisional.length > 0) {
		const lastCommitted = committed[committed.length - 1];
		if (lastCommitted !== undefined) {
			const provisional = state.provisional.filter((p) => {
				if (p.role !== lastCommitted.role) return true;
				if (p.chunk.type !== lastCommitted.chunk.type) return true;
				if (p.chunk.type === "text" && lastCommitted.chunk.type === "text") {
					return p.chunk.text !== lastCommitted.chunk.text;
				}
				return true;
			});
			return { ...state, committed, provisional, accumulating: state.accumulating };
		}
	}

	return { ...state, committed };
}

/**
 * Fold one live AgentEvent into the provisional state.
 *
 * - `turn-start` records the turnId.
 * - `user-message` appends the turn's user prompt (de-duped vs the sender's
 *   optimistic echo) so a watcher renders it mid-turn.
 * - `text-delta` extends the current accumulating TextChunk (or starts one).
 * - `reasoning-delta` extends the current accumulating ThinkingChunk (or starts one).
 * - `tool-call` / `tool-result` / `error` finalize any accumulating chunk and
 *   add a new provisional chunk.
 * - `steering` appends a user bubble mid-turn (drained from the message queue
 *   at a tool-result boundary; the queue surface separately clears on drain).
 * - `usage` stores the latest Usage.
 * - `done` finalizes any accumulating chunk (turn still provisional).
 * - `turn-sealed` finalizes any accumulating chunk and sets sealedTurnId.
 * - `status` and `tool-output` are ignored (best-effort no-ops).
 *
 * `generating` is folded structurally: a `turn-start` or any content delta sets
 * it true; `done` / `turn-sealed` / `error` clear it. This is what a watching
 * (or reconnected) client renders as "generating…", with no dependence on the
 * free-form `status` event string.
 */
export function foldEvent(state: TranscriptState, event: AgentEvent): TranscriptState {
	switch (event.type) {
		case "status":
		case "tool-output":
			return state;

		case "turn-start":
			return { ...state, currentTurnId: event.turnId, generating: true };

		case "user-message": {
			// The turn's USER prompt, surfaced on the event stream (backend CR-3) so a
			// WATCHER/late-joiner renders it mid-turn instead of waiting for seal. The
			// SENDER already echoed its own prompt optimistically (`appendUserMessage`),
			// so DE-DUP: skip if the trailing provisional chunk is already an identical
			// user text chunk. A pure watcher has no such echo → it appends and renders.
			if (event.text.length === 0) return state;
			const last = state.provisional[state.provisional.length - 1];
			if (
				last !== undefined &&
				last.role === "user" &&
				last.chunk.type === "text" &&
				last.chunk.text === event.text
			) {
				return { ...state, generating: true };
			}
			const provisional = flushAccumulating(state.provisional, state.accumulating);
			return {
				...state,
				provisional: [...provisional, { role: "user", chunk: { type: "text", text: event.text } }],
				accumulating: null,
				generating: true,
			};
		}

		case "text-delta": {
			const acc = state.accumulating;
			if (acc !== null && acc.kind === "text") {
				return {
					...state,
					accumulating: { kind: "text", text: acc.text + event.delta },
					generating: true,
				};
			}
			const provisional = flushAccumulating(state.provisional, acc);
			return {
				...state,
				provisional,
				accumulating: { kind: "text", text: event.delta },
				generating: true,
			};
		}

		case "reasoning-delta": {
			const acc = state.accumulating;
			if (acc !== null && acc.kind === "thinking") {
				return {
					...state,
					accumulating: { kind: "thinking", text: acc.text + event.delta },
					generating: true,
				};
			}
			const provisional = flushAccumulating(state.provisional, acc);
			return {
				...state,
				provisional,
				accumulating: { kind: "thinking", text: event.delta },
				generating: true,
			};
		}

		case "tool-call": {
			const provisional = flushAccumulating(state.provisional, state.accumulating);
			const chunk: Chunk = {
				type: "tool-call",
				toolCallId: event.toolCallId,
				toolName: event.toolName,
				input: event.input,
				stepId: event.stepId,
			};
			return {
				...state,
				provisional: [...provisional, { role: "assistant", chunk }],
				accumulating: null,
				generating: true,
			};
		}

		case "tool-result": {
			const provisional = flushAccumulating(state.provisional, state.accumulating);
			const chunk: Chunk = {
				type: "tool-result",
				toolCallId: event.toolCallId,
				toolName: event.toolName,
				content: event.content,
				isError: event.isError,
				stepId: event.stepId,
			};
			return {
				...state,
				provisional: [...provisional, { role: "tool", chunk }],
				accumulating: null,
				generating: true,
			};
		}

		case "error": {
			const provisional = flushAccumulating(state.provisional, state.accumulating);
			const chunk: Chunk =
				event.code !== undefined
					? { type: "error", message: event.message, code: event.code }
					: { type: "error", message: event.message };
			return {
				...state,
				provisional: [...provisional, { role: "assistant", chunk }],
				accumulating: null,
				generating: false,
			};
		}

		case "usage":
			return { ...state, latestUsage: event.usage };

		case "step-complete":
			// Timing metadata — no content chunk; handled by the telemetry reducer.
			return state;

		case "done": {
			const provisional = flushAccumulating(state.provisional, state.accumulating);
			return {
				...state,
				provisional,
				accumulating: null,
				generating: false,
			};
		}

		case "turn-sealed": {
			const provisional = flushAccumulating(state.provisional, state.accumulating);
			return {
				...state,
				provisional,
				accumulating: null,
				sealedTurnId: event.turnId,
				generating: false,
			};
		}

		case "steering": {
			// A steering message drained from the queue at a tool-result boundary
			// (the model sees it alongside the tool results). Append a user bubble
			// to the provisional transcript; the turn is still in flight. The queue
			// surface clears separately on drain (a different channel) — no de-dup
			// here (unlike `user-message`, steering is never optimistically echoed
			// into the transcript by the sender).
			if (event.text.length === 0) return state;
			const provisional = flushAccumulating(state.provisional, state.accumulating);
			return {
				...state,
				provisional: [...provisional, { role: "user", chunk: { type: "text", text: event.text } }],
				accumulating: null,
				generating: true,
			};
		}
	}
}

/**
 * Optimistically append a user message to the provisional list.
 * Flushes any in-progress accumulating chunk first (defensively).
 * The provisional user chunk is superseded when applyHistory receives
 * the authoritative committed chunks after a turn seals.
 */
export function appendUserMessage(state: TranscriptState, text: string): TranscriptState {
	const provisional = flushAccumulating(state.provisional, state.accumulating);
	const userChunk: Chunk = { type: "text", text };
	return {
		...state,
		provisional: [...provisional, { role: "user", chunk: userChunk }],
		accumulating: null,
	};
}