summaryrefslogtreecommitdiffhomepage
path: root/src/features/chat/store.svelte.ts
blob: 9beabfcb57c828708c0cb6a9de79f9986705c2dc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
import type {
	ChatDeltaMessage,
	ChatErrorMessage,
	ChatQueueMessage,
	ChatSendMessage,
} from "@dispatch/transport-contract";
import type { ChatMessage, StoredChunk } from "@dispatch/wire";
import type { RenderedChunk, TranscriptState } from "../../core/chunks";
import {
	appendUserMessage,
	applyHistory,
	clearGenerating,
	foldEvent,
	initialState,
	initialWindowSize,
	normalizeChatLimit,
	restoreEarlier,
	selectChunks,
	selectGenerating,
	selectHasEarlier,
	selectMessages,
	trimTranscript,
	unloadCount,
	windowTranscript,
} from "../../core/chunks";
import type { MetricsState, TurnMetricsEntry } from "../../core/metrics";
import {
	applyDurableMetrics,
	foldMetricsEvent,
	initialMetricsState,
	selectCurrentContextSize,
	selectOrderedTurnMetrics,
} from "../../core/metrics";
import type { ConversationCache } from "../conversation-cache";
import type { ChatTransport, HistorySync, MetricsSync } from "./ports";

export interface ChatStoreDependencies {
	readonly conversationId: string;
	readonly model?: string;
	readonly transport: ChatTransport;
	readonly historySync: HistorySync;
	readonly metricsSync: MetricsSync;
	readonly cache: ConversationCache;
	/**
	 * The chat limit: max loaded chunks before the oldest quarter is unloaded
	 * (see `core/chunks/trim.ts`). Normalized via `normalizeChatLimit`; absent →
	 * `DEFAULT_CHAT_LIMIT`.
	 */
	readonly chatLimit?: number;
	/**
	 * Whether unloading may run RIGHT NOW. The composition root wires this to the
	 * smart-scroll "stuck to bottom" state: while the reader is scrolled up, a
	 * trim would yank the content under them, so it is DEFERRED until they return
	 * to the bottom (the next fold retries). Absent → always allowed.
	 */
	readonly canUnload?: () => boolean;
}

export interface ChatStore {
	readonly messages: readonly ChatMessage[];
	readonly chunks: readonly RenderedChunk[];
	readonly turnMetrics: readonly TurnMetricsEntry[];
	/**
	 * The conversation's current context size (tokens occupied) — the latest
	 * finalized turn's `contextSize`, or `undefined` ("unknown") when none is
	 * known yet. Never `0` for the unknown case.
	 */
	readonly currentContextSize: number | undefined;
	/**
	 * Whether a turn is currently generating server-side — derived from the event
	 * stream (`turn-start`…no-`done`/`turn-sealed`-yet). True for ANY watching
	 * client: the sender, a second device, or a reconnected client whose in-flight
	 * turn was replayed. Drives the composer's "generating…" indicator.
	 */
	readonly generating: boolean;
	readonly pendingSync: boolean;
	readonly error: string | null;
	readonly model: string | undefined;
	/**
	 * Whether earlier history was unloaded by the chat limit (or never loaded by
	 * the fresh-load window) and can be paged back in — drives the
	 * "Show earlier messages" affordance.
	 */
	readonly hasEarlier: boolean;
	/**
	 * Render-key base for thinking collapses: how many thinking chunks are
	 * unloaded below the watermark, so the UI's ordinal keys stay stable across
	 * a trim (see `TranscriptState.hiddenThinkingCount`).
	 */
	readonly thinkingKeyBase: number;
	handleDelta(msg: ChatDeltaMessage | ChatErrorMessage): void;
	send(text: string): void;
	/**
	 * Enqueue a steering message onto the conversation's queue (`chat.queue`
	 * WS op). While a turn is generating, the message is delivered mid-turn at
	 * the next tool-result boundary (a `steering` `AgentEvent` fires + the
	 * message-queue surface updates). When no turn is active, the server
	 * auto-starts a turn with the message as its opening prompt (equivalent to
	 * `chat.send`). No optimistic transcript echo — the queue SURFACE carries the
	 * pending message until drain; the `steering` event places it in the
	 * transcript. `text` must be non-empty (the server 400/errors otherwise).
	 */
	queueMessage(text: string): void;
	setModel(model: string): void;
	/**
	 * Update the chat limit LIVE: re-normalizes, then adjusts the loaded window.
	 * Lowering it unloads older committed chunks (deferred via the gate while the
	 * reader is scrolled up, catching up on the next mutation). Raising it
	 * REFILLS older history (cache first, then CR-5 `?beforeSeq=`) up to the
	 * fresh-load window (`initialWindowSize` = 75% of the limit) — the same
	 * window a fresh `load()` would show — so upping the limit reveals more
	 * history instead of leaving a partial view. New deltas + loads use the new
	 * limit. The refill awaits, so a caller can preserve scroll over the prepend.
	 */
	setChatLimit(limit: number): Promise<void>;
	load(): Promise<void>;
	/**
	 * Page one unload-unit (`ceil(limit/4)`) of earlier history back in — the
	 * "Show earlier messages" action. Local cache first; when the cache doesn't
	 * reach far enough back (a server-windowed fresh load), the missing older
	 * run is fetched via CR-5 `?beforeSeq=&limit=` and persisted to the cache.
	 */
	showEarlier(): Promise<void>;
	/**
	 * Re-sync after a WS (re)connect. Clears any stale `generating` (a turn may
	 * have sealed while disconnected — the live `turn-sealed` was missed), then
	 * pulls newly-sealed turns from history (+ metrics). If the turn is still
	 * running, the server's post-subscribe replay re-asserts `generating`. The
	 * app store pairs this with a `chat.subscribe` for the conversation.
	 */
	resync(): void;
	dispose(): void;
}

