summaryrefslogtreecommitdiffhomepage
path: root/src/core/chunks/reducer.ts
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-07 00:02:32 +0900
committerAdam Malczewski <[email protected]>2026-06-07 00:02:32 +0900
commit5d9ae1849337b64af1b0d47c23b8c4950a55f792 (patch)
treedd5fccaff7535bf1216457a986b8f95bd14fd61e /src/core/chunks/reducer.ts
parentfac44794432928d0341728642fd70eef87837da4 (diff)
downloaddispatch-web-5d9ae1849337b64af1b0d47c23b8c4950a55f792.tar.gz
dispatch-web-5d9ae1849337b64af1b0d47c23b8c4950a55f792.zip
Slice 2 wave 1: transcript reducer, wire conformance, ws chat, cache core
- core/chunks: the one pure transcript reducer (foldEvent live deltas + applyHistory seq-keyed reconcile + selectChunks/selectMessages); 27 tests - core/wire: FE-side contract-conformance exhaustiveness guards + drift smoke tests over wire/transport-contract unions (ยง2.9 drift signal); 10 tests - adapters/ws: additively multiplex chat.send/chat.delta/chat.error on the existing surface socket (onChat + widened send); surface API unchanged - features/conversation-cache: pure reconcileCache/nextSinceSeq/selectEvictions + ConversationChunkStore port + injected createConversationCache; 26 tests Verified green: svelte-check 0/0, vitest 169, biome clean, build ok.
Diffstat (limited to 'src/core/chunks/reducer.ts')
-rw-r--r--src/core/chunks/reducer.ts168
1 files changed, 168 insertions, 0 deletions
diff --git a/src/core/chunks/reducer.ts b/src/core/chunks/reducer.ts
new file mode 100644
index 0000000..0a8ea54
--- /dev/null
+++ b/src/core/chunks/reducer.ts
@@ -0,0 +1,168 @@
+import type { AgentEvent, Chunk, StoredChunk } from "@dispatch/wire";
+import type { AccumulatingChunk, ProvisionalChunk, TranscriptState } from "./types";
+
+/** The initial empty transcript state. */
+export function initialState(): TranscriptState {
+ return {
+ committed: [],
+ provisional: [],
+ accumulating: null,
+ currentTurnId: null,
+ latestUsage: null,
+ sealedTurnId: null,
+ };
+}
+
+function flushAccumulating(
+ provisional: readonly ProvisionalChunk[],
+ acc: AccumulatingChunk | null,
+): readonly ProvisionalChunk[] {
+ if (acc === null) return provisional;
+ const chunk: Chunk =
+ acc.kind === "text" ? { type: "text", text: acc.text } : { type: "thinking", text: acc.text };
+ return [...provisional, { role: "assistant", chunk }];
+}
+
+/**
+ * Merge authoritative seq-keyed chunks into the committed history.
+ * Dedupes by seq (new wins), keeps seq-monotonic order, idempotent.
+ * When sealedTurnId is set, drops all provisional chunks (now superseded)
+ * and clears sealedTurnId.
+ */
+export function applyHistory(
+ state: TranscriptState,
+ chunks: readonly StoredChunk[],
+): TranscriptState {
+ const seqMap = new Map<number, StoredChunk>();
+ for (const c of state.committed) seqMap.set(c.seq, c);
+ for (const c of chunks) seqMap.set(c.seq, c);
+ const committed = Array.from(seqMap.values()).sort((a, b) => a.seq - b.seq);
+
+ if (state.sealedTurnId !== null) {
+ return {
+ ...state,
+ committed,
+ provisional: [],
+ accumulating: null,
+ sealedTurnId: null,
+ };
+ }
+
+ return { ...state, committed };
+}
+
+/**
+ * Fold one live AgentEvent into the provisional state.
+ *
+ * - `turn-start` records the turnId.
+ * - `text-delta` extends the current accumulating TextChunk (or starts one).
+ * - `reasoning-delta` extends the current accumulating ThinkingChunk (or starts one).
+ * - `tool-call` / `tool-result` / `error` finalize any accumulating chunk and
+ * add a new provisional chunk.
+ * - `usage` stores the latest Usage.
+ * - `done` finalizes any accumulating chunk (turn still provisional).
+ * - `turn-sealed` finalizes any accumulating chunk and sets sealedTurnId.
+ * - `status` and `tool-output` are ignored (best-effort no-ops).
+ */
+export function foldEvent(state: TranscriptState, event: AgentEvent): TranscriptState {
+ switch (event.type) {
+ case "status":
+ case "tool-output":
+ return state;
+
+ case "turn-start":
+ return { ...state, currentTurnId: event.turnId };
+
+ case "text-delta": {
+ const acc = state.accumulating;
+ if (acc !== null && acc.kind === "text") {
+ return { ...state, accumulating: { kind: "text", text: acc.text + event.delta } };
+ }
+ const provisional = flushAccumulating(state.provisional, acc);
+ return {
+ ...state,
+ provisional,
+ accumulating: { kind: "text", text: event.delta },
+ };
+ }
+
+ case "reasoning-delta": {
+ const acc = state.accumulating;
+ if (acc !== null && acc.kind === "thinking") {
+ return { ...state, accumulating: { kind: "thinking", text: acc.text + event.delta } };
+ }
+ const provisional = flushAccumulating(state.provisional, acc);
+ return {
+ ...state,
+ provisional,
+ accumulating: { kind: "thinking", text: event.delta },
+ };
+ }
+
+ case "tool-call": {
+ const provisional = flushAccumulating(state.provisional, state.accumulating);
+ const chunk: Chunk = {
+ type: "tool-call",
+ toolCallId: event.toolCallId,
+ toolName: event.toolName,
+ input: event.input,
+ };
+ return {
+ ...state,
+ provisional: [...provisional, { role: "assistant", chunk }],
+ accumulating: null,
+ };
+ }
+
+ case "tool-result": {
+ const provisional = flushAccumulating(state.provisional, state.accumulating);
+ const chunk: Chunk = {
+ type: "tool-result",
+ toolCallId: event.toolCallId,
+ toolName: event.toolName,
+ content: event.content,
+ isError: event.isError,
+ };
+ return {
+ ...state,
+ provisional: [...provisional, { role: "tool", chunk }],
+ accumulating: null,
+ };
+ }
+
+ case "error": {
+ const provisional = flushAccumulating(state.provisional, state.accumulating);
+ const chunk: Chunk =
+ event.code !== undefined
+ ? { type: "error", message: event.message, code: event.code }
+ : { type: "error", message: event.message };
+ return {
+ ...state,
+ provisional: [...provisional, { role: "assistant", chunk }],
+ accumulating: null,
+ };
+ }
+
+ case "usage":
+ return { ...state, latestUsage: event.usage };
+
+ case "done": {
+ const provisional = flushAccumulating(state.provisional, state.accumulating);
+ return {
+ ...state,
+ provisional,
+ accumulating: null,
+ };
+ }
+
+ case "turn-sealed": {
+ const provisional = flushAccumulating(state.provisional, state.accumulating);
+ return {
+ ...state,
+ provisional,
+ accumulating: null,
+ sealedTurnId: event.turnId,
+ };
+ }
+ }
+}