diff options
| author | Adam Malczewski <[email protected]> | 2026-06-05 15:35:21 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-05 15:35:21 +0900 |
| commit | 2bf8f9ec9714cbd33a0ecfb7b40dd7a335180fb7 (patch) | |
| tree | c9b0a6203621df44a0687e1aed1d8d4d923c8297 | |
| parent | 20c6c675a11b887c603be5ff08165cb182c7db65 (diff) | |
| download | dispatch-2bf8f9ec9714cbd33a0ecfb7b40dd7a335180fb7.tar.gz dispatch-2bf8f9ec9714cbd33a0ecfb7b40dd7a335180fb7.zip | |
fix(observability): nest turn/step/prompt/provider.request spans into a tree (+ buildSpanOpen parent propagation)
run-turn: step is now turnSpan.child; prompt/provider.request/tool-call are step's children (stepSpan.log passed into provider.stream). logger.ts: buildSpanOpen now propagates the child's computed parentSpanId onto the span-open record — a latent bug where span.child(...) never set parentSpanId on open (close was already correct).
Verified: tsc -b clean, 279 tests, biome 0/0. Live: span tree turn->step->{prompt,provider.request}; the trace CLI easy-view renders the nesting.
| -rw-r--r-- | packages/kernel/src/logging/logger.ts | 6 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.test.ts | 262 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 32 |
3 files changed, 286 insertions, 14 deletions
diff --git a/packages/kernel/src/logging/logger.ts b/packages/kernel/src/logging/logger.ts index 507ace6..70bc18c 100644 --- a/packages/kernel/src/logging/logger.ts +++ b/packages/kernel/src/logging/logger.ts @@ -82,6 +82,7 @@ function buildSpanOpen( spanId: string, attrs?: Attributes, body?: string, + parentSpanId?: string, ): SpanOpenRecord { const base = { kind: "span-open" as const, @@ -91,11 +92,12 @@ function buildSpanOpen( extensionId: state.ctx.extensionId, }; const merged = mergeAttributes(state.attrs, attrs); + const effectiveParent = parentSpanId ?? state.ctx.parentSpanId; return { ...base, ...(state.ctx.conversationId !== undefined ? { conversationId: state.ctx.conversationId } : {}), ...(state.ctx.turnId !== undefined ? { turnId: state.ctx.turnId } : {}), - ...(state.ctx.parentSpanId !== undefined ? { parentSpanId: state.ctx.parentSpanId } : {}), + ...(effectiveParent !== undefined ? { parentSpanId: effectiveParent } : {}), ...(merged !== undefined ? { attributes: merged } : {}), ...(body !== undefined ? { body } : {}), }; @@ -145,7 +147,7 @@ export function createLogger( ...(mergedParent !== undefined ? { parentSpanId: mergedParent } : {}), }; - const openRecord = buildSpanOpen(state, name, spanId, spanAttrs, body); + const openRecord = buildSpanOpen(state, name, spanId, spanAttrs, body, mergedParent); const spanAttrsMutable: Record<string, string | number | boolean | null> = spanAttrs !== undefined ? { ...spanAttrs } : {}; const links: SpanLink[] = []; diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts index 9c19027..2dd5d2e 100644 --- a/packages/kernel/src/runtime/run-turn.test.ts +++ b/packages/kernel/src/runtime/run-turn.test.ts @@ -1155,4 +1155,266 @@ describe("runTurn", () => { expect(capturedOpts?.logger).toBeUndefined(); }); }); + + describe("span tree nesting", () => { + it("turn span is root (parentSpanId undefined)", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const turnOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "turn"); + expect(turnOpen).toBeDefined(); + if (turnOpen?.kind === "span-open") { + expect(turnOpen.parentSpanId).toBeUndefined(); + } + }); + + it("step span is a child of turn span", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const turnOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "turn"); + const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step"); + expect(turnOpen).toBeDefined(); + expect(stepOpen).toBeDefined(); + if (turnOpen?.kind === "span-open" && stepOpen?.kind === "span-open") { + expect(stepOpen.parentSpanId).toBe(turnOpen.spanId); + } + }); + + it("prompt span is a child of step span", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step"); + const promptOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "prompt"); + expect(stepOpen).toBeDefined(); + expect(promptOpen).toBeDefined(); + if (stepOpen?.kind === "span-open" && promptOpen?.kind === "span-open") { + expect(promptOpen.parentSpanId).toBe(stepOpen.spanId); + } + }); + + it("provider logger creates spans nested under step", async () => { + let capturedLogger: Logger | undefined; + let providerReqSpanId: string | undefined; + + const provider: ProviderContract = { + id: "fake", + stream(_messages, _tools, opts) { + capturedLogger = opts?.logger; + return (async function* () { + // Open provider.request span inside the stream (like a real provider) + if (capturedLogger !== undefined) { + const span = capturedLogger.span("provider.request"); + providerReqSpanId = span.id; + span.end(); + } + yield { type: "text-delta", delta: "hi" } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + expect(capturedLogger).toBeDefined(); + expect(providerReqSpanId).toBeDefined(); + + const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step"); + const provReqOpen = sink.records.find( + (r) => r.kind === "span-open" && r.name === "provider.request", + ); + expect(stepOpen).toBeDefined(); + expect(provReqOpen).toBeDefined(); + if (stepOpen?.kind === "span-open" && provReqOpen?.kind === "span-open") { + expect(provReqOpen.parentSpanId).toBe(stepOpen.spanId); + expect(provReqOpen.spanId).toBe(providerReqSpanId); + } + }); + + it("tool-call spans are children of step span", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step"); + const tcOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "tool-call"); + expect(stepOpen).toBeDefined(); + expect(tcOpen).toBeDefined(); + if (stepOpen?.kind === "span-open" && tcOpen?.kind === "span-open") { + expect(tcOpen.parentSpanId).toBe(stepOpen.spanId); + } + }); + + it("full parent chain: turn → step → {prompt, provider.request, tool-call}", async () => { + let capturedLogger: Logger | undefined; + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + let streamCallCount = 0; + const provider: ProviderContract = { + id: "fake", + stream(_messages, _tools, opts) { + capturedLogger = opts?.logger; + streamCallCount++; + return (async function* () { + // Simulate provider opening a provider.request span + // INSIDE the stream on the first call only (like a real provider) + if (streamCallCount === 1 && capturedLogger !== undefined) { + const span = capturedLogger.span("provider.request"); + span.end(); + } + if (streamCallCount === 1) { + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "echo", + input: {}, + } as ProviderEvent; + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + } else { + yield { type: "text-delta", delta: "done" } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + } + })(); + }, + }; + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const spanOpens = sink.records.filter((r) => r.kind === "span-open") as Array< + Extract<LogRecord, { kind: "span-open" }> + >; + + const turnOpen = spanOpens.find((r) => r.name === "turn"); + const stepOpen = spanOpens.find((r) => r.name === "step"); + const promptOpen = spanOpens.find((r) => r.name === "prompt"); + const provReqOpen = spanOpens.find((r) => r.name === "provider.request"); + const tcOpen = spanOpens.find((r) => r.name === "tool-call"); + + expect(turnOpen).toBeDefined(); + expect(stepOpen).toBeDefined(); + expect(promptOpen).toBeDefined(); + expect(provReqOpen).toBeDefined(); + expect(tcOpen).toBeDefined(); + + if ( + turnOpen?.kind === "span-open" && + stepOpen?.kind === "span-open" && + promptOpen?.kind === "span-open" && + provReqOpen?.kind === "span-open" && + tcOpen?.kind === "span-open" + ) { + // turn = root + expect(turnOpen.parentSpanId).toBeUndefined(); + + // step = child of turn + expect(stepOpen.parentSpanId).toBe(turnOpen.spanId); + + // prompt = child of step + expect(promptOpen.parentSpanId).toBe(stepOpen.spanId); + + // provider.request = child of step + expect(provReqOpen.parentSpanId).toBe(stepOpen.spanId); + + // tool-call = child of step + expect(tcOpen.parentSpanId).toBe(stepOpen.spanId); + } + }); + }); }); diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index 5e60641..01b280b 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -78,7 +78,7 @@ interface StepContext { readonly conversationId: string; readonly turnId: string; readonly logger: Logger; - readonly stepLogger: Logger | undefined; + readonly turnSpan: Span | undefined; readonly toolSpans: Map<string, Span>; } @@ -96,6 +96,7 @@ function processEvent( toolCalls: ToolCall[], dispatcher: StepDispatcher, ctx: StepContext, + stepSpan: Span | undefined, ): void { switch (event.type) { case "text-delta": @@ -129,12 +130,18 @@ function processEvent( ), ); - // Open a tool-call span (attrs: name, toolCallId) + // Open a tool-call span as a child of the step span (attrs: name, toolCallId) try { - const tcSpan = ctx.logger.span("tool-call", { - name: event.toolName, - toolCallId: event.toolCallId, - }); + const tcSpan = + stepSpan !== undefined + ? stepSpan.child("tool-call", { + name: event.toolName, + toolCallId: event.toolCallId, + }) + : ctx.logger.span("tool-call", { + name: event.toolName, + toolCallId: event.toolCallId, + }); ctx.toolSpans.set(event.toolCallId, tcSpan); } catch { // Swallow — D7: logging never breaks the turn. @@ -167,11 +174,12 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { let stepUsage = zeroUsage(); let finishReason = "stop"; - // Open a step span; capture the verbatim pre-mutation prompt via a - // "prompt" child span whose body holds the serialized messages+tools. + // Open a step span as a child of the turn span; capture the verbatim + // pre-mutation prompt via a "prompt" child span whose body holds the + // serialized messages+tools. let stepSpan: Span | undefined; try { - stepSpan = ctx.logger.span("step"); + stepSpan = ctx.turnSpan !== undefined ? ctx.turnSpan.child("step") : ctx.logger.span("step"); const promptBody = JSON.stringify({ messages: ctx.messages, tools: ctx.tools }); const promptSpan = stepSpan.child( "prompt", @@ -198,12 +206,12 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { try { const opts = { - ...(ctx.stepLogger !== undefined ? { logger: ctx.stepLogger } : {}), + ...(ctx.turnSpan !== undefined && stepSpan !== undefined ? { logger: stepSpan.log } : {}), }; const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts); for await (const event of stream) { if (ctx.signal.aborted) break; - processEvent(event, chunks, toolCalls, dispatcher, ctx); + processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan); if (event.type === "usage") { stepUsage = addUsage(stepUsage, event.usage); } @@ -354,7 +362,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { conversationId, turnId, logger: turnSpan?.log ?? logger ?? createNoopLogger(), - stepLogger: logger, + turnSpan, toolSpans, }); |