export function createChatStore(deps: ChatStoreDependencies): ChatStore {
	let transcript = $state<TranscriptState>(initialState());
	let metrics = $state<MetricsState>(initialMetricsState());
	let _pendingSync = $state(false);
	let _error = $state<string | null>(null);
	let _model = $state<string | undefined>(deps.model);
	let disposed = false;

	let chatLimit = normalizeChatLimit(deps.chatLimit);

	/**
	 * Enforce the chat limit after a transcript mutation — unless the injected
	 * gate says the reader is scrolled up (then defer; the next mutation retries
	 * and `trimTranscript` unloads whole quarters to catch up).
	 */
	function maybeTrim(): void {
		if (deps.canUnload !== undefined && !deps.canUnload()) return;
		transcript = trimTranscript(transcript, chatLimit);
	}

	/**
	 * Pull `seq > cache-cursor` from the server and fold it in. `coldLimit`, when
	 * given AND the cache is empty (a truly fresh browser), windows the fetch to
	 * the newest N chunks (CR-5 `?limit=`) so a huge conversation doesn't ship
	 * whole. It is deliberately NOT applied to a warm-cache tail: windowing a
	 * tail that grew past N while we were away would leave a silent seq GAP
	 * between the cache and the fetched window.
	 */
	async function syncTail(coldLimit?: number): Promise<void> {
		if (disposed || _pendingSync) return;
		_pendingSync = true;
		try {
			const since = await deps.cache.sinceSeq(deps.conversationId);
			const window = since === 0 && coldLimit !== undefined ? { limit: coldLimit } : undefined;
			const res = await deps.historySync(deps.conversationId, since, window);
			const merged = await deps.cache.commit(deps.conversationId, res.chunks);
			transcript = applyHistory(transcript, merged);
			maybeTrim();
			_error = null;
		} catch (err) {
			_error = err instanceof Error ? err.message : String(err);
		} finally {
			_pendingSync = false;
		}
	}

	async function syncMetrics(): Promise<void> {
		if (disposed) return;
		try {
			const res = await deps.metricsSync(deps.conversationId);
			metrics = applyDurableMetrics(metrics, res.turns);
		} catch {
			// Metrics fetch failure must not block history sync or throw;
			// live-folded metrics remain intact.
		}
	}

	/**
	 * Fetch up to `want` older chunks (seq < `oldest`) — cache first, then a
	 * CR-5 `?beforeSeq=&limit=` server backfill when the cache is too shallow,
	 * persisting it so the next read is local. Returns every locally-known
	 * chunk older than `oldest` (the caller — `restoreEarlier` — takes the
	 * newest `count` of them). Shared by `showEarlier` and the raise-refill.
	 */
	async function backfillOlder(oldest: number, want: number): Promise<readonly StoredChunk[]> {
		let earlier = (await deps.cache.load(deps.conversationId)).filter((c) => c.seq < oldest);
		const oldestKnown = earlier[0]?.seq ?? oldest;
		if (earlier.length < want && oldestKnown > 1) {
			const res = await deps.historySync(deps.conversationId, 0, {
				beforeSeq: oldestKnown,
				limit: want - earlier.length,
			});
			const merged = await deps.cache.commit(deps.conversationId, res.chunks);
			earlier = merged.filter((c) => c.seq < oldest);
		}
		return earlier;
	}

	/**
	 * Refill toward the fresh-load window after a limit RAISE: pull older
	 * history (cache first, then server) so the loaded set grows to match what a
	 * fresh `load()` would show at the new limit. No-op when already at the
	 * origin (seq 1) or already within the window. `restoreEarlier` re-derives
	 * the window start at apply time, so a delta landing during the await can't
	 * corrupt the merge. NOT gated (refilling prepends above the viewport; the
	 * caller preserves scroll position).
	 */
	async function refill(): Promise<void> {
		if (disposed) return;
		const oldest = transcript.committed[0]?.seq ?? transcript.hiddenBeforeSeq;
		if (oldest <= 1) return;
		const want = initialWindowSize(chatLimit) - transcript.committed.length;
		if (want <= 0) return;
		try {
			const earlier = await backfillOlder(oldest, want);
			if (earlier.length === 0) return;
			transcript = restoreEarlier(transcript, earlier, want);
			_error = null;
		} catch (err) {
			_error = err instanceof Error ? err.message : String(err);
		}
	}

	return {
		get messages(): readonly ChatMessage[] {
			return selectMessages(transcript);
		},
		get chunks(): readonly RenderedChunk[] {
			return selectChunks(transcript);
		},
		get turnMetrics(): readonly TurnMetricsEntry[] {
			return selectOrderedTurnMetrics(metrics);
		},
		get currentContextSize(): number | undefined {
			return selectCurrentContextSize(metrics);
		},
		get generating(): boolean {
			return selectGenerating(transcript);
		},
		get pendingSync(): boolean {
			return _pendingSync;
		},
		get error(): string | null {
			return _error;
		},
		get model(): string | undefined {
			return _model;
		},
		get hasEarlier(): boolean {
			return selectHasEarlier(transcript);
		},
		get thinkingKeyBase(): number {
			return transcript.hiddenThinkingCount;
		},

		handleDelta(msg: ChatDeltaMessage | ChatErrorMessage): void {
			if (msg.type === "chat.error") {
				if (msg.conversationId !== undefined && msg.conversationId !== deps.conversationId) {
					return;
				}
				_error = msg.message;
				return;
			}
			if (msg.event.conversationId !== deps.conversationId) {
				return;
			}
			transcript = foldEvent(transcript, msg.event);
			metrics = foldMetricsEvent(metrics, msg.event);
			maybeTrim();
			if (transcript.sealedTurnId !== null) {
				void syncTail();
				void syncMetrics();
			}
		},

		send(text: string): void {
			transcript = appendUserMessage(transcript, text);
			maybeTrim();
			const msg: ChatSendMessage = {
				type: "chat.send",
				conversationId: deps.conversationId,
				message: text,
				...(_model !== undefined ? { model: _model } : {}),
			};
			deps.transport.send(msg);
		},

		queueMessage(text: string): void {
			const trimmed = text.trim();
			if (trimmed.length === 0) return;
			const msg: ChatQueueMessage = {
				type: "chat.queue",
				conversationId: deps.conversationId,
				text: trimmed,
			};
			deps.transport.send(msg);
		},

		setModel(model: string): void {
			_model = model;
		},

		async setChatLimit(limit: number): Promise<void> {
			const prev = chatLimit;
			chatLimit = normalizeChatLimit(limit);
			if (chatLimit < prev) {
				maybeTrim();
			} else if (chatLimit > prev) {
				await refill();
			}
		},

		async load(): Promise<void> {
			// Fresh load shows only the newest 75% of the limit — headroom before the
			// first trim. A warm cache is windowed locally (synchronously with its
			// apply — no render in between); a COLD cache passes the window to the
			// server instead (CR-5 `?limit=`), so a huge conversation never ships
			// whole. The post-sync window re-asserts the cap either way.
			const windowSize = initialWindowSize(chatLimit);
			const cached = await deps.cache.load(deps.conversationId);
			if (cached.length > 0) {
				transcript = windowTranscript(applyHistory(transcript, cached), windowSize);
			}
			await syncTail(windowSize);
			transcript = windowTranscript(transcript, windowSize);
			await syncMetrics();
		},

		async showEarlier(): Promise<void> {
			if (disposed) return;
			const oldest = transcript.committed[0]?.seq ?? transcript.hiddenBeforeSeq;
			if (oldest <= 1) return;
			const want = unloadCount(chatLimit);
			try {
				const earlier = await backfillOlder(oldest, want);
				transcript = restoreEarlier(transcript, earlier, want);
				_error = null;
			} catch (err) {
				_error = err instanceof Error ? err.message : String(err);
			}
		},

		resync(): void {
			if (disposed) return;
			// A turn may have sealed while we were disconnected (missed `turn-sealed`):
			// clear the now-stale spinner BEFORE re-subscribing, so a finished turn
			// doesn't spin forever. A still-running turn's replay re-asserts it.
			transcript = clearGenerating(transcript);
			void syncTail();
			void syncMetrics();
		},

		dispose(): void {
			disposed = true;
		},
	};
}