diff options
| author | Adam Malczewski <[email protected]> | 2026-06-05 13:07:23 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-05 13:07:23 +0900 |
| commit | c48d8ac7160c3cdcf32ed4e488807d3daeb8d457 (patch) | |
| tree | 1fccd7f35f051d8bae6bc8c6c5e3ffa22e816d0b /packages/kernel/src/runtime | |
| parent | 94dd5334b0277f3cf3b0588150a6615af86a32b3 (diff) | |
| download | dispatch-c48d8ac7160c3cdcf32ed4e488807d3daeb8d457.tar.gz dispatch-c48d8ac7160c3cdcf32ed4e488807d3daeb8d457.zip | |
feat(observability): Phase A logging substrate — Logger/Span ABI + journal sink (250 tests)
Structured, agent-first logging captured durably to an append-only journal file.
Kernel (contracts/logging.ts): leveled/attributed Logger + Span, auto-scoped per extension (host stamps manifest.id, unspoofable), incremental span records (open/close) for crash-reconstructable traces, injected LogSink (pure record-builder). ctx.log on ToolContract; runTurn opens turn/step/tool-call spans and captures the verbatim pre-mutation prompt (the 'before') on the step span.
journal-sink (new package, bootstrap dep — not an extension): LogSink appending NDJSON to a rotating journal; pure serialize + thin fs edge; fail-safe drop, never blocks a turn. host-bin injects it via HostDeps; session-orchestrator threads host.logger (childed per turn) into runTurn.
Redaction is per-extension self-redaction (no shared helper — isolation over DRY). The out-of-process collector + SQLite store + the verbatim 'after' provider.request capture are Phase B / next (notes/observability-design.md §10/§11).
Verified: tsc -b clean, 250 tests (218 → 250, +32), biome clean. Live boot: a turn's journal holds host logs + turn/step spans (open+close) + the prompt:before record with the verbatim messages array.
Harness: ORCHESTRATOR §3 rule-scoping map; .dispatch/rules/isolation-over-dry.md; notes/observability-design.md (design D1–D10 + Phase A/B plan).
Diffstat (limited to 'packages/kernel/src/runtime')
| -rw-r--r-- | packages/kernel/src/runtime/dispatch.ts | 30 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.test.ts | 226 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 214 |
3 files changed, 437 insertions, 33 deletions
diff --git a/packages/kernel/src/runtime/dispatch.ts b/packages/kernel/src/runtime/dispatch.ts index 626b333..1ba0849 100644 --- a/packages/kernel/src/runtime/dispatch.ts +++ b/packages/kernel/src/runtime/dispatch.ts @@ -1,4 +1,5 @@ import type { ToolDispatchPolicy } from "../contracts/dispatch.js"; +import type { Logger, Span } from "../contracts/logging.js"; import type { EventEmitter } from "../contracts/runtime.js"; import type { ToolCall, ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js"; import { toolOutputEvent } from "./events.js"; @@ -15,6 +16,7 @@ export async function executeToolCall( emit: EventEmitter, conversationId: string, turnId: string, + toolSpan?: Span, ): Promise<ToolResult> { if (tool === undefined) { return { content: `Unknown tool: ${call.name}`, isError: true }; @@ -28,6 +30,7 @@ export async function executeToolCall( onOutput: (data, stream) => { emit(toolOutputEvent(conversationId, turnId, call.id, data, stream)); }, + log: toolSpan?.log ?? 
createNoopLogger(), }; try { return await tool.execute(call.input, ctx); @@ -50,6 +53,7 @@ export function createStepDispatcher( emit: EventEmitter, conversationId: string, turnId: string, + toolSpans: Map<string, Span>, ): StepDispatcher { let activeCount = 0; let unsafeRunning = false; @@ -78,6 +82,7 @@ export function createStepDispatcher( } async function runAndResolve(entry: QueueEntry): Promise<void> { + const tcSpan = toolSpans.get(entry.call.id); const result = await executeToolCall( entry.call, entry.tool, @@ -85,6 +90,7 @@ export function createStepDispatcher( emit, conversationId, turnId, + tcSpan, ); activeCount--; if (entry.tool?.concurrencySafe === false) unsafeRunning = false; @@ -129,3 +135,27 @@ export function createStepDispatcher( return { submit, drain }; } + +function createNoopLogger(): Logger { + return { + debug() {}, + info() {}, + warn() {}, + error() {}, + child() { + return createNoopLogger(); + }, + span() { + return { + id: "noop", + log: createNoopLogger(), + setAttributes() {}, + addLink() {}, + child() { + return this; + }, + end() {}, + }; + }, + }; +} diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts index 696a385..48f0b5a 100644 --- a/packages/kernel/src/runtime/run-turn.test.ts +++ b/packages/kernel/src/runtime/run-turn.test.ts @@ -1,6 +1,8 @@ import { describe, expect, it } from "vitest"; import type { ChatMessage } from "../contracts/conversation.js"; import type { AgentEvent } from "../contracts/events.js"; +import type { LogDeps, Logger, LogRecord, LogSink } from "../contracts/logging.js"; +import { createLogger } from "../contracts/logging.js"; import type { ProviderContract, ProviderEvent } from "../contracts/provider.js"; import type { ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js"; import { runTurn } from "./run-turn.js"; @@ -814,4 +816,228 @@ describe("runTurn", () => { expect(outputs[1]?.stream).toBe("stderr"); } }); + + describe("span 
instrumentation", () => { + function createTestLogger(): { + logger: Logger; + sink: LogSink & { records: LogRecord[] }; + deps: LogDeps; + } { + let idCounter = 0; + const deps: LogDeps = { + now: () => 1000 + idCounter * 100, + newId: () => `span-${++idCounter}`, + }; + const records: LogRecord[] = []; + const sink: LogSink & { records: LogRecord[] } = { + records, + emit: (record) => records.push(record), + }; + const logger = createLogger({ extensionId: "test" }, sink, deps); + return { logger, sink, deps }; + } + + it("emits turn + step span open/close in order", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const spanOpens = sink.records.filter((r) => r.kind === "span-open"); + const spanCloses = sink.records.filter((r) => r.kind === "span-close"); + + expect(spanOpens.length).toBeGreaterThanOrEqual(2); // turn + step + expect(spanCloses.length).toBeGreaterThanOrEqual(2); + + const turnOpen = spanOpens.find((r) => r.kind === "span-open" && r.name === "turn"); + const stepOpen = spanOpens.find((r) => r.kind === "span-open" && r.name === "step"); + expect(turnOpen).toBeDefined(); + expect(stepOpen).toBeDefined(); + + if (turnOpen?.kind === "span-open") { + expect(turnOpen.extensionId).toBe("test"); + expect(turnOpen.attributes?.conversationId).toBe("conv-1"); + expect(turnOpen.attributes?.turnId).toBe("turn-1"); + } + + const turnClose = spanCloses.find((r) => r.kind === "span-close" && r.name === "turn"); + expect(turnClose).toBeDefined(); + if (turnClose?.kind === "span-close") { + expect(turnClose.status).toBe("ok"); + 
expect(turnClose.durationMs).toBeGreaterThanOrEqual(0); + } + }); + + it("emits tool-call spans for dispatched tools", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const toolCallSpans = sink.records.filter( + (r) => r.kind === "span-open" && r.name === "tool-call", + ); + expect(toolCallSpans).toHaveLength(1); + if (toolCallSpans[0]?.kind === "span-open") { + expect(toolCallSpans[0].attributes?.name).toBe("echo"); + expect(toolCallSpans[0].attributes?.toolCallId).toBe("tc1"); + } + + const toolCallCloses = sink.records.filter( + (r) => r.kind === "span-close" && r.name === "tool-call", + ); + expect(toolCallCloses).toHaveLength(1); + if (toolCallCloses[0]?.kind === "span-close") { + expect(toolCallCloses[0].status).toBe("ok"); + } + }); + + it("tools receive ctx.log (correlated logger)", async () => { + let capturedLog: Logger | undefined; + + const tool = createFakeTool("logtest", async (_input, ctx) => { + capturedLog = ctx.log; + ctx.log.info("tool ran", { key: "value" }); + return { content: "ok" }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "logtest", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { 
maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + expect(capturedLog).toBeDefined(); + + const toolLogs = sink.records.filter( + (r) => r.kind === "log" && r.kind === "log" && (r as { msg: string }).msg === "tool ran", + ); + expect(toolLogs).toHaveLength(1); + if (toolLogs[0]?.kind === "log") { + expect(toolLogs[0].attributes?.key).toBe("value"); + expect(toolLogs[0].extensionId).toBe("test"); + } + }); + + it("an aborted turn still closes its turn span", async () => { + const ac = new AbortController(); + ac.abort(); + + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "should not appear" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + signal: ac.signal, + logger, + }); + + const turnCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "turn"); + expect(turnCloses).toHaveLength(1); + if (turnCloses[0]?.kind === "span-close") { + expect(turnCloses[0].attributes?.finishReason).toBe("aborted"); + } + }); + + it("a provider error closes the step span with error status", async () => { + const provider: ProviderContract = { + id: "fake", + stream() { + return (async function* () { + yield { type: "text-delta", delta: "partial" } as ProviderEvent; + throw new Error("provider exploded"); + })(); + }, + }; + + const { logger, sink } = createTestLogger(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + expect(result.finishReason).toBe("error"); + + const stepCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "step"); + 
expect(stepCloses).toHaveLength(1); + if (stepCloses[0]?.kind === "span-close") { + expect(stepCloses[0].status).toBe("error"); + expect(stepCloses[0].attributes?.["error.message"]).toContain("provider exploded"); + } + }); + }); }); diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index 9421d86..0f42ef3 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -1,4 +1,5 @@ import type { ChatMessage, Chunk } from "../contracts/conversation.js"; +import type { Logger, Span } from "../contracts/logging.js"; import type { ProviderContract, ProviderEvent, Usage } from "../contracts/provider.js"; import type { EventEmitter, RunTurnInput, RunTurnResult } from "../contracts/runtime.js"; import type { ToolCall, ToolContract } from "../contracts/tool.js"; @@ -76,6 +77,8 @@ interface StepContext { readonly signal: AbortSignal; readonly conversationId: string; readonly turnId: string; + readonly logger: Logger; + readonly toolSpans: Map<string, Span>; } interface StepResult { @@ -124,6 +127,18 @@ function processEvent( event.input, ), ); + + // Open a tool-call span (attrs: name, toolCallId) + try { + const tcSpan = ctx.logger.span("tool-call", { + name: event.toolName, + toolCallId: event.toolCallId, + }); + ctx.toolSpans.set(event.toolCallId, tcSpan); + } catch { + // Swallow — D7: logging never breaks the turn. + } + if (ctx.dispatch.eager) { dispatcher.submit(call); } @@ -151,6 +166,26 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { let stepUsage = zeroUsage(); let finishReason = "stop"; + // Open a step span with the verbatim pre-mutation prompt in its body (BEFORE capture). + let stepSpan: Span | undefined; + try { + stepSpan = ctx.logger.span("step"); + // Emit the verbatim pre-mutation prompt as a log record on the step span's logger. + // This is the "BEFORE" capture — the messages + tools as handed to provider.stream. 
+ stepSpan.log.info("prompt:before", { + "prompt.messages": JSON.stringify(ctx.messages), + "prompt.tools": JSON.stringify( + ctx.tools.map((t) => ({ + name: t.name, + description: t.description, + parameters: t.parameters, + })), + ), + }); + } catch { + // Swallow — D7. + } + const dispatcher = createStepDispatcher( ctx.toolMap, ctx.dispatch, @@ -158,6 +193,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { ctx.emit, ctx.conversationId, ctx.turnId, + ctx.toolSpans, ); try { @@ -177,6 +213,13 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { chunks.push({ type: "error", message }); ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message)); finishReason = "error"; + // Close step span with error + try { + stepSpan?.end({ err }); + } catch { + // Swallow — D7. + } + stepSpan = undefined; } if (!ctx.dispatch.eager) { @@ -187,6 +230,25 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { const results = await dispatcher.drain(); + // Close remaining tool-call spans + for (const call of toolCalls) { + const tcSpan = ctx.toolSpans.get(call.id); + if (tcSpan !== undefined) { + const result = results.get(call.id); + try { + tcSpan.end({ + attrs: { + isError: result?.isError ?? false, + contentLength: result?.content.length ?? 0, + }, + }); + } catch { + // Swallow — D7. + } + ctx.toolSpans.delete(call.id); + } + } + const toolMessages: ChatMessage[] = []; for (const call of toolCalls) { const result = results.get(call.id); @@ -217,6 +279,21 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { } } + // Close step span (if not already closed by error) + if (stepSpan !== undefined) { + try { + stepSpan.end({ + attrs: { + finishReason, + usage_inputTokens: stepUsage.inputTokens, + usage_outputTokens: stepUsage.outputTokens, + }, + }); + } catch { + // Swallow — D7. + } + } + const assistantMessage: ChatMessage | undefined = chunks.length > 0 ? 
{ role: "assistant", chunks } : undefined; @@ -237,51 +314,122 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { const conversationId = input.conversationId; const turnId = input.turnId; const signal = input.signal ?? new AbortController().signal; + const logger = input.logger; - for (let step = 0; step < MAX_STEPS; step++) { - if (signal.aborted) { - finishReason = "aborted"; - break; + // Open a turn span (attrs: conversationId, turnId, model) + let turnSpan: Span | undefined; + if (logger !== undefined) { + try { + turnSpan = logger.span("turn", { + conversationId, + turnId, + model: input.providerOpts?.model ?? input.provider.id, + }); + } catch { + // Swallow — D7. } + } - const stepResult = await executeStep({ - provider: input.provider, - messages, - tools: input.tools, - toolMap, - dispatch: input.dispatch, - emit: input.emit, - signal, - conversationId, - turnId, - }); + // Track open tool-call spans across steps so we can close them on abort + const toolSpans = new Map<string, Span>(); + + try { + for (let step = 0; step < MAX_STEPS; step++) { + if (signal.aborted) { + finishReason = "aborted"; + break; + } - totalUsage = addUsage(totalUsage, stepResult.usage); + const stepResult = await executeStep({ + provider: input.provider, + messages, + tools: input.tools, + toolMap, + dispatch: input.dispatch, + emit: input.emit, + signal, + conversationId, + turnId, + logger: turnSpan?.log ?? logger ?? 
createNoopLogger(), + toolSpans, + }); - if (stepResult.assistantMessage !== undefined) { - messages.push(stepResult.assistantMessage); - resultMessages.push(stepResult.assistantMessage); - } + totalUsage = addUsage(totalUsage, stepResult.usage); - for (const msg of stepResult.toolMessages) { - messages.push(msg); - resultMessages.push(msg); - } + if (stepResult.assistantMessage !== undefined) { + messages.push(stepResult.assistantMessage); + resultMessages.push(stepResult.assistantMessage); + } - if (signal.aborted) { - finishReason = "aborted"; - break; - } + for (const msg of stepResult.toolMessages) { + messages.push(msg); + resultMessages.push(msg); + } - if (stepResult.toolCalls.length === 0) { - finishReason = stepResult.finishReason; - break; + if (signal.aborted) { + finishReason = "aborted"; + break; + } + + if (stepResult.toolCalls.length === 0) { + finishReason = stepResult.finishReason; + break; + } + + if (step === MAX_STEPS - 1) { + finishReason = "max-steps"; + } + } + } finally { + // Close any orphaned tool-call spans (e.g. abort mid-tool) + for (const [id, tcSpan] of toolSpans) { + try { + tcSpan.end({ attrs: { orphaned: true } }); + } catch { + // Swallow — D7. + } + toolSpans.delete(id); } - if (step === MAX_STEPS - 1) { - finishReason = "max-steps"; + // Close the turn span + if (turnSpan !== undefined) { + try { + turnSpan.end({ + attrs: { + finishReason, + usage_inputTokens: totalUsage.inputTokens, + usage_outputTokens: totalUsage.outputTokens, + }, + }); + } catch { + // Swallow — D7. + } } } return { messages: resultMessages, usage: totalUsage, finishReason }; } + +function createNoopLogger(): Logger { + return { + debug() {}, + info() {}, + warn() {}, + error() {}, + child() { + return createNoopLogger(); + }, + span() { + return { + id: "noop", + log: createNoopLogger(), + setAttributes() {}, + addLink() {}, + child() { + return this; + }, + end() {}, + }; + }, + }; +} |
