diff options
| author | Adam Malczewski <[email protected]> | 2026-06-24 13:43:40 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-24 13:43:40 +0900 |
| commit | b58fb8373a1f7311cead23aa9a4d1fcd6927634f (patch) | |
| tree | 839e0e51a235ed5d9bb8d24ac0f367552c1d61ac | |
| parent | d274567893ff3283878ac0dcafd51a0b127653d7 (diff) | |
| download | dispatch-b58fb8373a1f7311cead23aa9a4d1fcd6927634f.tar.gz dispatch-b58fb8373a1f7311cead23aa9a4d1fcd6927634f.zip | |
fix(broken-chat): read-time self-repair of unrecoverable chats
reconcile() only repaired orphaned tool-calls. Two other broken states made
chats uncontinuable, and load() had no parse-error guard:
- A trailing assistant message whose only chunk is 'error' (a failed-
generation marker) serializes to empty content -> provider rejects/empty
-> chat never continues. 6 of 140 production conversations were stuck.
- A tool-call whose input is a raw malformed-JSON string (model emitted
broken JSON) re-sent as OpenAI arguments -> provider 400s on every
continuation (the 77574596 break).
- load() JSON.parse had no try/catch -> one corrupt row bricked the chat.
Fix = read-time repair (no DB surgery; append-only preserved). reconcile
runs on every load() BEFORE any provider sees messages, so Layer 1
protects ALL providers.
Layer 1 (conversation-store reconcile): strip error chunks from assistant
messages + drop the now-empty error-only messages (safe: never followed by
a tool message); orphaned-tool-call synthesis unchanged; ReconcileReport
+2 additive counts. loadSince (FE reads) intentionally unreconciled so the
user still SEES the error. load() wraps JSON.parse in try/catch (skip
corrupt rows).
Layer 2 (openai-stream): serializeToolArguments ensures tool-call
arguments is always valid JSON (malformed string -> fallback object),
neutralizing already-stored malformed args.
Layer 2 equiv (../claude provider-anthropic): safeJson returns a valid
object fallback on parse failure, not the raw string. (Separate repo.)
Live-verified: reproduced 77574596's real broken tail in the dev DB;
POST /chat continued it cleanly (no 400, model replied) — the provider
accepted the reconciled history.
tsc -b EXIT 0, biome clean, 1453 vitest pass.
| -rw-r--r-- | packages/conversation-store/src/reconcile.test.ts | 134 | ||||
| -rw-r--r-- | packages/conversation-store/src/reconcile.ts | 54 | ||||
| -rw-r--r-- | packages/conversation-store/src/store.test.ts | 48 | ||||
| -rw-r--r-- | packages/conversation-store/src/store.ts | 26 | ||||
| -rw-r--r-- | packages/openai-stream/src/convert-messages.test.ts | 105 | ||||
| -rw-r--r-- | packages/openai-stream/src/convert-messages.ts | 30 | ||||
| -rw-r--r-- | tasks.md | 44 |
7 files changed, 432 insertions, 9 deletions
diff --git a/packages/conversation-store/src/reconcile.test.ts b/packages/conversation-store/src/reconcile.test.ts index 90d1157..78b808e 100644 --- a/packages/conversation-store/src/reconcile.test.ts +++ b/packages/conversation-store/src/reconcile.test.ts @@ -1,6 +1,6 @@ import type { ChatMessage, StepId } from "@dispatch/kernel"; import { describe, expect, it } from "vitest"; -import { reconcile } from "./reconcile.js"; +import { reconcile, reconcileWithReport } from "./reconcile.js"; describe("reconcile", () => { it("returns empty array for empty input", () => { @@ -285,4 +285,136 @@ describe("reconcile", () => { expect(chunk).not.toHaveProperty("stepId"); } }); + + // --- Layer 1: read-time self-repair of broken chats (error chunks) --- + + it("reconcile strips error-only trailing assistant message", () => { + // The 77574596/102587c0 shape: [user, assistant{error}] -> [user]. + const messages: ChatMessage[] = [ + { role: "user", chunks: [{ type: "text", text: "hi" }] }, + { role: "assistant", chunks: [{ type: "error", message: "boom" }] }, + ]; + const { messages: result, report } = reconcileWithReport(messages); + expect(result).toEqual([{ role: "user", chunks: [{ type: "text", text: "hi" }] }]); + expect(report.strippedErrorChunks).toBe(1); + expect(report.droppedEmptyMessages).toBe(1); + expect(report.repairedCount).toBe(0); + }); + + it("reconcile strips error chunk but keeps sibling text", () => { + // assistant{text,error} -> assistant{text}. + const messages: ChatMessage[] = [ + { + role: "assistant", + chunks: [ + { type: "text", text: "hello" }, + { type: "error", message: "boom" }, + ], + }, + ]; + const { messages: result, report } = reconcileWithReport(messages); + expect(result).toEqual([{ role: "assistant", chunks: [{ type: "text", text: "hello" }] }]); + expect(report.strippedErrorChunks).toBe(1); + expect(report.droppedEmptyMessages).toBe(0); + expect(report.repairedCount).toBe(0); + }); + + it("reconcile drops assistant message left empty after stripping error", () => { + // assistant{error} only -> dropped entirely. + const messages: ChatMessage[] = [ + { role: "assistant", chunks: [{ type: "error", message: "boom" }] }, + ]; + const { messages: result, report } = reconcileWithReport(messages); + expect(result).toEqual([]); + expect(report.strippedErrorChunks).toBe(1); + expect(report.droppedEmptyMessages).toBe(1); + expect(report.repairedCount).toBe(0); + }); + + it("reconcile keeps tool-call + strips error", () => { + // assistant{tool-call,error} with a matching result -> assistant{tool-call}. + const messages: ChatMessage[] = [ + { + role: "assistant", + chunks: [ + { type: "tool-call", toolCallId: "call_1", toolName: "t", input: {} }, + { type: "error", message: "boom" }, + ], + }, + { + role: "tool", + chunks: [ + { + type: "tool-result", + toolCallId: "call_1", + toolName: "t", + content: "ok", + isError: false, + }, + ], + }, + ]; + const { messages: result, report } = reconcileWithReport(messages); + expect(result).toEqual([ + { + role: "assistant", + chunks: [{ type: "tool-call", toolCallId: "call_1", toolName: "t", input: {} }], + }, + { + role: "tool", + chunks: [ + { + type: "tool-result", + toolCallId: "call_1", + toolName: "t", + content: "ok", + isError: false, + }, + ], + }, + ]); + expect(report.strippedErrorChunks).toBe(1); + expect(report.droppedEmptyMessages).toBe(0); + expect(report.repairedCount).toBe(0); // the tool-call has a matching result + }); + + it("reconcile strips error and still synthesizes a result for an orphaned tool-call", () => { + // Ordering guard: strip error chunks first, then run orphaned-tool-call + // synthesis on what remains. assistant{tool-call,error} with NO result -> + // the error is stripped, the tool-call survives, and a result is synthesized. + const messages: ChatMessage[] = [ + { role: "user", chunks: [{ type: "text", text: "go" }] }, + { + role: "assistant", + chunks: [ + { type: "tool-call", toolCallId: "call_orph", toolName: "t", input: {} }, + { type: "error", message: "boom" }, + ], + }, + ]; + const { messages: result, report } = reconcileWithReport(messages); + expect(result).toEqual([ + { role: "user", chunks: [{ type: "text", text: "go" }] }, + { + role: "assistant", + chunks: [{ type: "tool-call", toolCallId: "call_orph", toolName: "t", input: {} }], + }, + { + role: "tool", + chunks: [ + { + type: "tool-result", + toolCallId: "call_orph", + toolName: "t", + content: "interrupted: tool execution did not complete", + isError: true, + }, + ], + }, + ]); + expect(report.strippedErrorChunks).toBe(1); + expect(report.droppedEmptyMessages).toBe(0); + expect(report.repairedCount).toBe(1); + expect(report.repairedToolCallIds).toEqual(["call_orph"]); + }); }); diff --git a/packages/conversation-store/src/reconcile.ts b/packages/conversation-store/src/reconcile.ts index 9a3011f..ea33904 100644 --- a/packages/conversation-store/src/reconcile.ts +++ b/packages/conversation-store/src/reconcile.ts @@ -3,6 +3,10 @@ import type { ChatMessage, ToolCallChunk, ToolResultChunk } from "@dispatch/kern export interface ReconcileReport { readonly repairedCount: number; readonly repairedToolCallIds: readonly string[]; + /** Number of `error` chunks stripped from assistant messages. */ + readonly strippedErrorChunks: number; + /** Number of assistant messages dropped after stripping left them empty. */ + readonly droppedEmptyMessages: number; } export interface ReconcileResult { @@ -11,8 +15,50 @@ export interface ReconcileResult { } export function reconcileWithReport(messages: readonly ChatMessage[]): ReconcileResult { - const resolvedIds = new Set<string>(); + // Phase 1: strip `error` chunks from assistant messages. An error chunk is a + // failed-generation marker, never valid provider content — removing it here + // (on load, before any provider sees the messages) auto-repairs broken chats + // with no DB surgery (append-only storage untouched). + let strippedErrorChunks = 0; + const stripped: ChatMessage[] = []; for (const msg of messages) { + if (msg.role === "assistant" && msg.chunks.some((c) => c.type === "error")) { + const filtered = msg.chunks.filter((chunk) => { + if (chunk.type === "error") { + strippedErrorChunks++; + return false; + } + return true; + }); + stripped.push({ role: msg.role, chunks: filtered }); + } else { + stripped.push(msg); + } + } + + // Phase 2: drop assistant messages left with neither `text` nor `tool-call` + // chunks after stripping (the now-empty error-only message). This is what + // unblocks continuation: such a message serializes to nothing a provider + // understands. Safe: it ends with no tool-call, so it is NEVER followed by a + // `tool` message — no "tool-without-preceding-assistant-tool_calls" 400. + let droppedEmptyMessages = 0; + const pruned: ChatMessage[] = []; + for (const msg of stripped) { + if (msg.role === "assistant") { + const hasContent = msg.chunks.some( + (chunk) => chunk.type === "text" || chunk.type === "tool-call", + ); + if (!hasContent) { + droppedEmptyMessages++; + continue; + } + } + pruned.push(msg); + } + + // Phase 3: orphaned-tool-call synthesis (unchanged) on what remains. + const resolvedIds = new Set<string>(); + for (const msg of pruned) { for (const chunk of msg.chunks) { if (chunk.type === "tool-result") { resolvedIds.add(chunk.toolCallId); @@ -21,7 +67,7 @@ export function reconcileWithReport(messages: readonly ChatMessage[]): Reconcile } const orphaned: ToolCallChunk[] = []; - for (const msg of messages) { + for (const msg of pruned) { if (msg.role !== "assistant") continue; for (const chunk of msg.chunks) { if (chunk.type === "tool-call" && !resolvedIds.has(chunk.toolCallId)) { @@ -30,7 +76,7 @@ export function reconcileWithReport(messages: readonly ChatMessage[]): Reconcile } } - const result: ChatMessage[] = [...messages]; + const result: ChatMessage[] = [...pruned]; for (const call of orphaned) { const base = { @@ -50,6 +96,8 @@ export function reconcileWithReport(messages: readonly ChatMessage[]): Reconcile report: { repairedCount: orphaned.length, repairedToolCallIds: orphaned.map((c) => c.toolCallId), + strippedErrorChunks, + droppedEmptyMessages, }, }; } diff --git a/packages/conversation-store/src/store.test.ts b/packages/conversation-store/src/store.test.ts index 5e8d26f..65b7dc3 100644 --- a/packages/conversation-store/src/store.test.ts +++ b/packages/conversation-store/src/store.test.ts @@ -7,7 +7,7 @@ import type { TurnMetrics, } from "@dispatch/kernel"; import { beforeEach, describe, expect, it } from "vitest"; -import { CONVERSATION_INDEX_KEY, metaKey } from "./keys.js"; +import { CONVERSATION_INDEX_KEY, chunkKey, metaKey } from "./keys.js"; import { createConversationStore, extractTitle } from "./store.js"; interface SpanEvent { @@ -410,6 +410,52 @@ describe("ConversationStore", () => { } }); + it("load() skips a corrupt-JSON chunk row and reconciles the rest (no throw)", async () => { + // "Never leave the system broken": a single bad row must not brick the + // conversation. The corrupt chunk is skipped; the rest loads and reconcile + // still runs normally. Fake only the OUTERMOST edge (the injected storage) + // — no @dispatch/* mocks. + const { logger } = createCapturingLogger(); + const store = createConversationStore(storage, logger); + const messages: ChatMessage[] = [ + { role: "user", chunks: [{ type: "text", text: "do it" }] }, + { + role: "assistant", + chunks: [ + { type: "text", text: "calling" }, + { type: "tool-call", toolCallId: "call_x", toolName: "t", input: {} }, + ], + }, + ]; + await store.append("conv_corrupt", messages); + // Corrupt the assistant text chunk (seq 2) directly in storage. + await storage.set(chunkKey("conv_corrupt", 2), "{this is not valid json"); + + const result = await store.load("conv_corrupt"); + // No throw. The user message survives; the assistant message keeps its + // tool-call (its text chunk was the corrupt row, skipped); reconcile + // synthesizes the missing tool-result for the now-orphaned tool-call. + expect(result).toEqual([ + { role: "user", chunks: [{ type: "text", text: "do it" }] }, + { + role: "assistant", + chunks: [{ type: "tool-call", toolCallId: "call_x", toolName: "t", input: {} }], + }, + { + role: "tool", + chunks: [ + { + type: "tool-result", + toolCallId: "call_x", + toolName: "t", + content: "interrupted: tool execution did not complete", + isError: true, + }, + ], + }, + ]); + }); + it("loadSince returns empty array for unknown conversation", async () => { const store = createConversationStore(storage); const result = await store.loadSince("nonexistent"); diff --git a/packages/conversation-store/src/store.ts b/packages/conversation-store/src/store.ts index b70f706..26d5ed4 100644 --- a/packages/conversation-store/src/store.ts +++ b/packages/conversation-store/src/store.ts @@ -538,7 +538,23 @@ export function createConversationStore( for (const key of sorted) { const value = await storage.get(key); if (value === null) continue; - const entry = JSON.parse(value) as PersistedChunkEntry; + let entry: PersistedChunkEntry; + try { + entry = JSON.parse(value) as PersistedChunkEntry; + } catch (err) { + // "Never leave the system broken": a single corrupt/unparseable + // row must not brick the whole conversation. Skip it (append-only + // storage untouched) and let reconcile run on the rest. loadSince + // is intentionally NOT hardened here — it is the raw FE read path. + if (logger !== undefined) { + logger.warn("skipping corrupt chunk row", { + conversationId, + key, + error: err instanceof Error ? err.message : String(err), + }); + } + continue; + } if (entry.msgIdx !== currentMsgIdx) { if (currentMsgIdx >= 0 && currentRole !== undefined) { @@ -558,11 +574,17 @@ export function createConversationStore( const { messages: repaired, report } = reconcileWithReport(messages); - if (report.repairedCount > 0 && logger !== undefined) { + const hasReconcileActivity = + report.repairedCount > 0 || + report.strippedErrorChunks > 0 || + report.droppedEmptyMessages > 0; + if (hasReconcileActivity && logger !== undefined) { const child = logger.child({ conversationId }); const span = child.span("reconcile.repair", { repairedCount: report.repairedCount, firstRepairedToolCallId: report.repairedToolCallIds[0] ?? null, + strippedErrorChunks: report.strippedErrorChunks, + droppedEmptyMessages: report.droppedEmptyMessages, }); span.end(); } diff --git a/packages/openai-stream/src/convert-messages.test.ts b/packages/openai-stream/src/convert-messages.test.ts index 51513ea..004a6b7 100644 --- a/packages/openai-stream/src/convert-messages.test.ts +++ b/packages/openai-stream/src/convert-messages.test.ts @@ -256,4 +256,109 @@ describe("convertMessages", () => { const result = convertMessages(messages); expect(result).toEqual([{ role: "assistant", content: "Let me think...Here is my answer." }]); }); + + it("arguments is valid JSON when input is a malformed string", () => { + // Production seq-134 shape: the model emitted broken JSON as the tool + // arguments and it was stored verbatim. Unquoted key fails JSON.parse at + // some column (position 1 here). + const malformed = '{path: "/src/main.ts"}'; + expect(() => JSON.parse(malformed)).toThrow(); + + const messages: ChatMessage[] = [ + { + role: "assistant", + chunks: [ + { + type: "tool-call", + toolCallId: "call_bad", + toolName: "read_file", + input: malformed, + }, + ], + }, + ]; + + const result = convertMessages(messages); + const args = result[0]?.tool_calls?.[0]?.function.arguments; + expect(args).toBeDefined(); + // The output MUST parse without throwing — the provider receives valid JSON. + expect(() => JSON.parse(args as string)).not.toThrow(); + // And it is the fallback object preserving a truncated hint. + expect(JSON.parse(args as string)).toEqual({ + _malformed_arguments: malformed.slice(0, 200), + }); + }); + + it("arguments passes through valid string input", () => { + const validJson = '{"path":"/src/main.ts"}'; + expect(() => JSON.parse(validJson)).not.toThrow(); + + const messages: ChatMessage[] = [ + { + role: "assistant", + chunks: [ + { + type: "tool-call", + toolCallId: "call_str", + toolName: "read_file", + input: validJson, + }, + ], + }, + ]; + + const result = convertMessages(messages); + const args = result[0]?.tool_calls?.[0]?.function.arguments; + // A valid-JSON string round-trips to a canonical JSON string. + expect(args).toBe(JSON.stringify(JSON.parse(validJson))); + expect(args).toBe('{"path":"/src/main.ts"}'); + }); + + it("stringifies object input", () => { + const input = { path: "/src/main.ts", line: 42 }; + const messages: ChatMessage[] = [ + { + role: "assistant", + chunks: [ + { + type: "tool-call", + toolCallId: "call_obj", + toolName: "read_file", + input, + }, + ], + }, + ]; + + const result = convertMessages(messages); + const args = result[0]?.tool_calls?.[0]?.function.arguments; + expect(args).toBe(JSON.stringify(input)); + }); + + it("truncates the malformed input hint to 200 characters", () => { + // A long bare run of letters is not valid JSON (no quotes/braces). + const malformed = "x".repeat(500); + expect(() => JSON.parse(malformed)).toThrow(); + + const messages: ChatMessage[] = [ + { + role: "assistant", + chunks: [ + { + type: "tool-call", + toolCallId: "call_long", + toolName: "read_file", + input: malformed, + }, + ], + }, + ]; + + const result = convertMessages(messages); + const args = result[0]?.tool_calls?.[0]?.function.arguments; + expect(() => JSON.parse(args as string)).not.toThrow(); + const parsed = JSON.parse(args as string); + expect(parsed).toEqual({ _malformed_arguments: malformed.slice(0, 200) }); + expect(parsed._malformed_arguments.length).toBe(200); + }); }); diff --git a/packages/openai-stream/src/convert-messages.ts b/packages/openai-stream/src/convert-messages.ts index 786a70d..76badc8 100644 --- a/packages/openai-stream/src/convert-messages.ts +++ b/packages/openai-stream/src/convert-messages.ts @@ -71,7 +71,7 @@ function convertAssistantMessage(msg: ChatMessage): OpenAIMessage { type: "function", function: { name: c.toolName, - arguments: typeof c.input === "string" ? c.input : JSON.stringify(c.input), + arguments: serializeToolArguments(c.input), }, }), ); @@ -97,3 +97,31 @@ function convertToolResultMessages(msg: ChatMessage): OpenAIMessage[] { }), ); } + +/** + * Serialize a tool-call's `input` into a JSON string the provider will accept. + * + * The OpenAI `arguments` field MUST be a valid JSON string. A broken chat can + * have a tool-call whose `input` is a raw malformed-JSON string (the model + * emitted broken JSON as the tool arguments and it was stored verbatim). + * Passing that string straight through makes the provider 400 + * `unexpected character` on EVERY continuation, bricking the chat. + * + * - object input → `JSON.stringify(input)` (regression, unchanged shape). + * - string input that is valid JSON → re-serialized to canonical JSON. + * - string input that fails to parse → a valid fallback object preserving a + * truncated hint of the original, so the chat can continue (the model sees + * its tool-call had no usable args and adjusts). + * + * Pure: input → output, no I/O. + */ +function serializeToolArguments(input: unknown): string { + if (typeof input === "string") { + try { + return JSON.stringify(JSON.parse(input)); + } catch { + return JSON.stringify({ _malformed_arguments: input.slice(0, 200) }); + } + } + return JSON.stringify(input); +} @@ -5,7 +5,49 @@ > Keep this lean and current; do not let it re-accrete a step-by-step changelog. ## Status (current) -`tsc -b` EXIT 0 · biome clean · **1443 vitest** green. +`tsc -b` EXIT 0 · biome clean · **1453 vitest** green. + +## Broken-chat self-repair (read-time reconcile) (DONE) +Conversation `77574596` broke unrecoverably: `reconcile()` only repaired orphaned +tool-calls, not (a) a trailing assistant message whose only chunk is `error` +(serializes to empty content → uncontinuable) and (b) a `tool-call` whose `input` +is a raw malformed-JSON string (re-sent as OpenAI `arguments` → provider 400s on +every continuation). `load()` also had no try/catch on `JSON.parse` (one corrupt +row would brick a chat). Fix = read-time repair so broken chats auto-heal on next +open — NO DB surgery (append-only preserved; repair is a turn-path transform on +`load()`). Full diagnosis + plan: `broken-chat-repair-handoff.md` + +`reports/broken-chat-repair-diagnosis.md`. +- **Layer 1 — `conversation-store` `reconcile.ts` (protects ALL providers):** + `reconcileWithReport` now (1) strips `error` chunks from assistant messages, (2) + drops any assistant message left with no `text`/`tool-call` (the emptied error-only + msg — safe: never followed by a `tool` msg), (3) keeps orphaned-tool-call synthesis + unchanged. `ReconcileReport` +2 additive counts (`strippedErrorChunks`, + `droppedEmptyMessages`) for the repair span. `loadSince` (FE reads) intentionally + NOT reconciled — the user still SEES the error while the provider gets clean history. + **Hardening:** `store.ts` `load()` wraps per-chunk `JSON.parse` in try/catch → + corrupt row skipped (log + continue), reconcile runs on the rest. +6 reconcile/store + tests. +- **Layer 2 — `openai-stream` `convert-messages.ts` (per-provider args safety):** new + pure `serializeToolArguments` — object→stringify; valid-string→parse+restringify; + malformed-string→fallback `{ _malformed_arguments: <truncated 200> }`. Output ALWAYS + `JSON.parse`s → provider stops 400ing on stored malformed args. +4 tests. +- **Layer 2 (equiv) — `../claude` `provider-anthropic` `convert.ts`:** `safeJson` now + returns a valid object fallback (`{ _malformed_arguments: s.slice(0,200) }`) on + parse failure, not the raw string (`tool_use.input` must be an object for Anthropic). + Exported for direct testing. +3 tests. (Separate repo, separate agent.) +- **Wave 1+2 (parallel, disjoint):** conversation-store + openai-stream (arch-rewrite) + + provider-anthropic (`../claude`). All in-lane; zero internal mocks; no contract/type + change. Reports: `reports/conversation-store.md`, `reports/openai-stream.md`, + `../claude/reports/provider-anthropic.md`. +- [x] Verified: arch-rewrite `tsc -b` EXIT 0, biome clean, **1453 vitest** (was 1443); + `../claude` `tsc -b` EXIT 0, 71 vitest, biome clean. Both pure-core units zero + internal mocks. +- [x] **LIVE-VERIFIED** (dev stack `bin/up` :24203): reproduced 77574596's REAL broken + tail (the actual malformed-args tool-call + trailing error chunk) in the dev DB; + `POST /chat` continued it cleanly (`text-delta:"OK"` → `done` reason `"stop"`, no + 400) — the provider accepted the reconciled history (error stripped, args sanitized). + The historical error chunk remains in storage by design (read-time repair only); no + new error was appended. Cleaned up the test conversation after. ## LSP — broken-server recovery + config source attribution (DONE) Handoff from an agent running in raylib-jamstack (configuring ruby-lsp under the |
