diff options
| author | Adam Malczewski <[email protected]> | 2026-06-07 00:02:32 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-07 00:02:32 +0900 |
| commit | 5d9ae1849337b64af1b0d47c23b8c4950a55f792 (patch) | |
| tree | dd5fccaff7535bf1216457a986b8f95bd14fd61e /src/core/chunks/reducer.ts | |
| parent | fac44794432928d0341728642fd70eef87837da4 (diff) | |
| download | dispatch-web-5d9ae1849337b64af1b0d47c23b8c4950a55f792.tar.gz dispatch-web-5d9ae1849337b64af1b0d47c23b8c4950a55f792.zip | |
Slice 2 wave 1: transcript reducer, wire conformance, ws chat, cache core
- core/chunks: the one pure transcript reducer (foldEvent live deltas +
applyHistory seq-keyed reconcile + selectChunks/selectMessages); 27 tests
- core/wire: FE-side contract-conformance exhaustiveness guards + drift smoke
tests over wire/transport-contract unions (ยง2.9 drift signal); 10 tests
- adapters/ws: additively multiplex chat.send/chat.delta/chat.error on the
existing surface socket (onChat + widened send); surface API unchanged
- features/conversation-cache: pure reconcileCache/nextSinceSeq/selectEvictions
+ ConversationChunkStore port + injected createConversationCache; 26 tests
Verified green: svelte-check 0/0, vitest 169, biome clean, build ok.
Diffstat (limited to 'src/core/chunks/reducer.ts')
| -rw-r--r-- | src/core/chunks/reducer.ts | 168 |
1 files changed, 168 insertions, 0 deletions
diff --git a/src/core/chunks/reducer.ts b/src/core/chunks/reducer.ts new file mode 100644 index 0000000..0a8ea54 --- /dev/null +++ b/src/core/chunks/reducer.ts @@ -0,0 +1,168 @@ +import type { AgentEvent, Chunk, StoredChunk } from "@dispatch/wire"; +import type { AccumulatingChunk, ProvisionalChunk, TranscriptState } from "./types"; + +/** The initial empty transcript state. */ +export function initialState(): TranscriptState { + return { + committed: [], + provisional: [], + accumulating: null, + currentTurnId: null, + latestUsage: null, + sealedTurnId: null, + }; +} + +function flushAccumulating( + provisional: readonly ProvisionalChunk[], + acc: AccumulatingChunk | null, +): readonly ProvisionalChunk[] { + if (acc === null) return provisional; + const chunk: Chunk = + acc.kind === "text" ? { type: "text", text: acc.text } : { type: "thinking", text: acc.text }; + return [...provisional, { role: "assistant", chunk }]; +} + +/** + * Merge authoritative seq-keyed chunks into the committed history. + * Dedupes by seq (new wins), keeps seq-monotonic order, idempotent. + * When sealedTurnId is set, drops all provisional chunks (now superseded) + * and clears sealedTurnId. + */ +export function applyHistory( + state: TranscriptState, + chunks: readonly StoredChunk[], +): TranscriptState { + const seqMap = new Map<number, StoredChunk>(); + for (const c of state.committed) seqMap.set(c.seq, c); + for (const c of chunks) seqMap.set(c.seq, c); + const committed = Array.from(seqMap.values()).sort((a, b) => a.seq - b.seq); + + if (state.sealedTurnId !== null) { + return { + ...state, + committed, + provisional: [], + accumulating: null, + sealedTurnId: null, + }; + } + + return { ...state, committed }; +} + +/** + * Fold one live AgentEvent into the provisional state. + * + * - `turn-start` records the turnId. + * - `text-delta` extends the current accumulating TextChunk (or starts one). + * - `reasoning-delta` extends the current accumulating ThinkingChunk (or starts one). + * - `tool-call` / `tool-result` / `error` finalize any accumulating chunk and + * add a new provisional chunk. + * - `usage` stores the latest Usage. + * - `done` finalizes any accumulating chunk (turn still provisional). + * - `turn-sealed` finalizes any accumulating chunk and sets sealedTurnId. + * - `status` and `tool-output` are ignored (best-effort no-ops). + */ +export function foldEvent(state: TranscriptState, event: AgentEvent): TranscriptState { + switch (event.type) { + case "status": + case "tool-output": + return state; + + case "turn-start": + return { ...state, currentTurnId: event.turnId }; + + case "text-delta": { + const acc = state.accumulating; + if (acc !== null && acc.kind === "text") { + return { ...state, accumulating: { kind: "text", text: acc.text + event.delta } }; + } + const provisional = flushAccumulating(state.provisional, acc); + return { + ...state, + provisional, + accumulating: { kind: "text", text: event.delta }, + }; + } + + case "reasoning-delta": { + const acc = state.accumulating; + if (acc !== null && acc.kind === "thinking") { + return { ...state, accumulating: { kind: "thinking", text: acc.text + event.delta } }; + } + const provisional = flushAccumulating(state.provisional, acc); + return { + ...state, + provisional, + accumulating: { kind: "thinking", text: event.delta }, + }; + } + + case "tool-call": { + const provisional = flushAccumulating(state.provisional, state.accumulating); + const chunk: Chunk = { + type: "tool-call", + toolCallId: event.toolCallId, + toolName: event.toolName, + input: event.input, + }; + return { + ...state, + provisional: [...provisional, { role: "assistant", chunk }], + accumulating: null, + }; + } + + case "tool-result": { + const provisional = flushAccumulating(state.provisional, state.accumulating); + const chunk: Chunk = { + type: "tool-result", + toolCallId: event.toolCallId, + toolName: event.toolName, + content: event.content, + isError: event.isError, + }; + return { + ...state, + provisional: [...provisional, { role: "tool", chunk }], + accumulating: null, + }; + } + + case "error": { + const provisional = flushAccumulating(state.provisional, state.accumulating); + const chunk: Chunk = + event.code !== undefined + ? { type: "error", message: event.message, code: event.code } + : { type: "error", message: event.message }; + return { + ...state, + provisional: [...provisional, { role: "assistant", chunk }], + accumulating: null, + }; + } + + case "usage": + return { ...state, latestUsage: event.usage }; + + case "done": { + const provisional = flushAccumulating(state.provisional, state.accumulating); + return { + ...state, + provisional, + accumulating: null, + }; + } + + case "turn-sealed": { + const provisional = flushAccumulating(state.provisional, state.accumulating); + return { + ...state, + provisional, + accumulating: null, + sealedTurnId: event.turnId, + }; + } + } +} |
