diff options
| author | Adam Malczewski <[email protected]> | 2026-06-05 13:07:23 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-05 13:07:23 +0900 |
| commit | c48d8ac7160c3cdcf32ed4e488807d3daeb8d457 (patch) | |
| tree | 1fccd7f35f051d8bae6bc8c6c5e3ffa22e816d0b /packages/kernel | |
| parent | 94dd5334b0277f3cf3b0588150a6615af86a32b3 (diff) | |
| download | dispatch-c48d8ac7160c3cdcf32ed4e488807d3daeb8d457.tar.gz dispatch-c48d8ac7160c3cdcf32ed4e488807d3daeb8d457.zip | |
feat(observability): Phase A logging substrate — Logger/Span ABI + journal sink (250 tests)
Structured, agent-first logging captured durably to an append-only journal file.
Kernel (contracts/logging.ts): leveled/attributed Logger + Span, auto-scoped per extension (host stamps manifest.id, unspoofable), incremental span records (open/close) for crash-reconstructable traces, injected LogSink (pure record-builder). ctx.log on ToolContract; runTurn opens turn/step/tool-call spans and captures the verbatim pre-mutation prompt (the 'before') on the step span.
journal-sink (new package, bootstrap dep — not an extension): LogSink appending NDJSON to a rotating journal; pure serialize + thin fs edge; fail-safe drop, never blocks a turn. host-bin injects it via HostDeps; session-orchestrator threads host.logger (childed per turn) into runTurn.
Redaction is per-extension self-redaction (no shared helper — isolation over DRY). The out-of-process collector + SQLite store + the verbatim 'after' provider.request capture are Phase B / next (notes/observability-design.md §10/§11).
Verified: tsc -b clean, 250 tests (218→+32), biome clean. Live boot: a turn's journal holds host logs + turn/step spans (open+close) + the prompt:before record with the verbatim messages array.
Harness: ORCHESTRATOR §3 rule-scoping map; .dispatch/rules/isolation-over-dry.md; notes/observability-design.md (design D1–D10 + Phase A/B plan).
Diffstat (limited to 'packages/kernel')
| -rw-r--r-- | packages/kernel/src/bus/bus.test.ts | 23 | ||||
| -rw-r--r-- | packages/kernel/src/bus/pure.ts | 8 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/extension.ts | 14 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/index.ts | 20 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/logging.ts | 461 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/runtime.ts | 8 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/tool.ts | 9 | ||||
| -rw-r--r-- | packages/kernel/src/host/host.test.ts | 229 | ||||
| -rw-r--r-- | packages/kernel/src/host/host.ts | 21 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/dispatch.ts | 30 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.test.ts | 226 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 214 |
12 files changed, 1192 insertions, 71 deletions
diff --git a/packages/kernel/src/bus/bus.test.ts b/packages/kernel/src/bus/bus.test.ts index 7366b50..05cf875 100644 --- a/packages/kernel/src/bus/bus.test.ts +++ b/packages/kernel/src/bus/bus.test.ts @@ -1,6 +1,6 @@ import { beforeEach, describe, expect, it } from "vitest"; -import type { Logger } from "../contracts/extension.js"; import { defineEventHook, defineFilter, defineService } from "../contracts/hooks.js"; +import type { Logger, Span } from "../contracts/logging.js"; import { type Bus, createBus } from "./bus.js"; import { applyFilterChain, dispatchEventSync, sortFilters } from "./pure.js"; @@ -10,15 +10,30 @@ interface FakeLogger extends Logger { function createFakeLogger(): FakeLogger { const errors: Array<{ message: string; args: unknown[] }> = []; - return { + const logger: FakeLogger = { errors, debug: () => {}, info: () => {}, warn: () => {}, - error: (message: string, ...args: unknown[]) => { - errors.push({ message, args }); + error: (message, attrs) => { + errors.push({ message, args: attrs === undefined ? [] : [attrs] }); }, + child: () => logger, + span: () => makeNoopSpan(logger), }; + return logger; +} + +function makeNoopSpan(log: Logger): Span { + const span: Span = { + id: "noop", + log, + setAttributes: () => {}, + addLink: () => {}, + child: () => span, + end: () => {}, + }; + return span; } describe("event hooks", () => { diff --git a/packages/kernel/src/bus/pure.ts b/packages/kernel/src/bus/pure.ts index 7cd9143..4d90fc6 100644 --- a/packages/kernel/src/bus/pure.ts +++ b/packages/kernel/src/bus/pure.ts @@ -12,11 +12,11 @@ export function dispatchEventSync<T>( const result = handler(payload); if (result instanceof Promise) { result.catch((err: unknown) => { - logger.error(`Event hook "${hookId}" handler rejected`, err); + logger.error(`Event hook "${hookId}" handler rejected`, { err }); }); } } catch (err) { - logger.error(`Event hook "${hookId}" handler threw`, err); + logger.error(`Event hook "${hookId}" handler threw`, { err }); } } } @@ -32,7 +32,7 @@ export async function dispatchEventAsync<T>( try { await handler(payload); } catch (err) { - logger.error(`Event hook "${hookId}" handler threw`, err); + logger.error(`Event hook "${hookId}" handler threw`, { err }); } }); @@ -76,7 +76,7 @@ export async function applyFilterChain<T>( current = await fn(current); } catch (err) { if (failClosed) throw err; - logger.error(`Filter "${hookId}" handler threw (fail-open, passing through)`, err); + logger.error(`Filter "${hookId}" handler threw (fail-open, passing through)`, { err }); } } return current; diff --git a/packages/kernel/src/contracts/extension.ts b/packages/kernel/src/contracts/extension.ts index 424f714..00b41f1 100644 --- a/packages/kernel/src/contracts/extension.ts +++ b/packages/kernel/src/contracts/extension.ts @@ -15,6 +15,10 @@ import type { FilterHandler, ServiceHandle, } from "./hooks.js"; +import type { Logger } from "./logging.js"; + +export type { Logger } from "./logging.js"; + import type { ProviderContract } from "./provider.js"; import type { ToolContract } from "./tool.js"; @@ -132,15 +136,7 @@ export interface ScheduledJob { readonly execute: () => void | Promise<void>; } -// --- Logger --- - -/** Logger interface available to every extension via the Host API. */ -export interface Logger { - readonly debug: (message: string, ...args: unknown[]) => void; - readonly info: (message: string, ...args: unknown[]) => void; - readonly warn: (message: string, ...args: unknown[]) => void; - readonly error: (message: string, ...args: unknown[]) => void; -} +// --- Logger is re-exported from logging.ts (structured, correlated) --- // --- Config --- diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts index 0b578cb..e4eba87 100644 --- a/packages/kernel/src/contracts/index.ts +++ b/packages/kernel/src/contracts/index.ts @@ -45,7 +45,6 @@ export type { EventsEmitter, Extension, HostAPI, - Logger, Manifest, ManifestCapabilities, ManifestContributions, @@ -57,7 +56,6 @@ export type { StorageNamespace, TrustLevel, } from "./extension.js"; - export type { EventHandler, EventHookDescriptor, @@ -66,9 +64,25 @@ export type { HookDescriptor, ServiceHandle, } from "./hooks.js"; - export { defineEventHook, defineFilter, defineService } from "./hooks.js"; export type { + Attributes, + ErrorAttributes, + Level, + LogContext, + LogDeps, + Logger, + LogLineRecord, + LogRecord, + LogSink, + Span, + SpanCloseRecord, + SpanLink, + SpanOpenRecord, + SpanStatus, +} from "./logging.js"; +export { createLogger } from "./logging.js"; +export type { FinishEvent, ProviderContract, ProviderErrorEvent, diff --git a/packages/kernel/src/contracts/logging.ts b/packages/kernel/src/contracts/logging.ts new file mode 100644 index 0000000..8e1eef3 --- /dev/null +++ b/packages/kernel/src/contracts/logging.ts @@ -0,0 +1,461 @@ +/** + * Logging contract — structured, correlated, span-capable Logger/Span ABI. + * + * The kernel owns types + pure record-builders. NO I/O — the LogSink is + * injected by the host-bin. Logger/Span mint records and call sink.emit; + * sink errors are swallowed (D7 — the turn is sovereign). + * + * Key properties: + * - P3-safe: correlation flows via explicit child()/span() values, no ambient. + * - P2: record-builder is pure ({ now, newId } injected). + * - D3: spans emitted incrementally (open at span(), close at end()). + * - D6: extensionId auto-stamped by host, not caller-supplied. + * - Flat scalar attributes (serializable D3, queryable D9). + */ + +// --- Levels --- + +export type Level = "debug" | "info" | "warn" | "error"; + +// --- Attributes --- + +/** Flat, serializable key/value pairs. Caller stringifies nested objects. */ +export type Attributes = Readonly<Record<string, string | number | boolean | null>>; + +// --- LogContext (correlation) --- + +/** Correlation context carried on every log record and span. */ +export interface LogContext { + /** Auto-stamped by host from manifest.id (D6) — never caller-supplied. */ + readonly extensionId: string; + readonly conversationId?: string; + readonly turnId?: string; + readonly spanId?: string; + readonly parentSpanId?: string; +} + +// --- Span --- + +/** + * A timed unit of work within a trace. Opened via `logger.span(name)`, + * closed via `span.end()`. Emits incremental open/close records so a + * crashed turn is reconstructable from the journal (D3). + */ +export interface Span { + readonly id: string; + /** Pre-bound Logger scoped to this span's correlation. */ + readonly log: Logger; + /** Add or overwrite attributes on this span. */ + readonly setAttributes: (attrs: Attributes) => void; + /** Record a causal link to another span (D4 cross-feature causality). */ + readonly addLink: ( + target: { readonly spanId: string; readonly turnId?: string }, + reason?: string, + ) => void; + /** Open a child span nested under this one. */ + readonly child: (name: string, attrs?: Attributes) => Span; + /** + * Close this span. Records duration + status. Optionally records an + * error and/or additional attributes. + */ + readonly end: (outcome?: { readonly err?: unknown; readonly attrs?: Attributes }) => void; +} + +// --- Logger --- + +/** + * Structured, correlated logger. The host auto-scopes each extension's + * logger with its manifest.id as extensionId (D6). + * + * `info("msg")` must still compile — attrs is optional (backward compat). + */ +export interface Logger { + readonly debug: (msg: string, attrs?: Attributes) => void; + readonly info: (msg: string, attrs?: Attributes) => void; + readonly warn: (msg: string, attrs?: Attributes) => void; + readonly error: (msg: string, attrs?: ErrorAttributes) => void; + /** + * Create a child logger with additional correlation context. + * Explicit values passed down (P3 — no ambient state). + */ + readonly child: (ctx: Partial<LogContext> & { readonly attrs?: Attributes }) => Logger; + /** Open a new span. Emits a `span-open` record immediately (D3). */ + readonly span: (name: string, attrs?: Attributes) => Span; +} + +/** + * Attributes for error log calls. The `err` field is `unknown` (not + * constrained to Attributes' scalar index signature) so callers can + * pass `error("msg", { err })` directly. + */ +export interface ErrorAttributes { + readonly err?: unknown; + readonly [key: string]: unknown; +} + +// --- LogRecord (discriminated union) --- + +/** + * Status of a span. "ok" is the default when no error is recorded. + */ +export type SpanStatus = "ok" | "error"; + +/** + * A link to another span, recorded at a handoff moment (D4). + */ +export interface SpanLink { + readonly spanId: string; + readonly turnId?: string; + readonly reason?: string; +} + +/** + * Flat, JSON-serializable discriminated union. Spans are emitted + * incrementally (open at span(), close at end()) so a crashed turn is + * reconstructable from the journal (D3). + * + * Every variant carries correlation keys + timestamp. An optional `body` + * field (string) holds large verbatim payloads (store-fat-serve-thin). + */ +export type LogRecord = LogLineRecord | SpanOpenRecord | SpanCloseRecord; + +/** A structured log line (debug/info/warn/error). */ +export interface LogLineRecord { + readonly kind: "log"; + readonly level: Level; + readonly msg: string; + readonly timestamp: number; + readonly extensionId: string; + readonly conversationId?: string; + readonly turnId?: string; + readonly spanId?: string; + readonly parentSpanId?: string; + readonly attributes?: Attributes; + /** Optional large verbatim payload (store-fat, serve-thin). */ + readonly body?: string; +} + +/** Emitted when a span is opened (at `logger.span(name)`). */ +export interface SpanOpenRecord { + readonly kind: "span-open"; + readonly spanId: string; + readonly name: string; + readonly timestamp: number; + readonly extensionId: string; + readonly conversationId?: string; + readonly turnId?: string; + readonly parentSpanId?: string; + readonly attributes?: Attributes; + readonly links?: readonly SpanLink[]; + readonly body?: string; +} + +/** Emitted when a span is closed (at `span.end()`). Carries duration + status. */ +export interface SpanCloseRecord { + readonly kind: "span-close"; + readonly spanId: string; + readonly name: string; + readonly timestamp: number; + readonly durationMs: number; + readonly status: SpanStatus; + readonly extensionId: string; + readonly conversationId?: string; + readonly turnId?: string; + readonly parentSpanId?: string; + readonly attributes?: Attributes; + readonly links?: readonly SpanLink[]; + readonly body?: string; +} + +// --- LogSink --- + +/** + * Fire-and-forget sink. The kernel calls emit(); the host-bin injects + * a concrete implementation. Kernel never lets sink errors escape (D7). + */ +export interface LogSink { + readonly emit: (record: LogRecord) => void; +} + +// --- Deterministic helpers (injected for testability) --- + +/** Clock + id generator injected into the logger factory. */ +export interface LogDeps { + readonly now: () => number; + readonly newId: () => string; +} + +// --- Pure record builder (no I/O) --- + +/** + * Internal state carried by a logger instance. Built by createLogger; + * never exposed outside this module. + */ +interface LoggerState { + readonly ctx: LogContext; + readonly attrs: Attributes | undefined; + readonly deps: LogDeps; + readonly sink: LogSink; +} + +function mergeAttributes( + base: Attributes | undefined, + extra: Attributes | undefined, +): Attributes | undefined { + if (base === undefined && extra === undefined) return undefined; + if (base === undefined) return extra; + if (extra === undefined) return base; + return { ...base, ...extra }; +} + +function isScalarAttr(value: unknown): value is string | number | boolean | null { + const t = typeof value; + return t === "string" || t === "number" || t === "boolean" || value === null; +} + +function emitLog(state: LoggerState, level: Level, msg: string, attrs?: Attributes): void { + const merged = mergeAttributes(state.attrs, attrs); + const base = { + kind: "log" as const, + level, + msg, + timestamp: state.deps.now(), + extensionId: state.ctx.extensionId, + }; + const record: LogLineRecord = + state.ctx.conversationId !== undefined || + state.ctx.turnId !== undefined || + state.ctx.spanId !== undefined || + state.ctx.parentSpanId !== undefined || + merged !== undefined + ? { + ...base, + ...(state.ctx.conversationId !== undefined + ? { conversationId: state.ctx.conversationId } + : {}), + ...(state.ctx.turnId !== undefined ? { turnId: state.ctx.turnId } : {}), + ...(state.ctx.spanId !== undefined ? { spanId: state.ctx.spanId } : {}), + ...(state.ctx.parentSpanId !== undefined ? { parentSpanId: state.ctx.parentSpanId } : {}), + ...(merged !== undefined ? { attributes: merged } : {}), + } + : base; + try { + state.sink.emit(record); + } catch { + // Swallow — D7: the turn is sovereign (never break the caller). + } +} + +function buildSpanOpen( + state: LoggerState, + name: string, + spanId: string, + attrs?: Attributes, +): SpanOpenRecord { + const base = { + kind: "span-open" as const, + spanId, + name, + timestamp: state.deps.now(), + extensionId: state.ctx.extensionId, + }; + const merged = mergeAttributes(state.attrs, attrs); + return { + ...base, + ...(state.ctx.conversationId !== undefined ? { conversationId: state.ctx.conversationId } : {}), + ...(state.ctx.turnId !== undefined ? { turnId: state.ctx.turnId } : {}), + ...(state.ctx.parentSpanId !== undefined ? { parentSpanId: state.ctx.parentSpanId } : {}), + ...(merged !== undefined ? { attributes: merged } : {}), + }; +} + +function buildSpanLink( + target: { readonly spanId: string; readonly turnId?: string }, + reason?: string, +): SpanLink { + return { + spanId: target.spanId, + ...(target.turnId !== undefined ? { turnId: target.turnId } : {}), + ...(reason !== undefined ? { reason } : {}), + }; +} + +/** + * Create a structured Logger. Pure factory — all I/O goes through the + * injected sink. `{ now, newId }` are injected for deterministic tests. + * + * @param ctx Initial correlation context (extensionId + optional ids). + * @param sink Fire-and-forget record sink. + * @param deps Clock + id generator. + * @param attrs Optional default attributes (from child()). + */ +export function createLogger( + ctx: LogContext, + sink: LogSink, + deps: LogDeps, + attrs?: Attributes, +): Logger { + const state: LoggerState = { ctx, attrs, deps, sink }; + + function makeSpan(name: string, spanAttrs?: Attributes, parentSpanId?: string): Span { + const spanId = deps.newId(); + const mergedParent = parentSpanId ?? state.ctx.spanId; + const spanCtx: LogContext = { + extensionId: ctx.extensionId, + ...(ctx.conversationId !== undefined ? { conversationId: ctx.conversationId } : {}), + ...(ctx.turnId !== undefined ? { turnId: ctx.turnId } : {}), + spanId, + ...(mergedParent !== undefined ? { parentSpanId: mergedParent } : {}), + }; + + const openRecord = buildSpanOpen(state, name, spanId, spanAttrs); + const spanAttrsMutable: Record<string, string | number | boolean | null> = + spanAttrs !== undefined ? { ...spanAttrs } : {}; + const links: SpanLink[] = []; + const openedAt = deps.now(); + + try { + sink.emit(openRecord); + } catch { + // Swallow — D7. + } + + const spanLogger = createLogger(spanCtx, sink, deps, state.attrs); + + const span: Span = { + id: spanId, + log: spanLogger, + setAttributes(newAttrs: Attributes): void { + for (const [key, value] of Object.entries(newAttrs)) { + spanAttrsMutable[key] = value; + } + }, + addLink(target, reason): void { + links.push(buildSpanLink(target, reason)); + }, + child(childName: string, childAttrs?: Attributes): Span { + return makeSpan(childName, childAttrs, spanId); + }, + end(outcome?): void { + const closedAt = deps.now(); + const err = outcome?.err; + let status: SpanStatus = "ok"; + if (err !== undefined && err !== null) { + status = "error"; + const errMsg = err instanceof Error ? err.message : String(err); + spanAttrsMutable["error.message"] = errMsg; + if (err instanceof Error && err.stack !== undefined) { + spanAttrsMutable["error.stack"] = err.stack; + } + } + if (outcome?.attrs !== undefined) { + for (const [key, value] of Object.entries(outcome.attrs)) { + spanAttrsMutable[key] = value; + } + } + + const hasAttrs = Object.keys(spanAttrsMutable).length > 0; + const hasLinks = links.length > 0; + const base = { + kind: "span-close" as const, + spanId, + name, + timestamp: closedAt, + durationMs: closedAt - openedAt, + status, + extensionId: ctx.extensionId, + }; + const closeRecord: SpanCloseRecord = { + ...base, + ...(ctx.conversationId !== undefined ? { conversationId: ctx.conversationId } : {}), + ...(ctx.turnId !== undefined ? { turnId: ctx.turnId } : {}), + ...(mergedParent !== undefined ? { parentSpanId: mergedParent } : {}), + ...(hasAttrs ? { attributes: { ...spanAttrsMutable } } : {}), + ...(hasLinks ? { links: [...links] } : {}), + }; + try { + sink.emit(closeRecord); + } catch { + // Swallow — D7. + } + }, + }; + + return span; + } + + const logger: Logger = { + debug(msg: string, attrs?: Attributes): void { + emitLog(state, "debug", msg, attrs); + }, + info(msg: string, attrs?: Attributes): void { + emitLog(state, "info", msg, attrs); + }, + warn(msg: string, attrs?: Attributes): void { + emitLog(state, "warn", msg, attrs); + }, + error(msg: string, attrs?: ErrorAttributes): void { + const err = attrs?.err; + if (err !== undefined && err !== null) { + // Extract scalar attributes (everything except err). + const scalarAttrs: Record<string, string | number | boolean | null> = {}; + if (attrs !== undefined) { + for (const [key, value] of Object.entries(attrs)) { + if (key !== "err" && isScalarAttr(value)) { + scalarAttrs[key] = value; + } + } + } + const merged = mergeAttributes( + state.attrs, + Object.keys(scalarAttrs).length > 0 ? scalarAttrs : undefined, + ); + const errMsg = err instanceof Error ? err.message : String(err); + const errorAttrs: Record<string, string | number | boolean | null> = { + ...(merged ?? {}), + "error.message": errMsg, + }; + if (err instanceof Error && err.stack !== undefined) { + errorAttrs["error.stack"] = err.stack; + } + emitLog(state, "error", msg, errorAttrs as Attributes); + } else { + // No err field — filter to scalar attributes only. + const scalarAttrs: Record<string, string | number | boolean | null> = {}; + if (attrs !== undefined) { + for (const [key, value] of Object.entries(attrs)) { + if (isScalarAttr(value)) { + scalarAttrs[key] = value; + } + } + } + emitLog( + state, + "error", + msg, + Object.keys(scalarAttrs).length > 0 ? (scalarAttrs as Attributes) : undefined, + ); + } + }, + child(childCtx: Partial<LogContext> & { readonly attrs?: Attributes }): Logger { + const convId = childCtx.conversationId ?? ctx.conversationId; + const tId = childCtx.turnId ?? ctx.turnId; + const sId = childCtx.spanId ?? ctx.spanId; + const pId = childCtx.parentSpanId ?? ctx.parentSpanId; + const newCtx: LogContext = { + extensionId: ctx.extensionId, + ...(convId !== undefined ? { conversationId: convId } : {}), + ...(tId !== undefined ? { turnId: tId } : {}), + ...(sId !== undefined ? { spanId: sId } : {}), + ...(pId !== undefined ? { parentSpanId: pId } : {}), + }; + const newAttrs = mergeAttributes(state.attrs, childCtx.attrs); + return createLogger(newCtx, sink, deps, newAttrs); + }, + span(name: string, attrs?: Attributes): Span { + return makeSpan(name, attrs); + }, + }; + + return logger; +} diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts index 68c0444..1e8f14f 100644 --- a/packages/kernel/src/contracts/runtime.ts +++ b/packages/kernel/src/contracts/runtime.ts @@ -10,6 +10,7 @@ import type { ChatMessage } from "./conversation.js"; import type { ToolDispatchPolicy } from "./dispatch.js"; import type { AgentEvent } from "./events.js"; +import type { Logger } from "./logging.js"; import type { ProviderContract, ProviderStreamOptions, Usage } from "./provider.js"; import type { ToolContract } from "./tool.js"; @@ -73,6 +74,13 @@ export interface RunTurnInput { /** Cancellation signal for the entire turn. */ readonly signal?: AbortSignal; + + /** + * Optional logger for structured span instrumentation. The runtime opens + * turn/step/tool-call spans using this logger. If omitted, no spans are + * emitted (backward-compatible with callers that don't yet pass a logger). + */ + readonly logger?: Logger; } /** diff --git a/packages/kernel/src/contracts/tool.ts b/packages/kernel/src/contracts/tool.ts index f74ce77..0699d05 100644 --- a/packages/kernel/src/contracts/tool.ts +++ b/packages/kernel/src/contracts/tool.ts @@ -7,6 +7,8 @@ * Extensions may use zod internally and convert to this shape. */ +import type { Logger } from "./logging.js"; + /** * Structural JSON Schema subset for tool parameter declarations. * The kernel does not validate against this — the provider serializes it for @@ -53,6 +55,13 @@ export interface ToolExecuteContext { * can clean up rather than leak. */ readonly signal: AbortSignal; + + /** + * Pre-bound Logger scoped to this tool-call span. Tools log correlated + * without a global (P3). The kernel stamps extensionId, conversationId, + * turnId, and spanId automatically. + */ + readonly log: Logger; } /** diff --git a/packages/kernel/src/host/host.test.ts b/packages/kernel/src/host/host.test.ts index 11c2356..7688366 100644 --- a/packages/kernel/src/host/host.test.ts +++ b/packages/kernel/src/host/host.test.ts @@ -6,7 +6,6 @@ import type { EventsEmitter, Extension, HostAPI, - Logger, Manifest, ManifestContributions, PermissionDecision, @@ -17,33 +16,92 @@ import type { StorageNamespace, } from "../contracts/extension.js"; import { defineEventHook, defineService } from "../contracts/hooks.js"; +import type { + Attributes, + ErrorAttributes, + LogDeps, + Logger, + LogRecord, + LogSink, +} from "../contracts/logging.js"; import type { ProviderContract } from "../contracts/provider.js"; import type { ToolContract } from "../contracts/tool.js"; import { createHost, type HostDeps, KERNEL_API_VERSION } from "./host.js"; interface FakeLogger extends Logger { - readonly logs: Array<{ level: string; message: string; args: unknown[] }>; + readonly logs: Array<{ level: string; message: string; attrs?: Attributes | ErrorAttributes }>; } function createFakeLogger(): FakeLogger { - const logs: Array<{ level: string; message: string; args: unknown[] }> = []; + const logs: Array<{ level: string; message: string; attrs?: Attributes | ErrorAttributes }> = []; return { logs, - debug: (message: string, ...args: unknown[]) => { - logs.push({ level: "debug", message, args }); + debug: (message: string, attrs?: Attributes) => { + if (attrs !== undefined) { + logs.push({ level: "debug", message, attrs }); + } else { + logs.push({ level: "debug", message }); + } + }, + info: (message: string, attrs?: Attributes) => { + if (attrs !== undefined) { + logs.push({ level: "info", message, attrs }); + } else { + logs.push({ level: "info", message }); + } + }, + warn: (message: string, attrs?: Attributes) => { + if (attrs !== undefined) { + logs.push({ level: "warn", message, attrs }); + } else { + logs.push({ level: "warn", message }); + } }, - info: (message: string, ...args: unknown[]) => { - logs.push({ level: "info", message, args }); + error: (message: string, attrs?: ErrorAttributes) => { + if (attrs !== undefined) { + logs.push({ level: "error", message, attrs }); + } else { + logs.push({ level: "error", message }); + } }, - warn: (message: string, ...args: unknown[]) => { - logs.push({ level: "warn", message, args }); + child( + _ctx: Partial<import("../contracts/logging.js").LogContext> & { readonly attrs?: Attributes }, + ): Logger { + return createFakeLogger(); }, - error: (message: string, ...args: unknown[]) => { - logs.push({ level: "error", message, args }); + span(_name: string, _attrs?: Attributes): import("../contracts/logging.js").Span { + return { + id: "fake-span", + log: createFakeLogger(), + setAttributes() {}, + addLink() {}, + child() { + return this; + }, + end() {}, + }; }, }; } +function createFakeLogSink(): LogSink & { readonly records: LogRecord[] } { + const records: LogRecord[] = []; + return { + records, + emit: (record: LogRecord) => { + records.push(record); + }, + }; +} + +function createFakeLogDeps(): LogDeps { + let idCounter = 0; + return { + now: () => 1000 + idCounter * 100, + newId: () => `span-${++idCounter}`, + }; +} + function createFakeConfig(): ConfigAccess { return { get: () => undefined, @@ -176,12 +234,16 @@ function createFakeAuth(id: string): AuthContract { describe("createHost", () => { let logger: FakeLogger; + let logSink: ReturnType<typeof createFakeLogSink>; + let logDeps: LogDeps; let deps: HostDeps; let scheduler: ReturnType<typeof createFakeScheduler>; let events: ReturnType<typeof createFakeEvents>; beforeEach(() => { logger = createFakeLogger(); + logSink = createFakeLogSink(); + logDeps = createFakeLogDeps(); scheduler = createFakeScheduler(); events = createFakeEvents(); deps = { @@ -193,6 +255,8 @@ describe("createHost", () => { scheduler, bus: createBus(logger), events, + logSink, + logDeps, }; }); @@ -529,6 +593,7 @@ describe("createHost", () => { expect(order).toEqual(["deactivate-c", "deactivate-a"]); const errors = logger.logs.filter((l) => l.level === "error"); expect(errors.some((e) => e.message.includes("deactivate"))).toBe(true); + expect(errors.some((e) => (e.attrs as { err?: unknown })?.err instanceof Error)).toBe(true); }); }); @@ -750,4 +815,146 @@ describe("createHost", () => { ); }); }); + + describe("auto-scoped logger (D6)", () => { + it("each extension's logger stamps its own manifest.id as extensionId", async () => { + let extALogger: Logger | undefined; + let extBLogger: Logger | undefined; + + const a = createExtension("ext-a", { + activate: (host) => { + extALogger = host.logger; + }, + }); + const b = createExtension("ext-b", { + activate: (host) => { + extBLogger = host.logger; + }, + }); + + const host = createHost([a, b], deps); + await host.activate(); + + extALogger?.info("from-a"); + extBLogger?.info("from-b"); + + const logRecords = logSink.records.filter((r) => r.kind === "log"); + expect(logRecords).toHaveLength(2); + if (logRecords[0]?.kind === "log") { + expect(logRecords[0].extensionId).toBe("ext-a"); + expect(logRecords[0].msg).toBe("from-a"); + } + if (logRecords[1]?.kind === "log") { + expect(logRecords[1].extensionId).toBe("ext-b"); + expect(logRecords[1].msg).toBe("from-b"); + } + }); + + it("an extension cannot spoof extensionId — it is auto-stamped", async () => { + let extLogger: Logger | undefined; + + const ext = createExtension("real-id", { + activate: (host) => { + extLogger = host.logger; + }, + }); + + const host = createHost([ext], deps); + await host.activate(); + + // child() cannot override extensionId + const child = extLogger?.child({ extensionId: "spoofed" }); + child?.info("msg"); + + const logRecords = logSink.records.filter((r) => r.kind === "log"); + expect(logRecords).toHaveLength(1); + if (logRecords[0]?.kind === "log") { + expect(logRecords[0].extensionId).toBe("real-id"); + } + }); + + it("host.logger.error uses structured { err } shape", async () => { + let extLogger: Logger | undefined; + + const ext = createExtension("ext", { + activate: (host) => { + extLogger = host.logger; + }, + }); + + const host = createHost([ext], deps); + await host.activate(); + + extLogger?.error("something broke", { err: new Error("boom") }); + + const logRecords = logSink.records.filter((r) => r.kind === "log"); + expect(logRecords).toHaveLength(1); + if (logRecords[0]?.kind === "log") { + expect(logRecords[0].level).toBe("error"); + expect(logRecords[0].msg).toBe("something broke"); + expect(logRecords[0].attributes?.["error.message"]).toBe("boom"); + } + }); + + it("a throwing sink does NOT break the caller", async () => { + const brokenSink: LogSink = { + emit() { + throw new Error("sink down"); + }, + }; + const brokenDeps: HostDeps = { + ...deps, + logSink: brokenSink, + }; + + let extLogger: Logger | undefined; + const ext = createExtension("ext", { + activate: (host) => { + extLogger = host.logger; + }, + }); + + const host = createHost([ext], brokenDeps); + await host.activate(); + + // Should not throw + expect(() => extLogger?.info("msg")).not.toThrow(); + }); + + it("span() + end() emit incremental span-open and span-close records", async () => { + let extLogger: Logger | undefined; + + const ext = createExtension("ext", { + activate: (host) => { + extLogger = host.logger; + }, + }); + + const host = createHost([ext], deps); + await host.activate(); + + const span = extLogger?.span("my-span", { key: "value" }); + span?.setAttributes({ extra: "attr" }); + span?.end({ attrs: { result: "ok" } }); + + const spanOpens = logSink.records.filter((r) => r.kind === "span-open"); + const spanCloses = logSink.records.filter((r) => r.kind === "span-close"); + + expect(spanOpens).toHaveLength(1); + expect(spanCloses).toHaveLength(1); + + if (spanOpens[0]?.kind === "span-open") { + expect(spanOpens[0].name).toBe("my-span"); + expect(spanOpens[0].extensionId).toBe("ext"); + expect(spanOpens[0].attributes?.key).toBe("value"); + } + if (spanCloses[0]?.kind === "span-close") { + expect(spanCloses[0].name).toBe("my-span"); + expect(spanCloses[0].status).toBe("ok"); + expect(spanCloses[0].durationMs).toBeGreaterThanOrEqual(0); + expect(spanCloses[0].attributes?.extra).toBe("attr"); + expect(spanCloses[0].attributes?.result).toBe("ok"); + } + }); + }); }); diff --git a/packages/kernel/src/host/host.ts b/packages/kernel/src/host/host.ts index dd61f9f..c7ec7a9 100644 --- a/packages/kernel/src/host/host.ts +++ b/packages/kernel/src/host/host.ts @@ -5,7 +5,6 @@ import type { EventsEmitter, Extension, HostAPI, - Logger, Manifest, PermissionGate, ScheduledJob, @@ -19,6 +18,8 @@ import type { FilterHandler, ServiceHandle, } from "../contracts/hooks.js"; +import type { LogDeps, Logger, LogSink } from "../contracts/logging.js"; +import { createLogger } from "../contracts/logging.js"; import type { ProviderContract } from "../contracts/provider.js"; import type { ToolContract } from "../contracts/tool.js"; import { resolveActivationOrder } from "./dag.js"; @@ -40,6 +41,8 @@ export interface HostDeps { readonly scheduler: { readonly register: (job: ScheduledJob) => void }; readonly bus: Bus; readonly events: EventsEmitter; + readonly logSink: LogSink; + readonly logDeps: LogDeps; } export interface Host { @@ -96,8 +99,12 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho } } - function buildHostAPI(opts?: { readonly registrationClosed?: boolean }): HostAPI { + function buildHostAPI( + extensionId: string, + opts?: { readonly registrationClosed?: boolean }, + ): HostAPI { const closed = opts?.registrationClosed ?? false; + const extLogger = createLogger({ extensionId }, deps.logSink, deps.logDeps); return { defineTool(tool: ToolContract) { if (closed) throw new Error("Registration not available after activation"); @@ -130,7 +137,7 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho secrets: deps.secrets, permissions: deps.permissions, events: deps.events, - logger: deps.logger, + logger: extLogger, getProviders() { return providers; }, @@ -156,7 +163,7 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho async activate() { for (const ext of compatible) { try { - await ext.activate(buildHostAPI()); + await ext.activate(buildHostAPI(ext.manifest.id)); activated.push(ext); deps.logger.info(`Extension "${ext.manifest.id}" activated`); } catch (err) { @@ -164,7 +171,7 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho manifest: ext.manifest, reason: `Activation failed: ${err instanceof Error ? err.message : String(err)}`, }); - deps.logger.error(`Extension "${ext.manifest.id}" failed to activate`, err); + deps.logger.error(`Extension "${ext.manifest.id}" failed to activate`, { err }); } } }, @@ -175,7 +182,7 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho try { await ext.deactivate(); } catch (err) { - deps.logger.error(`Extension "${ext.manifest.id}" failed to deactivate`, err); + deps.logger.error(`Extension "${ext.manifest.id}" failed to deactivate`, { err }); } } }, @@ -207,7 +214,7 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho return disabled; }, getHostAPI() { - return buildHostAPI({ registrationClosed: true }); + return buildHostAPI("__host__", { registrationClosed: true }); }, }; } diff --git a/packages/kernel/src/runtime/dispatch.ts b/packages/kernel/src/runtime/dispatch.ts index 626b333..1ba0849 100644 --- a/packages/kernel/src/runtime/dispatch.ts +++ b/packages/kernel/src/runtime/dispatch.ts @@ -1,4 +1,5 @@ import type { ToolDispatchPolicy } from "../contracts/dispatch.js"; +import type { Logger, Span } from "../contracts/logging.js"; import type { EventEmitter } from "../contracts/runtime.js"; import type { ToolCall, ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js"; import { toolOutputEvent } from "./events.js"; @@ -15,6 +16,7 @@ export async function executeToolCall( emit: EventEmitter, conversationId: string, turnId: string, + toolSpan?: Span, ): Promise<ToolResult> { if (tool === undefined) { return { content: `Unknown tool: ${call.name}`, isError: true }; @@ -28,6 +30,7 @@ export async function executeToolCall( onOutput: (data, stream) => { emit(toolOutputEvent(conversationId, turnId, call.id, data, stream)); }, + log: toolSpan?.log ?? createNoopLogger(), }; try { return await tool.execute(call.input, ctx); @@ -50,6 +53,7 @@ export function createStepDispatcher( emit: EventEmitter, conversationId: string, turnId: string, + toolSpans: Map<string, Span>, ): StepDispatcher { let activeCount = 0; let unsafeRunning = false; @@ -78,6 +82,7 @@ export function createStepDispatcher( } async function runAndResolve(entry: QueueEntry): Promise<void> { + const tcSpan = toolSpans.get(entry.call.id); const result = await executeToolCall( entry.call, entry.tool, @@ -85,6 +90,7 @@ export function createStepDispatcher( emit, conversationId, turnId, + tcSpan, ); activeCount--; if (entry.tool?.concurrencySafe === false) unsafeRunning = false; @@ -129,3 +135,27 @@ export function createStepDispatcher( return { submit, drain }; } + +function createNoopLogger(): Logger { + return { + debug() {}, + info() {}, + warn() {}, + error() {}, + child() { + return createNoopLogger(); + }, + span() { + return { + id: "noop", + log: createNoopLogger(), + setAttributes() {}, + addLink() {}, + child() { + return this; + }, + end() {}, + }; + }, + }; +} diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts index 696a385..48f0b5a 100644 --- a/packages/kernel/src/runtime/run-turn.test.ts +++ b/packages/kernel/src/runtime/run-turn.test.ts @@ -1,6 +1,8 @@ import { describe, expect, it } from "vitest"; import type { ChatMessage } from "../contracts/conversation.js"; import type { AgentEvent } from "../contracts/events.js"; +import type { LogDeps, Logger, LogRecord, LogSink } from "../contracts/logging.js"; +import { createLogger } from "../contracts/logging.js"; import type { ProviderContract, ProviderEvent } from "../contracts/provider.js"; import type { ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js"; import { runTurn } from "./run-turn.js"; @@ -814,4 +816,228 @@ describe("runTurn", () => { expect(outputs[1]?.stream).toBe("stderr"); } }); + + describe("span instrumentation", () => { + function createTestLogger(): { + logger: Logger; + sink: LogSink & { records: LogRecord[] }; + deps: LogDeps; + } { + let idCounter = 0; + const deps: LogDeps = { + now: () => 1000 + idCounter * 100, + newId: () => `span-${++idCounter}`, + }; + const records: LogRecord[] = []; + const sink: LogSink & { records: LogRecord[] } = { + records, + emit: (record) => records.push(record), + }; + const logger = createLogger({ extensionId: "test" }, sink, deps); + return { logger, sink, deps }; + } + + it("emits turn + step span open/close in order", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const spanOpens = sink.records.filter((r) => r.kind === "span-open"); + const spanCloses = sink.records.filter((r) => r.kind === "span-close"); + + expect(spanOpens.length).toBeGreaterThanOrEqual(2); // turn + step + expect(spanCloses.length).toBeGreaterThanOrEqual(2); + + const turnOpen = spanOpens.find((r) => r.kind === "span-open" && r.name === "turn"); + const stepOpen = spanOpens.find((r) => r.kind === "span-open" && r.name === "step"); + expect(turnOpen).toBeDefined(); + expect(stepOpen).toBeDefined(); + + if (turnOpen?.kind === "span-open") { + expect(turnOpen.extensionId).toBe("test"); + expect(turnOpen.attributes?.conversationId).toBe("conv-1"); + expect(turnOpen.attributes?.turnId).toBe("turn-1"); + } + + const turnClose = spanCloses.find((r) => r.kind === "span-close" && r.name === "turn"); + expect(turnClose).toBeDefined(); + if (turnClose?.kind === "span-close") { + expect(turnClose.status).toBe("ok"); + expect(turnClose.durationMs).toBeGreaterThanOrEqual(0); + } + }); + + it("emits tool-call spans for dispatched tools", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const toolCallSpans = sink.records.filter( + (r) => r.kind === "span-open" && r.name === "tool-call", + ); + expect(toolCallSpans).toHaveLength(1); + if (toolCallSpans[0]?.kind === "span-open") { + expect(toolCallSpans[0].attributes?.name).toBe("echo"); + expect(toolCallSpans[0].attributes?.toolCallId).toBe("tc1"); + } + + const toolCallCloses = sink.records.filter( + (r) => r.kind === "span-close" && r.name === "tool-call", + ); + expect(toolCallCloses).toHaveLength(1); + if (toolCallCloses[0]?.kind === "span-close") { + expect(toolCallCloses[0].status).toBe("ok"); + } + }); + + it("tools receive ctx.log (correlated logger)", async () => { + let capturedLog: Logger | undefined; + + const tool = createFakeTool("logtest", async (_input, ctx) => { + capturedLog = ctx.log; + ctx.log.info("tool ran", { key: "value" }); + return { content: "ok" }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "logtest", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + expect(capturedLog).toBeDefined(); + + const toolLogs = sink.records.filter( + (r) => r.kind === "log" && r.kind === "log" && (r as { msg: string }).msg === "tool ran", + ); + expect(toolLogs).toHaveLength(1); + if (toolLogs[0]?.kind === "log") { + expect(toolLogs[0].attributes?.key).toBe("value"); + expect(toolLogs[0].extensionId).toBe("test"); + } + }); + + it("an aborted turn still closes its turn span", async () => { + const ac = new AbortController(); + ac.abort(); + + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "should not appear" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + signal: ac.signal, + logger, + }); + + const turnCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "turn"); + expect(turnCloses).toHaveLength(1); + if (turnCloses[0]?.kind === "span-close") { + expect(turnCloses[0].attributes?.finishReason).toBe("aborted"); + } + }); + + it("a provider error closes the step span with error status", async () => { + const provider: ProviderContract = { + id: "fake", + stream() { + return (async function* () { + yield { type: "text-delta", delta: "partial" } as ProviderEvent; + throw new Error("provider exploded"); + })(); + }, + }; + + const { logger, sink } = createTestLogger(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + expect(result.finishReason).toBe("error"); + + const stepCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "step"); + expect(stepCloses).toHaveLength(1); + if (stepCloses[0]?.kind === "span-close") { + expect(stepCloses[0].status).toBe("error"); + expect(stepCloses[0].attributes?.["error.message"]).toContain("provider exploded"); + } + }); + }); }); diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index 9421d86..0f42ef3 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -1,4 +1,5 @@ import type { ChatMessage, Chunk } from "../contracts/conversation.js"; +import type { Logger, Span } from "../contracts/logging.js"; import type { ProviderContract, ProviderEvent, Usage } from "../contracts/provider.js"; import type { EventEmitter, RunTurnInput, RunTurnResult } from "../contracts/runtime.js"; import type { ToolCall, ToolContract } from "../contracts/tool.js"; @@ -76,6 +77,8 @@ interface StepContext { readonly signal: AbortSignal; readonly conversationId: string; readonly turnId: string; + readonly logger: Logger; + readonly toolSpans: Map<string, Span>; } interface StepResult { @@ -124,6 +127,18 @@ function processEvent( event.input, ), ); + + // Open a tool-call span (attrs: name, toolCallId) + try { + const tcSpan = ctx.logger.span("tool-call", { + name: event.toolName, + toolCallId: event.toolCallId, + }); + ctx.toolSpans.set(event.toolCallId, tcSpan); + } catch { + // Swallow — D7: logging never breaks the turn. + } + if (ctx.dispatch.eager) { dispatcher.submit(call); } @@ -151,6 +166,26 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { let stepUsage = zeroUsage(); let finishReason = "stop"; + // Open a step span with the verbatim pre-mutation prompt in its body (BEFORE capture). + let stepSpan: Span | undefined; + try { + stepSpan = ctx.logger.span("step"); + // Emit the verbatim pre-mutation prompt as a log record on the step span's logger. + // This is the "BEFORE" capture — the messages + tools as handed to provider.stream. + stepSpan.log.info("prompt:before", { + "prompt.messages": JSON.stringify(ctx.messages), + "prompt.tools": JSON.stringify( + ctx.tools.map((t) => ({ + name: t.name, + description: t.description, + parameters: t.parameters, + })), + ), + }); + } catch { + // Swallow — D7. + } + const dispatcher = createStepDispatcher( ctx.toolMap, ctx.dispatch, @@ -158,6 +193,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { ctx.emit, ctx.conversationId, ctx.turnId, + ctx.toolSpans, ); try { @@ -177,6 +213,13 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { chunks.push({ type: "error", message }); ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message)); finishReason = "error"; + // Close step span with error + try { + stepSpan?.end({ err }); + } catch { + // Swallow — D7. + } + stepSpan = undefined; } if (!ctx.dispatch.eager) { @@ -187,6 +230,25 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { const results = await dispatcher.drain(); + // Close remaining tool-call spans + for (const call of toolCalls) { + const tcSpan = ctx.toolSpans.get(call.id); + if (tcSpan !== undefined) { + const result = results.get(call.id); + try { + tcSpan.end({ + attrs: { + isError: result?.isError ?? false, + contentLength: result?.content.length ?? 0, + }, + }); + } catch { + // Swallow — D7. + } + ctx.toolSpans.delete(call.id); + } + } + const toolMessages: ChatMessage[] = []; for (const call of toolCalls) { const result = results.get(call.id); @@ -217,6 +279,21 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { } } + // Close step span (if not already closed by error) + if (stepSpan !== undefined) { + try { + stepSpan.end({ + attrs: { + finishReason, + usage_inputTokens: stepUsage.inputTokens, + usage_outputTokens: stepUsage.outputTokens, + }, + }); + } catch { + // Swallow — D7. + } + } + const assistantMessage: ChatMessage | undefined = chunks.length > 0 ? { role: "assistant", chunks } : undefined; @@ -237,51 +314,122 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { const conversationId = input.conversationId; const turnId = input.turnId; const signal = input.signal ?? new AbortController().signal; + const logger = input.logger; - for (let step = 0; step < MAX_STEPS; step++) { - if (signal.aborted) { - finishReason = "aborted"; - break; + // Open a turn span (attrs: conversationId, turnId, model) + let turnSpan: Span | undefined; + if (logger !== undefined) { + try { + turnSpan = logger.span("turn", { + conversationId, + turnId, + model: input.providerOpts?.model ?? input.provider.id, + }); + } catch { + // Swallow — D7. } + } - const stepResult = await executeStep({ - provider: input.provider, - messages, - tools: input.tools, - toolMap, - dispatch: input.dispatch, - emit: input.emit, - signal, - conversationId, - turnId, - }); + // Track open tool-call spans across steps so we can close them on abort + const toolSpans = new Map<string, Span>(); + + try { + for (let step = 0; step < MAX_STEPS; step++) { + if (signal.aborted) { + finishReason = "aborted"; + break; + } - totalUsage = addUsage(totalUsage, stepResult.usage); + const stepResult = await executeStep({ + provider: input.provider, + messages, + tools: input.tools, + toolMap, + dispatch: input.dispatch, + emit: input.emit, + signal, + conversationId, + turnId, + logger: turnSpan?.log ?? logger ?? createNoopLogger(), + toolSpans, + }); - if (stepResult.assistantMessage !== undefined) { - messages.push(stepResult.assistantMessage); - resultMessages.push(stepResult.assistantMessage); - } + totalUsage = addUsage(totalUsage, stepResult.usage); - for (const msg of stepResult.toolMessages) { - messages.push(msg); - resultMessages.push(msg); - } + if (stepResult.assistantMessage !== undefined) { + messages.push(stepResult.assistantMessage); + resultMessages.push(stepResult.assistantMessage); + } - if (signal.aborted) { - finishReason = "aborted"; - break; - } + for (const msg of stepResult.toolMessages) { + messages.push(msg); + resultMessages.push(msg); + } - if (stepResult.toolCalls.length === 0) { - finishReason = stepResult.finishReason; - break; + if (signal.aborted) { + finishReason = "aborted"; + break; + } + + if (stepResult.toolCalls.length === 0) { + finishReason = stepResult.finishReason; + break; + } + + if (step === MAX_STEPS - 1) { + finishReason = "max-steps"; + } + } + } finally { + // Close any orphaned tool-call spans (e.g. abort mid-tool) + for (const [id, tcSpan] of toolSpans) { + try { + tcSpan.end({ attrs: { orphaned: true } }); + } catch { + // Swallow — D7. + } + toolSpans.delete(id); } - if (step === MAX_STEPS - 1) { - finishReason = "max-steps"; + // Close the turn span + if (turnSpan !== undefined) { + try { + turnSpan.end({ + attrs: { + finishReason, + usage_inputTokens: totalUsage.inputTokens, + usage_outputTokens: totalUsage.outputTokens, + }, + }); + } catch { + // Swallow — D7. + } } } return { messages: resultMessages, usage: totalUsage, finishReason }; } + +function createNoopLogger(): Logger { + return { + debug() {}, + info() {}, + warn() {}, + error() {}, + child() { + return createNoopLogger(); + }, + span() { + return { + id: "noop", + log: createNoopLogger(), + setAttributes() {}, + addLink() {}, + child() { + return this; + }, + end() {}, + }; + }, + }; +} |
