diff options
| author | Adam Malczewski <[email protected]> | 2026-06-07 00:02:32 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-07 00:02:32 +0900 |
| commit | 5d9ae1849337b64af1b0d47c23b8c4950a55f792 (patch) | |
| tree | dd5fccaff7535bf1216457a986b8f95bd14fd61e /src/core/chunks | |
| parent | fac44794432928d0341728642fd70eef87837da4 (diff) | |
| download | dispatch-web-5d9ae1849337b64af1b0d47c23b8c4950a55f792.tar.gz dispatch-web-5d9ae1849337b64af1b0d47c23b8c4950a55f792.zip | |
Slice 2 wave 1: transcript reducer, wire conformance, ws chat, cache core
- core/chunks: the one pure transcript reducer (foldEvent live deltas +
applyHistory seq-keyed reconcile + selectChunks/selectMessages); 27 tests
- core/wire: FE-side contract-conformance exhaustiveness guards + drift smoke
tests over wire/transport-contract unions (§2.9 drift signal); 10 tests
- adapters/ws: additively multiplex chat.send/chat.delta/chat.error on the
existing surface socket (onChat + widened send); surface API unchanged
- features/conversation-cache: pure reconcileCache/nextSinceSeq/selectEvictions
+ ConversationChunkStore port + injected createConversationCache; 26 tests
Verified green: svelte-check 0/0, vitest 169, biome clean, build ok.
Diffstat (limited to 'src/core/chunks')
| -rw-r--r-- | src/core/chunks/index.ts | 8 | ||||
| -rw-r--r-- | src/core/chunks/reducer.test.ts | 406 | ||||
| -rw-r--r-- | src/core/chunks/reducer.ts | 168 | ||||
| -rw-r--r-- | src/core/chunks/selectors.ts | 51 | ||||
| -rw-r--r-- | src/core/chunks/types.ts | 31 |
5 files changed, 664 insertions, 0 deletions
diff --git a/src/core/chunks/index.ts b/src/core/chunks/index.ts new file mode 100644 index 0000000..36ba7f4 --- /dev/null +++ b/src/core/chunks/index.ts @@ -0,0 +1,8 @@ +export { applyHistory, foldEvent, initialState } from "./reducer"; +export { selectChunks, selectMessages } from "./selectors"; +export type { + AccumulatingChunk, + ProvisionalChunk, + RenderedChunk, + TranscriptState, +} from "./types"; diff --git a/src/core/chunks/reducer.test.ts b/src/core/chunks/reducer.test.ts new file mode 100644 index 0000000..f83edb4 --- /dev/null +++ b/src/core/chunks/reducer.test.ts @@ -0,0 +1,406 @@ +import type { + StoredChunk, + TurnDoneEvent, + TurnErrorEvent, + TurnReasoningDeltaEvent, + TurnSealedEvent, + TurnStartEvent, + TurnTextDeltaEvent, + TurnToolCallEvent, + TurnToolResultEvent, + TurnUsageEvent, +} from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { applyHistory, foldEvent, initialState } from "./reducer"; +import { selectChunks, selectMessages } from "./selectors"; + +const turnStart = (turnId: string): TurnStartEvent => ({ + type: "turn-start", + conversationId: "c1", + turnId, +}); + +const textDelta = (turnId: string, delta: string): TurnTextDeltaEvent => ({ + type: "text-delta", + conversationId: "c1", + turnId, + delta, +}); + +const reasoningDelta = (turnId: string, delta: string): TurnReasoningDeltaEvent => ({ + type: "reasoning-delta", + conversationId: "c1", + turnId, + delta, +}); + +const toolCall = ( + turnId: string, + toolCallId: string, + toolName: string, + input: unknown, +): TurnToolCallEvent => ({ + type: "tool-call", + conversationId: "c1", + turnId, + toolCallId, + toolName, + input, +}); + +const toolResult = ( + turnId: string, + toolCallId: string, + toolName: string, + content: string, +): TurnToolResultEvent => ({ + type: "tool-result", + conversationId: "c1", + turnId, + toolCallId, + toolName, + content, + isError: false, +}); + +const usageEvent = (turnId: string, inputTokens: number, outputTokens: number): TurnUsageEvent => ({ + type: "usage", + conversationId: "c1", + turnId, + usage: { inputTokens, outputTokens }, +}); + +const errorEvent = (turnId: string, message: string, code?: string): TurnErrorEvent => + code !== undefined + ? { type: "error", conversationId: "c1", turnId, message, code } + : { type: "error", conversationId: "c1", turnId, message }; + +const doneEvent = (turnId: string): TurnDoneEvent => ({ + type: "done", + conversationId: "c1", + turnId, + reason: "stop", +}); + +const turnSealed = (turnId: string): TurnSealedEvent => ({ + type: "turn-sealed", + conversationId: "c1", + turnId, +}); + +const storedChunk = ( + seq: number, + role: "user" | "assistant" | "tool" | "system", + chunk: StoredChunk["chunk"], +): StoredChunk => ({ + seq, + role, + chunk, +}); + +describe("initialState", () => { + it("initial state is empty", () => { + const s = initialState(); + expect(s.committed).toEqual([]); + expect(s.provisional).toEqual([]); + expect(s.accumulating).toBeNull(); + expect(s.currentTurnId).toBeNull(); + expect(s.latestUsage).toBeNull(); + expect(s.sealedTurnId).toBeNull(); + }); +}); + +describe("foldEvent — text-delta", () => { + it("text-delta accumulates into one TextChunk", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "hello")); + expect(s.accumulating).toEqual({ kind: "text", text: "hello" }); + expect(s.provisional).toEqual([]); + }); + + it("successive text-deltas extend the same provisional chunk", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "hello ")); + s = foldEvent(s, textDelta("t1", "world")); + expect(s.accumulating).toEqual({ kind: "text", text: "hello world" }); + expect(s.provisional).toEqual([]); + }); + + it("text-delta after reasoning-delta flushes thinking and starts text", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, reasoningDelta("t1", "thinking...")); + s = foldEvent(s, textDelta("t1", "answer")); + expect(s.accumulating).toEqual({ kind: "text", text: "answer" }); + expect(s.provisional).toHaveLength(1); + expect(s.provisional[0]?.chunk).toEqual({ type: "thinking", text: "thinking..." }); + expect(s.provisional[0]?.role).toBe("assistant"); + }); +}); + +describe("foldEvent — reasoning-delta", () => { + it("reasoning-delta yields a thinking chunk", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, reasoningDelta("t1", "hmm")); + expect(s.accumulating).toEqual({ kind: "thinking", text: "hmm" }); + }); + + it("successive reasoning-deltas extend the same chunk", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, reasoningDelta("t1", "hmm ")); + s = foldEvent(s, reasoningDelta("t1", "ok")); + expect(s.accumulating).toEqual({ kind: "thinking", text: "hmm ok" }); + }); +}); + +describe("foldEvent — tool-call then tool-result", () => { + it("tool-call then tool-result render in order", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, toolCall("t1", "tc1", "bash", { cmd: "ls" })); + s = foldEvent(s, toolResult("t1", "tc1", "bash", "file.txt")); + expect(s.provisional).toHaveLength(2); + expect(s.provisional[0]?.role).toBe("assistant"); + expect(s.provisional[0]?.chunk).toEqual({ + type: "tool-call", + toolCallId: "tc1", + toolName: "bash", + input: { cmd: "ls" }, + }); + expect(s.provisional[1]?.role).toBe("tool"); + expect(s.provisional[1]?.chunk).toEqual({ + type: "tool-result", + toolCallId: "tc1", + toolName: "bash", + content: "file.txt", + isError: false, + }); + }); + + it("tool-call flushes accumulating text", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "let me check")); + s = foldEvent(s, toolCall("t1", "tc1", "bash", {})); + expect(s.provisional).toHaveLength(2); + expect(s.provisional[0]?.chunk).toEqual({ type: "text", text: "let me check" }); + expect(s.provisional[1]?.chunk).toMatchObject({ type: "tool-call", toolCallId: "tc1" }); + expect(s.accumulating).toBeNull(); + }); +}); + +describe("foldEvent — turn-sealed", () => { + it("turn-sealed sets sealedTurnId", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "hi")); + s = foldEvent(s, turnSealed("t1")); + expect(s.sealedTurnId).toBe("t1"); + expect(s.accumulating).toBeNull(); + expect(s.provisional).toHaveLength(1); + expect(s.provisional[0]?.chunk).toEqual({ type: "text", text: "hi" }); + }); +}); + +describe("foldEvent — usage", () => { + it("stores latest usage", () => { + let s = initialState(); + s = foldEvent(s, usageEvent("t1", 100, 50)); + expect(s.latestUsage).toEqual({ inputTokens: 100, outputTokens: 50 }); + }); + + it("overwrites previous usage", () => { + let s = initialState(); + s = foldEvent(s, usageEvent("t1", 100, 50)); + s = foldEvent(s, usageEvent("t1", 200, 80)); + expect(s.latestUsage).toEqual({ inputTokens: 200, outputTokens: 80 }); + }); +}); + +describe("foldEvent — error", () => { + it("creates error chunk with code", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, errorEvent("t1", "bad", "E001")); + expect(s.provisional).toHaveLength(1); + expect(s.provisional[0]?.chunk).toEqual({ type: "error", message: "bad", code: "E001" }); + expect(s.provisional[0]?.role).toBe("assistant"); + }); + + it("creates error chunk without code", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, errorEvent("t1", "bad")); + expect(s.provisional).toHaveLength(1); + expect(s.provisional[0]?.chunk).toEqual({ type: "error", message: "bad" }); + }); +}); + +describe("foldEvent — done", () => { + it("flushes accumulating chunk on done", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "hello")); + s = foldEvent(s, doneEvent("t1")); + expect(s.accumulating).toBeNull(); + expect(s.provisional).toHaveLength(1); + expect(s.provisional[0]?.chunk).toEqual({ type: "text", text: "hello" }); + }); +}); + +describe("foldEvent — status and tool-output", () => { + it("status is a no-op", () => { + const s = initialState(); + const next = foldEvent(s, { type: "status", conversationId: "c1", status: "running" }); + expect(next).toBe(s); + }); + + it("tool-output is a no-op", () => { + const s = initialState(); + const next = foldEvent(s, { + type: "tool-output", + conversationId: "c1", + turnId: "t1", + toolCallId: "tc1", + data: "output", + stream: "stdout", + }); + expect(next).toBe(s); + }); +}); + +describe("applyHistory", () => { + it("orders committed chunks by seq", () => { + const s = initialState(); + const chunks = [ + storedChunk(3, "assistant", { type: "text", text: "c" }), + storedChunk(1, "user", { type: "text", text: "a" }), + storedChunk(2, "assistant", { type: "text", text: "b" }), + ]; + const next = applyHistory(s, chunks); + expect(next.committed.map((c) => c.seq)).toEqual([1, 2, 3]); + }); + + it("is idempotent on duplicate seqs", () => { + let s = initialState(); + const batch1 = [ + storedChunk(1, "user", { type: "text", text: "a" }), + storedChunk(2, "assistant", { type: "text", text: "b" }), + ]; + s = applyHistory(s, batch1); + const batch2 = [ + storedChunk(2, "assistant", { type: "text", text: "b" }), + storedChunk(3, "assistant", { type: "text", text: "c" }), + ]; + s = applyHistory(s, batch2); + expect(s.committed.map((c) => c.seq)).toEqual([1, 2, 3]); + expect(s.committed).toHaveLength(3); + }); + + it("supersedes & clears provisional once committed", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "hello")); + s = foldEvent(s, turnSealed("t1")); + expect(s.provisional).toHaveLength(1); + expect(s.sealedTurnId).toBe("t1"); + + s = applyHistory(s, [storedChunk(1, "assistant", { type: "text", text: "hello" })]); + expect(s.provisional).toEqual([]); + expect(s.accumulating).toBeNull(); + expect(s.sealedTurnId).toBeNull(); + expect(s.committed).toHaveLength(1); + }); + + it("keeps provisional and accumulating when sealedTurnId is null", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "wip")); + s = foldEvent(s, doneEvent("t1")); + s = applyHistory(s, [storedChunk(1, "user", { type: "text", text: "q" })]); + expect(s.provisional).toHaveLength(1); + expect(s.committed).toHaveLength(1); + }); + + it("merges new history into existing committed", () => { + let s = initialState(); + s = applyHistory(s, [storedChunk(1, "user", { type: "text", text: "a" })]); + s = applyHistory(s, [storedChunk(2, "assistant", { type: "text", text: "b" })]); + expect(s.committed).toHaveLength(2); + expect(s.committed.map((c) => c.seq)).toEqual([1, 2]); + }); +}); + +describe("selectChunks", () => { + it("selectChunks marks provisional with seq null", () => { + let s = initialState(); + s = applyHistory(s, [storedChunk(1, "user", { type: "text", text: "q" })]); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "wip")); + const chunks = selectChunks(s); + expect(chunks).toHaveLength(2); + expect(chunks[0]?.seq).toBe(1); + expect(chunks[0]?.provisional).toBe(false); + expect(chunks[1]?.seq).toBeNull(); + expect(chunks[1]?.provisional).toBe(true); + }); + + it("returns empty for empty state", () => { + expect(selectChunks(initialState())).toEqual([]); + }); + + it("includes accumulating chunk as provisional", () => { + let s = initialState(); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "building...")); + const chunks = selectChunks(s); + expect(chunks).toHaveLength(1); + expect(chunks[0]?.seq).toBeNull(); + expect(chunks[0]?.provisional).toBe(true); + expect(chunks[0]?.chunk).toEqual({ type: "text", text: "building..." }); + }); +}); + +describe("selectMessages", () => { + it("selectMessages groups consecutive same-role chunks", () => { + let s = initialState(); + s = applyHistory(s, [ + storedChunk(1, "user", { type: "text", text: "q1" }), + storedChunk(2, "user", { type: "text", text: "q2" }), + storedChunk(3, "assistant", { type: "text", text: "a1" }), + storedChunk(4, "assistant", { type: "text", text: "a2" }), + storedChunk(5, "user", { type: "text", text: "q3" }), + ]); + const msgs = selectMessages(s); + expect(msgs).toHaveLength(3); + expect(msgs[0]?.role).toBe("user"); + expect(msgs[0]?.chunks).toHaveLength(2); + expect(msgs[1]?.role).toBe("assistant"); + expect(msgs[1]?.chunks).toHaveLength(2); + expect(msgs[2]?.role).toBe("user"); + expect(msgs[2]?.chunks).toHaveLength(1); + }); + + it("returns empty for empty state", () => { + expect(selectMessages(initialState())).toEqual([]); + }); + + it("mixes committed and provisional in messages", () => { + let s = initialState(); + s = applyHistory(s, [storedChunk(1, "user", { type: "text", text: "q" })]); + s = foldEvent(s, turnStart("t1")); + s = foldEvent(s, textDelta("t1", "a1")); + s = foldEvent(s, textDelta("t1", "a2")); + const msgs = selectMessages(s); + expect(msgs).toHaveLength(2); + expect(msgs[0]?.role).toBe("user"); + expect(msgs[0]?.chunks).toHaveLength(1); + expect(msgs[1]?.role).toBe("assistant"); + expect(msgs[1]?.chunks).toHaveLength(1); + expect(msgs[1]?.chunks[0]).toEqual({ type: "text", text: "a1a2" }); + }); +}); diff --git a/src/core/chunks/reducer.ts b/src/core/chunks/reducer.ts new file mode 100644 index 0000000..0a8ea54 --- /dev/null +++ b/src/core/chunks/reducer.ts @@ -0,0 +1,168 @@ +import type { AgentEvent, Chunk, StoredChunk } from "@dispatch/wire"; +import type { AccumulatingChunk, ProvisionalChunk, TranscriptState } from "./types"; + +/** The initial empty transcript state. */ +export function initialState(): TranscriptState { + return { + committed: [], + provisional: [], + accumulating: null, + currentTurnId: null, + latestUsage: null, + sealedTurnId: null, + }; +} + +function flushAccumulating( + provisional: readonly ProvisionalChunk[], + acc: AccumulatingChunk | null, +): readonly ProvisionalChunk[] { + if (acc === null) return provisional; + const chunk: Chunk = + acc.kind === "text" ? { type: "text", text: acc.text } : { type: "thinking", text: acc.text }; + return [...provisional, { role: "assistant", chunk }]; +} + +/** + * Merge authoritative seq-keyed chunks into the committed history. + * Dedupes by seq (new wins), keeps seq-monotonic order, idempotent. + * When sealedTurnId is set, drops all provisional chunks (now superseded) + * and clears sealedTurnId. + */ +export function applyHistory( + state: TranscriptState, + chunks: readonly StoredChunk[], +): TranscriptState { + const seqMap = new Map<number, StoredChunk>(); + for (const c of state.committed) seqMap.set(c.seq, c); + for (const c of chunks) seqMap.set(c.seq, c); + const committed = Array.from(seqMap.values()).sort((a, b) => a.seq - b.seq); + + if (state.sealedTurnId !== null) { + return { + ...state, + committed, + provisional: [], + accumulating: null, + sealedTurnId: null, + }; + } + + return { ...state, committed }; +} + +/** + * Fold one live AgentEvent into the provisional state. + * + * - `turn-start` records the turnId. + * - `text-delta` extends the current accumulating TextChunk (or starts one). + * - `reasoning-delta` extends the current accumulating ThinkingChunk (or starts one). + * - `tool-call` / `tool-result` / `error` finalize any accumulating chunk and + * add a new provisional chunk. + * - `usage` stores the latest Usage. + * - `done` finalizes any accumulating chunk (turn still provisional). + * - `turn-sealed` finalizes any accumulating chunk and sets sealedTurnId. + * - `status` and `tool-output` are ignored (best-effort no-ops). + */ +export function foldEvent(state: TranscriptState, event: AgentEvent): TranscriptState { + switch (event.type) { + case "status": + case "tool-output": + return state; + + case "turn-start": + return { ...state, currentTurnId: event.turnId }; + + case "text-delta": { + const acc = state.accumulating; + if (acc !== null && acc.kind === "text") { + return { ...state, accumulating: { kind: "text", text: acc.text + event.delta } }; + } + const provisional = flushAccumulating(state.provisional, acc); + return { + ...state, + provisional, + accumulating: { kind: "text", text: event.delta }, + }; + } + + case "reasoning-delta": { + const acc = state.accumulating; + if (acc !== null && acc.kind === "thinking") { + return { ...state, accumulating: { kind: "thinking", text: acc.text + event.delta } }; + } + const provisional = flushAccumulating(state.provisional, acc); + return { + ...state, + provisional, + accumulating: { kind: "thinking", text: event.delta }, + }; + } + + case "tool-call": { + const provisional = flushAccumulating(state.provisional, state.accumulating); + const chunk: Chunk = { + type: "tool-call", + toolCallId: event.toolCallId, + toolName: event.toolName, + input: event.input, + }; + return { + ...state, + provisional: [...provisional, { role: "assistant", chunk }], + accumulating: null, + }; + } + + case "tool-result": { + const provisional = flushAccumulating(state.provisional, state.accumulating); + const chunk: Chunk = { + type: "tool-result", + toolCallId: event.toolCallId, + toolName: event.toolName, + content: event.content, + isError: event.isError, + }; + return { + ...state, + provisional: [...provisional, { role: "tool", chunk }], + accumulating: null, + }; + } + + case "error": { + const provisional = flushAccumulating(state.provisional, state.accumulating); + const chunk: Chunk = + event.code !== undefined + ? { type: "error", message: event.message, code: event.code } + : { type: "error", message: event.message }; + return { + ...state, + provisional: [...provisional, { role: "assistant", chunk }], + accumulating: null, + }; + } + + case "usage": + return { ...state, latestUsage: event.usage }; + + case "done": { + const provisional = flushAccumulating(state.provisional, state.accumulating); + return { + ...state, + provisional, + accumulating: null, + }; + } + + case "turn-sealed": { + const provisional = flushAccumulating(state.provisional, state.accumulating); + return { + ...state, + provisional, + accumulating: null, + sealedTurnId: event.turnId, + }; + } + } +} diff --git a/src/core/chunks/selectors.ts b/src/core/chunks/selectors.ts new file mode 100644 index 0000000..8fb832f --- /dev/null +++ b/src/core/chunks/selectors.ts @@ -0,0 +1,51 @@ +import type { ChatMessage, Chunk } from "@dispatch/wire"; +import type { RenderedChunk, TranscriptState } from "./types"; + +/** + * Select all chunks for rendering: committed first (seq order), + * then provisional (seq: null). + */ +export function selectChunks(state: TranscriptState): readonly RenderedChunk[] { + const result: RenderedChunk[] = []; + for (const c of state.committed) { + result.push({ seq: c.seq, role: c.role, chunk: c.chunk, provisional: false }); + } + for (const p of state.provisional) { + result.push({ seq: null, role: p.role, chunk: p.chunk, provisional: true }); + } + if (state.accumulating !== null) { + const chunk: Chunk = + state.accumulating.kind === "text" + ? { type: "text", text: state.accumulating.text } + : { type: "thinking", text: state.accumulating.text }; + result.push({ seq: null, role: "assistant", chunk, provisional: true }); + } + return result; +} + +/** + * Group consecutive same-role rendered chunks into ChatMessages. + */ +export function selectMessages(state: TranscriptState): readonly ChatMessage[] { + const rendered = selectChunks(state); + const first = rendered[0]; + if (first === undefined) return []; + + const messages: ChatMessage[] = []; + let role = first.role; + let chunks: Chunk[] = [first.chunk]; + + for (let i = 1; i < rendered.length; i++) { + const rc = rendered[i]; + if (rc === undefined) continue; + if (rc.role === role) { + chunks.push(rc.chunk); + } else { + messages.push({ role, chunks }); + role = rc.role; + chunks = [rc.chunk]; + } + } + messages.push({ role, chunks }); + return messages; +} diff --git a/src/core/chunks/types.ts b/src/core/chunks/types.ts new file mode 100644 index 0000000..3792445 --- /dev/null +++ b/src/core/chunks/types.ts @@ -0,0 +1,31 @@ +import type { Chunk, Role, StoredChunk, Usage } from "@dispatch/wire"; + +/** A chunk being accumulated from streaming deltas (text or thinking). */ +export interface AccumulatingChunk { + readonly kind: "text" | "thinking"; + readonly text: string; +} + +/** A provisional chunk that has no authoritative seq yet. */ +export interface ProvisionalChunk { + readonly role: Role; + readonly chunk: Chunk; +} + +/** The transcript reducer state. Holds committed history + live in-flight turn. */ +export interface TranscriptState { + readonly committed: readonly StoredChunk[]; + readonly provisional: readonly ProvisionalChunk[]; + readonly accumulating: AccumulatingChunk | null; + readonly currentTurnId: string | null; + readonly latestUsage: Usage | null; + readonly sealedTurnId: string | null; +} + +/** A chunk ready for rendering: either committed (with seq) or provisional. */ +export interface RenderedChunk { + readonly seq: number | null; + readonly role: Role; + readonly chunk: Chunk; + readonly provisional: boolean; +} |
