diff options
| author | Adam Malczewski <[email protected]> | 2026-06-07 16:31:03 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-07 16:31:03 +0900 |
| commit | d102fc890fb27333d87ed07cb31b15dbf890e62c (patch) | |
| tree | 54ac201aea8cdaf0f52647c3cc28b2de70606bc5 | |
| parent | 17bc0a2cdaeefd4974f785c907d3515a38d45363 (diff) | |
| download | dispatch-web-d102fc890fb27333d87ed07cb31b15dbf890e62c.tar.gz dispatch-web-d102fc890fb27333d87ed07cb31b15dbf890e62c.zip | |
test(probe): live-verify tool-call batching (stepId) end-to-end
Extend scripts/live-probe.ts with a second turn that elicits parallel
tool calls and asserts: live tool events carry stepId, replayed chunks
carry chunk.stepId, and groupRenderedChunks folds the batch identically
live and on replay. Deltas now routed by conversationId. Gated (not in
bun run test). Verified 13/13 against bin/up.
| -rw-r--r-- | scripts/live-probe.ts | 229 |
1 files changed, 169 insertions, 60 deletions
diff --git a/scripts/live-probe.ts b/scripts/live-probe.ts index bc654dd..2c4dfb9 100644 --- a/scripts/live-probe.ts +++ b/scripts/live-probe.ts @@ -5,15 +5,22 @@ * * bun scripts/live-probe.ts # default model * PROBE_MODEL=opencode/glm-5 bun scripts/live-probe.ts + * PROBE_TOOL_PROMPT="..." bun scripts/live-probe.ts # override the tool turn * * Drives the FE's REAL network-facing modules (the thin live integration test the * methodology calls for — the analogue of the backend's server.bun.test.ts): * - adapters/ws createSurfaceSocket → real WebSocket, one socket multiplexes - * the surface `catalog` AND chat ops. - * - core/chunks foldEvent/applyHistory → fold REAL chat.delta AgentEvents. + * the surface `catalog` AND chat ops (deltas routed by conversationId). + * - core/chunks foldEvent/applyHistory/groupRenderedChunks → fold REAL + * chat.delta AgentEvents and group batched tool calls by stepId. * - features/conversation-cache + adapters/idb (fake-indexeddb) → real cache. * - HTTP GET /conversations/:id?sinceSeq → real ConversationHistoryResponse. * Skips the runes chat store + svelte UI (need the Svelte compiler; thin wrappers). + * + * Turn 1 verifies the text streaming + cache + replay path. + * Turn 2 verifies the tool-call BATCHING path ([email protected] `stepId`): that live + * tool events AND replayed tool chunks carry `stepId`, and that the pure grouping + * selector folds a parallel batch into one group. */ // Provides globalThis.indexedDB + IDBKeyRange etc. for the idb adapter (a real @@ -27,20 +34,33 @@ import type { import type { SurfaceServerMessage } from "@dispatch/ui-contract"; import { createIdbChunkStore } from "../src/adapters/idb/index.ts"; import { createSurfaceSocket } from "../src/adapters/ws/index.ts"; -import { applyHistory, foldEvent, initialState, selectMessages } from "../src/core/chunks/index.ts"; +import { + applyHistory, + foldEvent, + groupRenderedChunks, + initialState, + selectChunks, + selectMessages, + type TranscriptState, +} from "../src/core/chunks/index.ts"; import { createConversationCache } from "../src/features/conversation-cache/index.ts"; const WS_URL = process.env.PROBE_WS ?? "ws://localhost:24205"; const HTTP_BASE = process.env.PROBE_HTTP ?? "http://localhost:24203"; const MODEL = process.env.PROBE_MODEL ?? "opencode/deepseek-v4-flash"; -const PROMPT = process.env.PROBE_PROMPT ?? "Reply with exactly: hello from dispatch"; -const conversationId = crypto.randomUUID(); +const TEXT_PROMPT = process.env.PROBE_PROMPT ?? "Reply with exactly: hello from dispatch"; +const TOOL_PROMPT = + process.env.PROBE_TOOL_PROMPT ?? + "Make two tool calls AT THE SAME TIME in a single step (parallel tool calls). " + + "For example, run two independent shell commands together: `echo alpha` and `echo beta`. " + + "If you have no shell tool, invoke any two of your available read-only tools simultaneously."; const checks: { name: string; ok: boolean; detail?: string }[] = []; const record = (name: string, ok: boolean, detail?: string) => { checks.push({ name, ok, ...(detail !== undefined ? { detail } : {}) }); console.log(` ${ok ? "✅" : "❌"} ${name}${detail ? ` — ${detail}` : ""}`); }; +const note = (msg: string) => console.log(` ℹ️ ${msg}`); function fail(msg: string): never { console.error(`\n[live-probe] FATAL: ${msg}`); @@ -54,34 +74,61 @@ async function historySync(id: string, sinceSeq: number): Promise<ConversationHi return (await res.json()) as ConversationHistoryResponse; } -async function main() { - console.log(`[live-probe] conversation=${conversationId} model=${MODEL}`); - console.log(`[live-probe] WS=${WS_URL} HTTP=${HTTP_BASE}\n`); +type ChatMsg = ChatDeltaMessage | ChatErrorMessage; +type Socket = ReturnType<typeof createSurfaceSocket>; - const cache = createConversationCache(createIdbChunkStore()); +const handlers = new Map<string, (msg: ChatMsg) => void>(); +function convOf(msg: ChatMsg): string | undefined { + return msg.type === "chat.error" ? msg.conversationId : msg.event.conversationId; +} +/** Drive one turn to turn-sealed (or error), folding events into a fresh state. */ +async function runTurn( + socket: Socket, + conversationId: string, + prompt: string, +): Promise<{ state: TranscriptState; deltas: number; sealed: boolean; error: string | null }> { let state = initialState(); - let gotCatalog = false; - let deltaCount = 0; - let sawTextDelta = false; - let sawSeal = false; + let deltas = 0; + let sealed = false; + let error: string | null = null; const done = Promise.withResolvers<void>(); - const onChat = (msg: ChatDeltaMessage | ChatErrorMessage) => { + handlers.set(conversationId, (msg) => { if (msg.type === "chat.error") { - record("no chat.error", false, msg.message); + error = msg.message; done.resolve(); return; } - deltaCount++; - if (msg.event.type === "text-delta") sawTextDelta = true; + deltas++; state = foldEvent(state, msg.event); if (msg.event.type === "turn-sealed") { - sawSeal = true; + sealed = true; done.resolve(); } - }; + }); + + socket.send({ type: "chat.send", conversationId, message: prompt, model: MODEL }); + const timeout = setTimeout(() => done.resolve(), 90_000); + await done.promise; + clearTimeout(timeout); + handlers.delete(conversationId); + return { state, deltas, sealed, error }; +} +function toolChunksOf(state: TranscriptState) { + return selectChunks(state).filter( + (c) => c.chunk.type === "tool-call" || c.chunk.type === "tool-result", + ); +} + +async function main() { + console.log(`[live-probe] model=${MODEL}`); + console.log(`[live-probe] WS=${WS_URL} HTTP=${HTTP_BASE}\n`); + + const cache = createConversationCache(createIdbChunkStore()); + + let gotCatalog = false; const socket = createSurfaceSocket({ url: WS_URL, onMessage: (m: SurfaceServerMessage) => { @@ -90,65 +137,127 @@ async function main() { console.log(` ↳ surface catalog: ${m.catalog.length} surface(s)`); } }, - onChat, + onChat: (msg: ChatMsg) => { + const id = convOf(msg); + const h = id !== undefined ? handlers.get(id) : undefined; + if (h) h(msg); + }, }); - // Give the socket a moment to open + deliver the catalog, then send the turn. await new Promise((r) => setTimeout(r, 500)); record("WS connected + surface catalog received", gotCatalog); - console.log(`\n[live-probe] sending chat.send: "${PROMPT}"`); - socket.send({ type: "chat.send", conversationId, message: PROMPT, model: MODEL }); - - // Wait for turn-sealed (or error), with a hard timeout. - const timeout = setTimeout(() => done.resolve(), 90_000); - await done.promise; - clearTimeout(timeout); - - record("received chat.delta events", deltaCount > 0, `${deltaCount} deltas`); - record("saw text-delta", sawTextDelta); - record("turn reached turn-sealed", sawSeal); - - const provisionalText = selectMessages(state) - .flatMap((m) => m.chunks) - .filter((c) => c.type === "text") - .map((c) => (c as { text: string }).text) - .join(""); - console.log(`\n ↳ streamed assistant text (provisional): ${JSON.stringify(provisionalText)}`); + // ─── Turn 1: text streaming + cache + replay ──────────────────────────────── + console.log(`\n[live-probe] TURN 1 (text): "${TEXT_PROMPT}"`); + const textConv = crypto.randomUUID(); + const t1 = await runTurn(socket, textConv, TEXT_PROMPT); + if (t1.error !== null) record("turn 1 had no chat.error", false, t1.error); + record("turn 1 received chat.delta events", t1.deltas > 0, `${t1.deltas} deltas`); + record("turn 1 reached turn-sealed", t1.sealed); - // Post-seal: resync authoritative seq'd history + commit to cache (the real path). - const sinceSeq = await cache.sinceSeq(conversationId); - const hist = await historySync(conversationId, sinceSeq); + let state = t1.state; + const sinceSeq = await cache.sinceSeq(textConv); + const hist = await historySync(textConv, sinceSeq); record( - "history endpoint returned chunks", + "turn 1 history endpoint returned chunks", hist.chunks.length > 0, `${hist.chunks.length} chunks, latestSeq=${hist.latestSeq}`, ); const monotonic = hist.chunks.every((c, i) => i === 0 || c.seq > (hist.chunks[i - 1]?.seq ?? -1)); - record("history chunks are seq-monotonic", monotonic); - - const merged = await cache.commit(conversationId, hist.chunks); + record("turn 1 history chunks are seq-monotonic", monotonic); + const merged = await cache.commit(textConv, hist.chunks); state = applyHistory(state, merged); - record( - "provisional superseded after applyHistory (sealedTurnId cleared)", - state.sealedTurnId === null, - ); - - const cached = await cache.load(conversationId); - record( - "IndexedDB cache persisted the turn", - cached.length === hist.chunks.length, - `${cached.length} cached`, - ); - + record("turn 1 provisional superseded (sealedTurnId cleared)", state.sealedTurnId === null); + const cached = await cache.load(textConv); + record("turn 1 IndexedDB cache persisted the turn", cached.length === hist.chunks.length); const committedText = selectMessages(state) .filter((m) => m.role === "assistant") .flatMap((m) => m.chunks) .filter((c) => c.type === "text") .map((c) => (c as { text: string }).text) .join(""); - console.log(` ↳ committed assistant text (post-sync): ${JSON.stringify(committedText)}`); - record("committed transcript has assistant text", committedText.length > 0); + record("turn 1 committed transcript has assistant text", committedText.length > 0); + + // ─── Turn 2: tool-call batching ([email protected] stepId) ───────────────────────── + console.log(`\n[live-probe] TURN 2 (tools): "${TOOL_PROMPT}"`); + const toolConv = crypto.randomUUID(); + const t2 = await runTurn(socket, toolConv, TOOL_PROMPT); + if (t2.error !== null) record("turn 2 had no chat.error", false, t2.error); + record("turn 2 reached turn-sealed", t2.sealed); + + const liveTool = toolChunksOf(t2.state); + const liveCalls = liveTool.filter((c) => c.chunk.type === "tool-call"); + + if (liveCalls.length === 0) { + note( + "INCONCLUSIVE: the model issued no tool calls this run — cannot verify stepId grouping live. " + + "Re-run with a stronger PROBE_TOOL_PROMPT or one tailored to the backend's tool set.", + ); + record("turn 2 tool-call batching (live)", true, "skipped — no tool calls issued"); + } else { + // Every live tool chunk must carry stepId (foldEvent copies it from the event). + const allLiveHaveStep = liveTool.every( + (c) => + (c.chunk.type === "tool-call" || c.chunk.type === "tool-result") && + typeof c.chunk.stepId === "string" && + c.chunk.stepId.length > 0, + ); + record( + "turn 2 every LIVE tool event carries stepId", + allLiveHaveStep, + `${liveCalls.length} call(s), ${liveTool.length - liveCalls.length} result(s)`, + ); + + const liveGroups = groupRenderedChunks(selectChunks(t2.state)); + const liveBatches = liveGroups.filter((g) => g.kind === "tool-batch"); + const distinctSteps = new Set( + liveCalls.map((c) => (c.chunk.type === "tool-call" ? c.chunk.stepId : undefined)), + ); + note( + `live grouping: ${liveCalls.length} call(s) across ${distinctSteps.size} step(s) → ` + + `${liveBatches.length} batch group(s)`, + ); + if (liveBatches.length > 0) { + record( + "turn 2 grouping produced a parallel batch (2+ calls in one step)", + true, + `${liveBatches.length} batch(es)`, + ); + } else { + note( + "the model used tools but did NOT parallelize (each call its own step) — stepId is verified, " + + "but no multi-call batch occurred to render as a list this run.", + ); + } + + // Replay path: persisted tool chunks must also carry chunk.stepId. + const histTool = await historySync(toolConv, 0); + const replayTool = histTool.chunks.filter( + (c) => c.chunk.type === "tool-call" || c.chunk.type === "tool-result", + ); + const allReplayHaveStep = replayTool.every( + (c) => + (c.chunk.type === "tool-call" || c.chunk.type === "tool-result") && + typeof c.chunk.stepId === "string" && + c.chunk.stepId.length > 0, + ); + record( + "turn 2 every REPLAYED tool chunk carries chunk.stepId", + replayTool.length > 0 && allReplayHaveStep, + `${replayTool.length} tool chunk(s) in history`, + ); + + // Grouping on the authoritative replayed history matches the live shape. + const replayState = applyHistory(initialState(), await cache.commit(toolConv, histTool.chunks)); + const replayBatches = groupRenderedChunks(selectChunks(replayState)).filter( + (g) => g.kind === "tool-batch", + ); + record( + "turn 2 replay grouping matches live (batch count)", + replayBatches.length === liveBatches.length, + `live=${liveBatches.length} replay=${replayBatches.length}`, + ); + } socket.close(); |
