summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-10 08:49:16 +0900
committerAdam Malczewski <[email protected]>2026-06-10 08:49:16 +0900
commit2e583ea78a7efa6e9a31b7c5b4dfcf792007e418 (patch)
tree35dc89df83ec615768adcf277499b4d55e36cba1
parentee502ba1228fdaec4a15413a973ffce7ca89a0b6 (diff)
downloaddispatch-2e583ea78a7efa6e9a31b7c5b4dfcf792007e418.tar.gz
dispatch-2e583ea78a7efa6e9a31b7c5b4dfcf792007e418.zip
feat(conversation-store): reconcile.repair span (logging-audit #1)
Load-time history repair was invisible (createConversationStore got no logger). Now: optional logger injected (extension passes host.logger); reconcile logic moved into pure reconcileWithReport() returning a ReconcileReport (reconcile() stays a thin byte-identical wrapper); load() emits a reconcile.repair span (childed with conversationId, flat attrs repairedCount/firstRepairedToolCallId) ONLY when a real repair occurs. No contract fan-out (factory is package-internal). typecheck EXIT 0, biome clean, 550 vitest (+4) + 89 bun.
-rw-r--r--packages/conversation-store/src/extension.ts2
-rw-r--r--packages/conversation-store/src/index.ts3
-rw-r--r--packages/conversation-store/src/reconcile.ts24
-rw-r--r--packages/conversation-store/src/store.test.ts189
-rw-r--r--packages/conversation-store/src/store.ts21
-rw-r--r--tasks.md3
6 files changed, 231 insertions, 11 deletions
diff --git a/packages/conversation-store/src/extension.ts b/packages/conversation-store/src/extension.ts
index 05ba8ef..b5a1380 100644
--- a/packages/conversation-store/src/extension.ts
+++ b/packages/conversation-store/src/extension.ts
@@ -16,7 +16,7 @@ export const extension: Extension = {
manifest,
activate: (host: HostAPI) => {
const storage = host.storage("conversation-store");
- const store = createConversationStore(storage);
+ const store = createConversationStore(storage, host.logger);
host.provideService(conversationStoreHandle, store);
},
};
diff --git a/packages/conversation-store/src/index.ts b/packages/conversation-store/src/index.ts
index f9c0ccc..b9f288a 100644
--- a/packages/conversation-store/src/index.ts
+++ b/packages/conversation-store/src/index.ts
@@ -1,5 +1,6 @@
export type { StoredChunk } from "@dispatch/kernel";
export { extension, manifest } from "./extension.js";
-export { reconcile } from "./reconcile.js";
+export type { ReconcileReport, ReconcileResult } from "./reconcile.js";
+export { reconcile, reconcileWithReport } from "./reconcile.js";
export type { ConversationStore } from "./store.js";
export { conversationStoreHandle, createConversationStore } from "./store.js";
diff --git a/packages/conversation-store/src/reconcile.ts b/packages/conversation-store/src/reconcile.ts
index a182c1e..9a3011f 100644
--- a/packages/conversation-store/src/reconcile.ts
+++ b/packages/conversation-store/src/reconcile.ts
@@ -1,6 +1,16 @@
import type { ChatMessage, ToolCallChunk, ToolResultChunk } from "@dispatch/kernel";
-export function reconcile(messages: readonly ChatMessage[]): ChatMessage[] {
+export interface ReconcileReport {
+ readonly repairedCount: number;
+ readonly repairedToolCallIds: readonly string[];
+}
+
+export interface ReconcileResult {
+ readonly messages: ChatMessage[];
+ readonly report: ReconcileReport;
+}
+
+export function reconcileWithReport(messages: readonly ChatMessage[]): ReconcileResult {
const resolvedIds = new Set<string>();
for (const msg of messages) {
for (const chunk of msg.chunks) {
@@ -35,5 +45,15 @@ export function reconcile(messages: readonly ChatMessage[]): ChatMessage[] {
result.push({ role: "tool", chunks: [synthesized] });
}
- return result;
+ return {
+ messages: result,
+ report: {
+ repairedCount: orphaned.length,
+ repairedToolCallIds: orphaned.map((c) => c.toolCallId),
+ },
+ };
+}
+
+export function reconcile(messages: readonly ChatMessage[]): ChatMessage[] {
+ return reconcileWithReport(messages).messages;
}
diff --git a/packages/conversation-store/src/store.test.ts b/packages/conversation-store/src/store.test.ts
index c7083e8..a82b4cc 100644
--- a/packages/conversation-store/src/store.test.ts
+++ b/packages/conversation-store/src/store.test.ts
@@ -1,7 +1,80 @@
-import type { ChatMessage, StepId, StorageNamespace, TurnMetrics } from "@dispatch/kernel";
+import type {
+ ChatMessage,
+ Logger,
+ Span,
+ StepId,
+ StorageNamespace,
+ TurnMetrics,
+} from "@dispatch/kernel";
import { beforeEach, describe, expect, it } from "vitest";
import { createConversationStore } from "./store.js";
+interface SpanEvent {
+ readonly kind: "span-open" | "span-close";
+ readonly name: string;
+ readonly attrs?: Record<string, string | number | boolean | null> | undefined;
+ readonly conversationId?: string | undefined;
+}
+
+function createCapturingLogger(): { logger: Logger; events: SpanEvent[] } {
+ const events: SpanEvent[] = [];
+
+ function createSpan(name: string, conversationId?: string | undefined): Span {
+ events.push({ kind: "span-open", name, conversationId });
+ const span: Span = {
+ id: `span_${events.length}`,
+ log: createFakeLogger(conversationId),
+ setAttributes: () => {},
+ addLink: () => {},
+ child: (childName, attrs) => {
+ const child = createSpan(childName, conversationId);
+ if (attrs !== undefined) {
+ const prev = events[events.length - 1];
+ if (prev !== undefined) {
+ events[events.length - 1] = {
+ ...prev,
+ attrs: attrs as Record<string, string | number | boolean | null>,
+ };
+ }
+ }
+ return child;
+ },
+ end: (outcome) => {
+ const attrs = outcome?.attrs as
+ | Record<string, string | number | boolean | null>
+ | undefined;
+ events.push({ kind: "span-close", name, attrs, conversationId });
+ },
+ };
+ return span;
+ }
+
+ function createFakeLogger(conversationId?: string | undefined): Logger {
+ return {
+ debug: () => {},
+ info: () => {},
+ warn: () => {},
+ error: () => {},
+ child: (ctx) => createFakeLogger(ctx.conversationId ?? conversationId),
+ span: (name, attrs) => {
+ const span = createSpan(name, conversationId);
+ if (attrs !== undefined) {
+ const prev = events[events.length - 1];
+ if (prev !== undefined) {
+ events[events.length - 1] = {
+ ...prev,
+ attrs: attrs as Record<string, string | number | boolean | null>,
+ };
+ }
+ }
+ return span;
+ },
+ };
+ }
+
+ return { logger: createFakeLogger(), events };
+}
+
function createMemoryStorage(): StorageNamespace {
const data = new Map<string, string>();
return {
@@ -563,3 +636,117 @@ describe("ConversationStore metrics", () => {
expect(result[0]?.steps[1]?.genTotalMs).toBe(500);
});
});
+
+describe("ConversationStore reconcile.repair span", () => {
+ let storage: StorageNamespace;
+
+ beforeEach(() => {
+ storage = createMemoryStorage();
+ });
+
+ it("load() emits a reconcile.repair span when a dangling tool-call is repaired", async () => {
+ const { logger, events } = createCapturingLogger();
+ const store = createConversationStore(storage, logger);
+ const messages: ChatMessage[] = [
+ { role: "user", chunks: [{ type: "text", text: "do it" }] },
+ {
+ role: "assistant",
+ chunks: [
+ {
+ type: "tool-call",
+ toolCallId: "call_dangle",
+ toolName: "someTool",
+ input: {},
+ },
+ ],
+ },
+ ];
+ await store.append("conv_span", messages);
+ await store.load("conv_span");
+
+ const spanOpens = events.filter((e) => e.kind === "span-open" && e.name === "reconcile.repair");
+ const spanCloses = events.filter(
+ (e) => e.kind === "span-close" && e.name === "reconcile.repair",
+ );
+ expect(spanOpens).toHaveLength(1);
+ expect(spanCloses).toHaveLength(1);
+ });
+
+ it("load() emits NO reconcile.repair span when the history is already valid", async () => {
+ const { logger, events } = createCapturingLogger();
+ const store = createConversationStore(storage, logger);
+ const messages: ChatMessage[] = [
+ { role: "user", chunks: [{ type: "text", text: "hello" }] },
+ { role: "assistant", chunks: [{ type: "text", text: "hi" }] },
+ ];
+ await store.append("conv_valid", messages);
+ await store.load("conv_valid");
+
+ const repairSpans = events.filter((e) => e.name === "reconcile.repair");
+ expect(repairSpans).toHaveLength(0);
+ });
+
+ it("the reconcile.repair span carries conversationId + a repair count attribute", async () => {
+ const { logger, events } = createCapturingLogger();
+ const store = createConversationStore(storage, logger);
+ const messages: ChatMessage[] = [
+ {
+ role: "assistant",
+ chunks: [
+ {
+ type: "tool-call",
+ toolCallId: "call_a",
+ toolName: "toolA",
+ input: {},
+ },
+ {
+ type: "tool-call",
+ toolCallId: "call_b",
+ toolName: "toolB",
+ input: {},
+ },
+ ],
+ },
+ ];
+ await store.append("conv_multi", messages);
+ await store.load("conv_multi");
+
+ const spanOpen = events.find((e) => e.kind === "span-open" && e.name === "reconcile.repair");
+ expect(spanOpen).toBeDefined();
+ if (spanOpen === undefined) throw new Error("expected spanOpen");
+ expect(spanOpen.conversationId).toBe("conv_multi");
+ expect(spanOpen.attrs).toBeDefined();
+ if (spanOpen.attrs === undefined) throw new Error("expected attrs");
+ expect(spanOpen.attrs.repairedCount).toBe(2);
+ expect(spanOpen.attrs.firstRepairedToolCallId).toBe("call_a");
+ });
+
+ it("createConversationStore works with the logger omitted (optional)", async () => {
+ const store = createConversationStore(storage);
+ const messages: ChatMessage[] = [
+ { role: "user", chunks: [{ type: "text", text: "do it" }] },
+ {
+ role: "assistant",
+ chunks: [
+ {
+ type: "tool-call",
+ toolCallId: "call_nolog",
+ toolName: "someTool",
+ input: {},
+ },
+ ],
+ },
+ ];
+ await store.append("conv_nolog", messages);
+ const result = await store.load("conv_nolog");
+ expect(result).toHaveLength(3);
+ expect(result[2]?.role).toBe("tool");
+ const chunk = result[2]?.chunks[0];
+ if (chunk === undefined) throw new Error("expected chunk");
+ expect(chunk.type).toBe("tool-result");
+ if (chunk.type === "tool-result") {
+ expect(chunk.toolCallId).toBe("call_nolog");
+ expect(chunk.isError).toBe(true);
+ }
+ });
+});
diff --git a/packages/conversation-store/src/store.ts b/packages/conversation-store/src/store.ts
index 080cb79..6b91d58 100644
--- a/packages/conversation-store/src/store.ts
+++ b/packages/conversation-store/src/store.ts
@@ -1,6 +1,7 @@
import type {
ChatMessage,
Chunk,
+ Logger,
Role,
StorageNamespace,
StoredChunk,
@@ -16,7 +17,7 @@ import {
parseSeq,
seqKey,
} from "./keys.js";
-import { reconcile } from "./reconcile.js";
+import { reconcileWithReport } from "./reconcile.js";
export interface ConversationStore {
readonly append: (conversationId: string, messages: readonly ChatMessage[]) => Promise<void>;
@@ -38,7 +39,10 @@ interface PersistedChunkEntry {
readonly chunkIdx: number;
}
-export function createConversationStore(storage: StorageNamespace): ConversationStore {
+export function createConversationStore(
+ storage: StorageNamespace,
+ logger?: Logger,
+): ConversationStore {
return {
async append(conversationId, messages) {
const raw = await storage.get(seqKey(conversationId));
@@ -95,7 +99,18 @@ export function createConversationStore(storage: StorageNamespace): Conversation
messages.push({ role: currentRole, chunks: currentChunks });
}
- return reconcile(messages);
+ const { messages: repaired, report } = reconcileWithReport(messages);
+
+ if (report.repairedCount > 0 && logger !== undefined) {
+ const child = logger.child({ conversationId });
+ const span = child.span("reconcile.repair", {
+ repairedCount: report.repairedCount,
+ firstRepairedToolCallId: report.repairedToolCallIds[0] ?? null,
+ });
+ span.end();
+ }
+
+ return repaired;
},
async loadSince(conversationId, sinceSeq) {
diff --git a/tasks.md b/tasks.md
index 8692551..4d7da4a 100644
--- a/tasks.md
+++ b/tasks.md
@@ -69,9 +69,6 @@ server/collector procs poison the next run's counts.
this repo; user couriers to `../dispatch-web`; ORCHESTRATOR §7).
## Open items
-- **logging-audit #1:** conversation-store has no injected logger, so a load-time
- `reconcile` repair leaves no trace. Inject a logger + emit a `reconcile.repair`
- span when conversation-store is next touched.
- **dedup / storage growth (deferred):** trace-body de-dup (D5 volume control +
`prefix.fingerprint`) + rotation/compression/retention
(`notes/observability-design.md` §6, D9). `cacheReadTokens` is the cheap dedup