summaryrefslogtreecommitdiffhomepage
path: root/packages/kernel/src/runtime/run-turn.ts
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-05 13:07:23 +0900
committerAdam Malczewski <[email protected]>2026-06-05 13:07:23 +0900
commitc48d8ac7160c3cdcf32ed4e488807d3daeb8d457 (patch)
tree1fccd7f35f051d8bae6bc8c6c5e3ffa22e816d0b /packages/kernel/src/runtime/run-turn.ts
parent94dd5334b0277f3cf3b0588150a6615af86a32b3 (diff)
downloaddispatch-c48d8ac7160c3cdcf32ed4e488807d3daeb8d457.tar.gz
dispatch-c48d8ac7160c3cdcf32ed4e488807d3daeb8d457.zip
feat(observability): Phase A logging substrate — Logger/Span ABI + journal sink (250 tests)
Structured, agent-first logging captured durably to an append-only journal file. Kernel (contracts/logging.ts): leveled/attributed Logger + Span, auto-scoped per extension (host stamps manifest.id, unspoofable), incremental span records (open/close) for crash-reconstructable traces, injected LogSink (pure record-builder). ctx.log on ToolContract; runTurn opens turn/step/tool-call spans and captures the verbatim pre-mutation prompt (the 'before') on the step span. journal-sink (new package, bootstrap dep — not an extension): LogSink appending NDJSON to a rotating journal; pure serialize + thin fs edge; fail-safe drop, never blocks a turn. host-bin injects it via HostDeps; session-orchestrator threads host.logger (childed per turn) into runTurn. Redaction is per-extension self-redaction (no shared helper — isolation over DRY). The out-of-process collector + SQLite store + the verbatim 'after' provider.request capture are Phase B / next (notes/observability-design.md §10/§11). Verified: tsc -b clean, 250 tests (218→+32), biome clean. Live boot: a turn's journal holds host logs + turn/step spans (open+close) + the prompt:before record with the verbatim messages array. Harness: ORCHESTRATOR §3 rule-scoping map; .dispatch/rules/isolation-over-dry.md; notes/observability-design.md (design D1–D10 + Phase A/B plan).
Diffstat (limited to 'packages/kernel/src/runtime/run-turn.ts')
-rw-r--r--packages/kernel/src/runtime/run-turn.ts214
1 files changed, 181 insertions, 33 deletions
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index 9421d86..0f42ef3 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -1,4 +1,5 @@
import type { ChatMessage, Chunk } from "../contracts/conversation.js";
+import type { Logger, Span } from "../contracts/logging.js";
import type { ProviderContract, ProviderEvent, Usage } from "../contracts/provider.js";
import type { EventEmitter, RunTurnInput, RunTurnResult } from "../contracts/runtime.js";
import type { ToolCall, ToolContract } from "../contracts/tool.js";
@@ -76,6 +77,8 @@ interface StepContext {
readonly signal: AbortSignal;
readonly conversationId: string;
readonly turnId: string;
+ readonly logger: Logger;
+ readonly toolSpans: Map<string, Span>;
}
interface StepResult {
@@ -124,6 +127,18 @@ function processEvent(
event.input,
),
);
+
+ // Open a tool-call span (attrs: name, toolCallId)
+ try {
+ const tcSpan = ctx.logger.span("tool-call", {
+ name: event.toolName,
+ toolCallId: event.toolCallId,
+ });
+ ctx.toolSpans.set(event.toolCallId, tcSpan);
+ } catch {
+ // Swallow — D7: logging never breaks the turn.
+ }
+
if (ctx.dispatch.eager) {
dispatcher.submit(call);
}
@@ -151,6 +166,26 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
let stepUsage = zeroUsage();
let finishReason = "stop";
+ // Open a step span with the verbatim pre-mutation prompt in its body (BEFORE capture).
+ let stepSpan: Span | undefined;
+ try {
+ stepSpan = ctx.logger.span("step");
+ // Emit the verbatim pre-mutation prompt as a log record on the step span's logger.
+ // This is the "BEFORE" capture — the messages + tools as handed to provider.stream.
+ stepSpan.log.info("prompt:before", {
+ "prompt.messages": JSON.stringify(ctx.messages),
+ "prompt.tools": JSON.stringify(
+ ctx.tools.map((t) => ({
+ name: t.name,
+ description: t.description,
+ parameters: t.parameters,
+ })),
+ ),
+ });
+ } catch {
+ // Swallow — D7.
+ }
+
const dispatcher = createStepDispatcher(
ctx.toolMap,
ctx.dispatch,
@@ -158,6 +193,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
ctx.emit,
ctx.conversationId,
ctx.turnId,
+ ctx.toolSpans,
);
try {
@@ -177,6 +213,13 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
chunks.push({ type: "error", message });
ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message));
finishReason = "error";
+ // Close step span with error
+ try {
+ stepSpan?.end({ err });
+ } catch {
+ // Swallow — D7.
+ }
+ stepSpan = undefined;
}
if (!ctx.dispatch.eager) {
@@ -187,6 +230,25 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
const results = await dispatcher.drain();
+ // Close remaining tool-call spans
+ for (const call of toolCalls) {
+ const tcSpan = ctx.toolSpans.get(call.id);
+ if (tcSpan !== undefined) {
+ const result = results.get(call.id);
+ try {
+ tcSpan.end({
+ attrs: {
+ isError: result?.isError ?? false,
+ contentLength: result?.content.length ?? 0,
+ },
+ });
+ } catch {
+ // Swallow — D7.
+ }
+ ctx.toolSpans.delete(call.id);
+ }
+ }
+
const toolMessages: ChatMessage[] = [];
for (const call of toolCalls) {
const result = results.get(call.id);
@@ -217,6 +279,21 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
}
}
+ // Close step span (if not already closed by error)
+ if (stepSpan !== undefined) {
+ try {
+ stepSpan.end({
+ attrs: {
+ finishReason,
+ usage_inputTokens: stepUsage.inputTokens,
+ usage_outputTokens: stepUsage.outputTokens,
+ },
+ });
+ } catch {
+ // Swallow — D7.
+ }
+ }
+
const assistantMessage: ChatMessage | undefined =
chunks.length > 0 ? { role: "assistant", chunks } : undefined;
@@ -237,51 +314,122 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
const conversationId = input.conversationId;
const turnId = input.turnId;
const signal = input.signal ?? new AbortController().signal;
+ const logger = input.logger;
- for (let step = 0; step < MAX_STEPS; step++) {
- if (signal.aborted) {
- finishReason = "aborted";
- break;
+ // Open a turn span (attrs: conversationId, turnId, model)
+ let turnSpan: Span | undefined;
+ if (logger !== undefined) {
+ try {
+ turnSpan = logger.span("turn", {
+ conversationId,
+ turnId,
+ model: input.providerOpts?.model ?? input.provider.id,
+ });
+ } catch {
+ // Swallow — D7.
}
+ }
- const stepResult = await executeStep({
- provider: input.provider,
- messages,
- tools: input.tools,
- toolMap,
- dispatch: input.dispatch,
- emit: input.emit,
- signal,
- conversationId,
- turnId,
- });
+ // Track open tool-call spans across steps so we can close them on abort
+ const toolSpans = new Map<string, Span>();
+
+ try {
+ for (let step = 0; step < MAX_STEPS; step++) {
+ if (signal.aborted) {
+ finishReason = "aborted";
+ break;
+ }
- totalUsage = addUsage(totalUsage, stepResult.usage);
+ const stepResult = await executeStep({
+ provider: input.provider,
+ messages,
+ tools: input.tools,
+ toolMap,
+ dispatch: input.dispatch,
+ emit: input.emit,
+ signal,
+ conversationId,
+ turnId,
+ logger: turnSpan?.log ?? logger ?? createNoopLogger(),
+ toolSpans,
+ });
- if (stepResult.assistantMessage !== undefined) {
- messages.push(stepResult.assistantMessage);
- resultMessages.push(stepResult.assistantMessage);
- }
+ totalUsage = addUsage(totalUsage, stepResult.usage);
- for (const msg of stepResult.toolMessages) {
- messages.push(msg);
- resultMessages.push(msg);
- }
+ if (stepResult.assistantMessage !== undefined) {
+ messages.push(stepResult.assistantMessage);
+ resultMessages.push(stepResult.assistantMessage);
+ }
- if (signal.aborted) {
- finishReason = "aborted";
- break;
- }
+ for (const msg of stepResult.toolMessages) {
+ messages.push(msg);
+ resultMessages.push(msg);
+ }
- if (stepResult.toolCalls.length === 0) {
- finishReason = stepResult.finishReason;
- break;
+ if (signal.aborted) {
+ finishReason = "aborted";
+ break;
+ }
+
+ if (stepResult.toolCalls.length === 0) {
+ finishReason = stepResult.finishReason;
+ break;
+ }
+
+ if (step === MAX_STEPS - 1) {
+ finishReason = "max-steps";
+ }
+ }
+ } finally {
+ // Close any orphaned tool-call spans (e.g. abort mid-tool)
+ for (const [id, tcSpan] of toolSpans) {
+ try {
+ tcSpan.end({ attrs: { orphaned: true } });
+ } catch {
+ // Swallow — D7.
+ }
+ toolSpans.delete(id);
}
- if (step === MAX_STEPS - 1) {
- finishReason = "max-steps";
+ // Close the turn span
+ if (turnSpan !== undefined) {
+ try {
+ turnSpan.end({
+ attrs: {
+ finishReason,
+ usage_inputTokens: totalUsage.inputTokens,
+ usage_outputTokens: totalUsage.outputTokens,
+ },
+ });
+ } catch {
+ // Swallow — D7.
+ }
}
}
return { messages: resultMessages, usage: totalUsage, finishReason };
}
+
+function createNoopLogger(): Logger {
+ return {
+ debug() {},
+ info() {},
+ warn() {},
+ error() {},
+ child() {
+ return createNoopLogger();
+ },
+ span() {
+ return {
+ id: "noop",
+ log: createNoopLogger(),
+ setAttributes() {},
+ addLink() {},
+ child() {
+ return this;
+ },
+ end() {},
+ };
+ },
+ };
+}