summaryrefslogtreecommitdiffhomepage
path: root/packages/kernel/src/runtime
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-05 13:07:23 +0900
committerAdam Malczewski <[email protected]>2026-06-05 13:07:23 +0900
commitc48d8ac7160c3cdcf32ed4e488807d3daeb8d457 (patch)
tree1fccd7f35f051d8bae6bc8c6c5e3ffa22e816d0b /packages/kernel/src/runtime
parent94dd5334b0277f3cf3b0588150a6615af86a32b3 (diff)
downloaddispatch-c48d8ac7160c3cdcf32ed4e488807d3daeb8d457.tar.gz
dispatch-c48d8ac7160c3cdcf32ed4e488807d3daeb8d457.zip
feat(observability): Phase A logging substrate — Logger/Span ABI + journal sink (250 tests)
Structured, agent-first logging captured durably to an append-only journal file. Kernel (contracts/logging.ts): leveled/attributed Logger + Span, auto-scoped per extension (host stamps manifest.id, unspoofable), incremental span records (open/close) for crash-reconstructable traces, injected LogSink (pure record-builder). ctx.log on ToolContract; runTurn opens turn/step/tool-call spans and captures the verbatim pre-mutation prompt (the 'before') on the step span. journal-sink (new package, bootstrap dep — not an extension): LogSink appending NDJSON to a rotating journal; pure serialize + thin fs edge; fail-safe drop, never blocks a turn. host-bin injects it via HostDeps; session-orchestrator threads host.logger (childed per turn) into runTurn. Redaction is per-extension self-redaction (no shared helper — isolation over DRY). The out-of-process collector + SQLite store + the verbatim 'after' provider.request capture are Phase B / next (notes/observability-design.md §10/§11). Verified: tsc -b clean, 250 tests (218→+32), biome clean. Live boot: a turn's journal holds host logs + turn/step spans (open+close) + the prompt:before record with the verbatim messages array. Harness: ORCHESTRATOR §3 rule-scoping map; .dispatch/rules/isolation-over-dry.md; notes/observability-design.md (design D1–D10 + Phase A/B plan).
Diffstat (limited to 'packages/kernel/src/runtime')
-rw-r--r--packages/kernel/src/runtime/dispatch.ts30
-rw-r--r--packages/kernel/src/runtime/run-turn.test.ts226
-rw-r--r--packages/kernel/src/runtime/run-turn.ts214
3 files changed, 437 insertions, 33 deletions
diff --git a/packages/kernel/src/runtime/dispatch.ts b/packages/kernel/src/runtime/dispatch.ts
index 626b333..1ba0849 100644
--- a/packages/kernel/src/runtime/dispatch.ts
+++ b/packages/kernel/src/runtime/dispatch.ts
@@ -1,4 +1,5 @@
import type { ToolDispatchPolicy } from "../contracts/dispatch.js";
+import type { Logger, Span } from "../contracts/logging.js";
import type { EventEmitter } from "../contracts/runtime.js";
import type { ToolCall, ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js";
import { toolOutputEvent } from "./events.js";
@@ -15,6 +16,7 @@ export async function executeToolCall(
emit: EventEmitter,
conversationId: string,
turnId: string,
+ toolSpan?: Span,
): Promise<ToolResult> {
if (tool === undefined) {
return { content: `Unknown tool: ${call.name}`, isError: true };
@@ -28,6 +30,7 @@ export async function executeToolCall(
onOutput: (data, stream) => {
emit(toolOutputEvent(conversationId, turnId, call.id, data, stream));
},
+ log: toolSpan?.log ?? createNoopLogger(),
};
try {
return await tool.execute(call.input, ctx);
@@ -50,6 +53,7 @@ export function createStepDispatcher(
emit: EventEmitter,
conversationId: string,
turnId: string,
+ toolSpans: Map<string, Span>,
): StepDispatcher {
let activeCount = 0;
let unsafeRunning = false;
@@ -78,6 +82,7 @@ export function createStepDispatcher(
}
async function runAndResolve(entry: QueueEntry): Promise<void> {
+ const tcSpan = toolSpans.get(entry.call.id);
const result = await executeToolCall(
entry.call,
entry.tool,
@@ -85,6 +90,7 @@ export function createStepDispatcher(
emit,
conversationId,
turnId,
+ tcSpan,
);
activeCount--;
if (entry.tool?.concurrencySafe === false) unsafeRunning = false;
@@ -129,3 +135,27 @@ export function createStepDispatcher(
return { submit, drain };
}
+
+function createNoopLogger(): Logger {
+ return {
+ debug() {},
+ info() {},
+ warn() {},
+ error() {},
+ child() {
+ return createNoopLogger();
+ },
+ span() {
+ return {
+ id: "noop",
+ log: createNoopLogger(),
+ setAttributes() {},
+ addLink() {},
+ child() {
+ return this;
+ },
+ end() {},
+ };
+ },
+ };
+}
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index 696a385..48f0b5a 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -1,6 +1,8 @@
import { describe, expect, it } from "vitest";
import type { ChatMessage } from "../contracts/conversation.js";
import type { AgentEvent } from "../contracts/events.js";
+import type { LogDeps, Logger, LogRecord, LogSink } from "../contracts/logging.js";
+import { createLogger } from "../contracts/logging.js";
import type { ProviderContract, ProviderEvent } from "../contracts/provider.js";
import type { ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js";
import { runTurn } from "./run-turn.js";
@@ -814,4 +816,228 @@ describe("runTurn", () => {
expect(outputs[1]?.stream).toBe("stderr");
}
});
+
+ describe("span instrumentation", () => {
+ function createTestLogger(): {
+ logger: Logger;
+ sink: LogSink & { records: LogRecord[] };
+ deps: LogDeps;
+ } {
+ let idCounter = 0;
+ const deps: LogDeps = {
+ now: () => 1000 + idCounter * 100,
+ newId: () => `span-${++idCounter}`,
+ };
+ const records: LogRecord[] = [];
+ const sink: LogSink & { records: LogRecord[] } = {
+ records,
+ emit: (record) => records.push(record),
+ };
+ const logger = createLogger({ extensionId: "test" }, sink, deps);
+ return { logger, sink, deps };
+ }
+
+ it("emits turn + step span open/close in order", async () => {
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "hi" },
+ { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { logger, sink } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ const spanOpens = sink.records.filter((r) => r.kind === "span-open");
+ const spanCloses = sink.records.filter((r) => r.kind === "span-close");
+
+ expect(spanOpens.length).toBeGreaterThanOrEqual(2); // turn + step
+ expect(spanCloses.length).toBeGreaterThanOrEqual(2);
+
+ const turnOpen = spanOpens.find((r) => r.kind === "span-open" && r.name === "turn");
+ const stepOpen = spanOpens.find((r) => r.kind === "span-open" && r.name === "step");
+ expect(turnOpen).toBeDefined();
+ expect(stepOpen).toBeDefined();
+
+ if (turnOpen?.kind === "span-open") {
+ expect(turnOpen.extensionId).toBe("test");
+ expect(turnOpen.attributes?.conversationId).toBe("conv-1");
+ expect(turnOpen.attributes?.turnId).toBe("turn-1");
+ }
+
+ const turnClose = spanCloses.find((r) => r.kind === "span-close" && r.name === "turn");
+ expect(turnClose).toBeDefined();
+ if (turnClose?.kind === "span-close") {
+ expect(turnClose.status).toBe("ok");
+ expect(turnClose.durationMs).toBeGreaterThanOrEqual(0);
+ }
+ });
+
+ it("emits tool-call spans for dispatched tools", async () => {
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ const provider = createFakeProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { logger, sink } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ const toolCallSpans = sink.records.filter(
+ (r) => r.kind === "span-open" && r.name === "tool-call",
+ );
+ expect(toolCallSpans).toHaveLength(1);
+ if (toolCallSpans[0]?.kind === "span-open") {
+ expect(toolCallSpans[0].attributes?.name).toBe("echo");
+ expect(toolCallSpans[0].attributes?.toolCallId).toBe("tc1");
+ }
+
+ const toolCallCloses = sink.records.filter(
+ (r) => r.kind === "span-close" && r.name === "tool-call",
+ );
+ expect(toolCallCloses).toHaveLength(1);
+ if (toolCallCloses[0]?.kind === "span-close") {
+ expect(toolCallCloses[0].status).toBe("ok");
+ }
+ });
+
+ it("tools receive ctx.log (correlated logger)", async () => {
+ let capturedLog: Logger | undefined;
+
+ const tool = createFakeTool("logtest", async (_input, ctx) => {
+ capturedLog = ctx.log;
+ ctx.log.info("tool ran", { key: "value" });
+ return { content: "ok" };
+ });
+
+ const provider = createFakeProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "logtest", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { logger, sink } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ expect(capturedLog).toBeDefined();
+
+ const toolLogs = sink.records.filter(
+ (r) => r.kind === "log" && r.kind === "log" && (r as { msg: string }).msg === "tool ran",
+ );
+ expect(toolLogs).toHaveLength(1);
+ if (toolLogs[0]?.kind === "log") {
+ expect(toolLogs[0].attributes?.key).toBe("value");
+ expect(toolLogs[0].extensionId).toBe("test");
+ }
+ });
+
+ it("an aborted turn still closes its turn span", async () => {
+ const ac = new AbortController();
+ ac.abort();
+
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "should not appear" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { logger, sink } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ signal: ac.signal,
+ logger,
+ });
+
+ const turnCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "turn");
+ expect(turnCloses).toHaveLength(1);
+ if (turnCloses[0]?.kind === "span-close") {
+ expect(turnCloses[0].attributes?.finishReason).toBe("aborted");
+ }
+ });
+
+ it("a provider error closes the step span with error status", async () => {
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ return (async function* () {
+ yield { type: "text-delta", delta: "partial" } as ProviderEvent;
+ throw new Error("provider exploded");
+ })();
+ },
+ };
+
+ const { logger, sink } = createTestLogger();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ expect(result.finishReason).toBe("error");
+
+ const stepCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "step");
+ expect(stepCloses).toHaveLength(1);
+ if (stepCloses[0]?.kind === "span-close") {
+ expect(stepCloses[0].status).toBe("error");
+ expect(stepCloses[0].attributes?.["error.message"]).toContain("provider exploded");
+ }
+ });
+ });
});
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index 9421d86..0f42ef3 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -1,4 +1,5 @@
import type { ChatMessage, Chunk } from "../contracts/conversation.js";
+import type { Logger, Span } from "../contracts/logging.js";
import type { ProviderContract, ProviderEvent, Usage } from "../contracts/provider.js";
import type { EventEmitter, RunTurnInput, RunTurnResult } from "../contracts/runtime.js";
import type { ToolCall, ToolContract } from "../contracts/tool.js";
@@ -76,6 +77,8 @@ interface StepContext {
readonly signal: AbortSignal;
readonly conversationId: string;
readonly turnId: string;
+ readonly logger: Logger;
+ readonly toolSpans: Map<string, Span>;
}
interface StepResult {
@@ -124,6 +127,18 @@ function processEvent(
event.input,
),
);
+
+ // Open a tool-call span (attrs: name, toolCallId)
+ try {
+ const tcSpan = ctx.logger.span("tool-call", {
+ name: event.toolName,
+ toolCallId: event.toolCallId,
+ });
+ ctx.toolSpans.set(event.toolCallId, tcSpan);
+ } catch {
+ // Swallow — D7: logging never breaks the turn.
+ }
+
if (ctx.dispatch.eager) {
dispatcher.submit(call);
}
@@ -151,6 +166,26 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
let stepUsage = zeroUsage();
let finishReason = "stop";
+ // Open a step span with the verbatim pre-mutation prompt in its body (BEFORE capture).
+ let stepSpan: Span | undefined;
+ try {
+ stepSpan = ctx.logger.span("step");
+ // Emit the verbatim pre-mutation prompt as a log record on the step span's logger.
+ // This is the "BEFORE" capture — the messages + tools as handed to provider.stream.
+ stepSpan.log.info("prompt:before", {
+ "prompt.messages": JSON.stringify(ctx.messages),
+ "prompt.tools": JSON.stringify(
+ ctx.tools.map((t) => ({
+ name: t.name,
+ description: t.description,
+ parameters: t.parameters,
+ })),
+ ),
+ });
+ } catch {
+ // Swallow — D7.
+ }
+
const dispatcher = createStepDispatcher(
ctx.toolMap,
ctx.dispatch,
@@ -158,6 +193,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
ctx.emit,
ctx.conversationId,
ctx.turnId,
+ ctx.toolSpans,
);
try {
@@ -177,6 +213,13 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
chunks.push({ type: "error", message });
ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message));
finishReason = "error";
+ // Close step span with error
+ try {
+ stepSpan?.end({ err });
+ } catch {
+ // Swallow — D7.
+ }
+ stepSpan = undefined;
}
if (!ctx.dispatch.eager) {
@@ -187,6 +230,25 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
const results = await dispatcher.drain();
+ // Close remaining tool-call spans
+ for (const call of toolCalls) {
+ const tcSpan = ctx.toolSpans.get(call.id);
+ if (tcSpan !== undefined) {
+ const result = results.get(call.id);
+ try {
+ tcSpan.end({
+ attrs: {
+ isError: result?.isError ?? false,
+ contentLength: result?.content.length ?? 0,
+ },
+ });
+ } catch {
+ // Swallow — D7.
+ }
+ ctx.toolSpans.delete(call.id);
+ }
+ }
+
const toolMessages: ChatMessage[] = [];
for (const call of toolCalls) {
const result = results.get(call.id);
@@ -217,6 +279,21 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
}
}
+ // Close step span (if not already closed by error)
+ if (stepSpan !== undefined) {
+ try {
+ stepSpan.end({
+ attrs: {
+ finishReason,
+ usage_inputTokens: stepUsage.inputTokens,
+ usage_outputTokens: stepUsage.outputTokens,
+ },
+ });
+ } catch {
+ // Swallow — D7.
+ }
+ }
+
const assistantMessage: ChatMessage | undefined =
chunks.length > 0 ? { role: "assistant", chunks } : undefined;
@@ -237,51 +314,122 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
const conversationId = input.conversationId;
const turnId = input.turnId;
const signal = input.signal ?? new AbortController().signal;
+ const logger = input.logger;
- for (let step = 0; step < MAX_STEPS; step++) {
- if (signal.aborted) {
- finishReason = "aborted";
- break;
+ // Open a turn span (attrs: conversationId, turnId, model)
+ let turnSpan: Span | undefined;
+ if (logger !== undefined) {
+ try {
+ turnSpan = logger.span("turn", {
+ conversationId,
+ turnId,
+ model: input.providerOpts?.model ?? input.provider.id,
+ });
+ } catch {
+ // Swallow — D7.
}
+ }
- const stepResult = await executeStep({
- provider: input.provider,
- messages,
- tools: input.tools,
- toolMap,
- dispatch: input.dispatch,
- emit: input.emit,
- signal,
- conversationId,
- turnId,
- });
+ // Track open tool-call spans across steps so we can close them on abort
+ const toolSpans = new Map<string, Span>();
+
+ try {
+ for (let step = 0; step < MAX_STEPS; step++) {
+ if (signal.aborted) {
+ finishReason = "aborted";
+ break;
+ }
- totalUsage = addUsage(totalUsage, stepResult.usage);
+ const stepResult = await executeStep({
+ provider: input.provider,
+ messages,
+ tools: input.tools,
+ toolMap,
+ dispatch: input.dispatch,
+ emit: input.emit,
+ signal,
+ conversationId,
+ turnId,
+ logger: turnSpan?.log ?? logger ?? createNoopLogger(),
+ toolSpans,
+ });
- if (stepResult.assistantMessage !== undefined) {
- messages.push(stepResult.assistantMessage);
- resultMessages.push(stepResult.assistantMessage);
- }
+ totalUsage = addUsage(totalUsage, stepResult.usage);
- for (const msg of stepResult.toolMessages) {
- messages.push(msg);
- resultMessages.push(msg);
- }
+ if (stepResult.assistantMessage !== undefined) {
+ messages.push(stepResult.assistantMessage);
+ resultMessages.push(stepResult.assistantMessage);
+ }
- if (signal.aborted) {
- finishReason = "aborted";
- break;
- }
+ for (const msg of stepResult.toolMessages) {
+ messages.push(msg);
+ resultMessages.push(msg);
+ }
- if (stepResult.toolCalls.length === 0) {
- finishReason = stepResult.finishReason;
- break;
+ if (signal.aborted) {
+ finishReason = "aborted";
+ break;
+ }
+
+ if (stepResult.toolCalls.length === 0) {
+ finishReason = stepResult.finishReason;
+ break;
+ }
+
+ if (step === MAX_STEPS - 1) {
+ finishReason = "max-steps";
+ }
+ }
+ } finally {
+ // Close any orphaned tool-call spans (e.g. abort mid-tool)
+ for (const [id, tcSpan] of toolSpans) {
+ try {
+ tcSpan.end({ attrs: { orphaned: true } });
+ } catch {
+ // Swallow — D7.
+ }
+ toolSpans.delete(id);
}
- if (step === MAX_STEPS - 1) {
- finishReason = "max-steps";
+ // Close the turn span
+ if (turnSpan !== undefined) {
+ try {
+ turnSpan.end({
+ attrs: {
+ finishReason,
+ usage_inputTokens: totalUsage.inputTokens,
+ usage_outputTokens: totalUsage.outputTokens,
+ },
+ });
+ } catch {
+ // Swallow — D7.
+ }
}
}
return { messages: resultMessages, usage: totalUsage, finishReason };
}
+
+function createNoopLogger(): Logger {
+ return {
+ debug() {},
+ info() {},
+ warn() {},
+ error() {},
+ child() {
+ return createNoopLogger();
+ },
+ span() {
+ return {
+ id: "noop",
+ log: createNoopLogger(),
+ setAttributes() {},
+ addLink() {},
+ child() {
+ return this;
+ },
+ end() {},
+ };
+ },
+ };
+}