diff options
| author | Adam Malczewski <[email protected]> | 2026-06-07 17:07:47 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-07 17:07:47 +0900 |
| commit | 3ecc9778fe278d6665b1e9a918f44c16f6992b87 (patch) | |
| tree | 57e14412d782469d2a744ce9da9eded8bb2a5bd6 /packages/kernel/src/runtime | |
| parent | d2ac57045e9884e5f948b95014e853111cd6bc3d (diff) | |
| download | dispatch-3ecc9778fe278d6665b1e9a918f44c16f6992b87.tar.gz dispatch-3ecc9778fe278d6665b1e9a918f44c16f6992b87.zip | |
feat(kernel-runtime): per-step TTFT + decode timing spans (observability)
Split each step's generation into a ttft span (stream start -> first text|reasoning
token) and a decode span (first token -> stream end), children of the step span.
decode = generation total - TTFT; both are retrievable from the trace-store. A reasoning
delta counts as the first token; a step with no content token ends ttft with firstToken:false
and opens no decode span (no misleading decode). Span-based (no clock injection), no wire/contract change.
+3 runtime tests. GLOSSARY: TTFT + decode time.
typecheck clean; 512 vitest; biome 0/0.
Diffstat (limited to 'packages/kernel/src/runtime')
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.test.ts | 145 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 67 |
2 files changed, 211 insertions, 1 deletion
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts index 42a846b..089f65b 100644 --- a/packages/kernel/src/runtime/run-turn.test.ts +++ b/packages/kernel/src/runtime/run-turn.test.ts @@ -1161,6 +1161,151 @@ describe("runTurn", () => { ); expect(logRecords).toHaveLength(0); }); + + it("emits ttft and decode spans for a generating step", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "Hello" }, + { type: "text-delta", delta: " world" }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const ttftOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "ttft"); + const ttftCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "ttft"); + const decodeOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "decode"); + const decodeCloses = sink.records.filter( + (r) => r.kind === "span-close" && r.name === "decode", + ); + + expect(ttftOpens).toHaveLength(1); + expect(ttftCloses).toHaveLength(1); + expect(decodeOpens).toHaveLength(1); + expect(decodeCloses).toHaveLength(1); + + const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step"); + expect(stepOpen).toBeDefined(); + + if ( + ttftOpens[0]?.kind === "span-open" && + ttftCloses[0]?.kind === "span-close" && + decodeOpens[0]?.kind === "span-open" && + decodeCloses[0]?.kind === "span-close" && + stepOpen?.kind === "span-open" + ) { + // ttft and decode are children of step + expect(ttftOpens[0].parentSpanId).toBe(stepOpen.spanId); + expect(decodeOpens[0].parentSpanId).toBe(stepOpen.spanId); + + // ttft closes before decode opens (in 
order) + const ttftCloseIdx = sink.records.indexOf(ttftCloses[0]); + const decodeOpenIdx = sink.records.indexOf(decodeOpens[0]); + expect(ttftCloseIdx).toBeLessThan(decodeOpenIdx); + + // ttft has firstToken: true + expect(ttftCloses[0].attributes?.firstToken).toBe(true); + + // durations from fake clock + expect(ttftCloses[0].durationMs).toBeGreaterThanOrEqual(0); + expect(decodeCloses[0].durationMs).toBeGreaterThanOrEqual(0); + } + }); + + it("first token counts a reasoning delta", async () => { + const provider = createFakeProvider([ + [ + { type: "reasoning-delta", delta: "thinking..." }, + { type: "text-delta", delta: "Hello" }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const ttftCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "ttft"); + expect(ttftCloses).toHaveLength(1); + + // The ttft span should close at the reasoning delta, not at the text delta + if (ttftCloses[0]?.kind === "span-close") { + expect(ttftCloses[0].attributes?.firstToken).toBe(true); + } + }); + + it("a step with no content token does not emit a misleading decode", async () => { + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => 
{}, + logger, + }); + + // First step (tool-call-only) should have ttft with firstToken: false and no decode + const ttftOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "ttft"); + const ttftCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "ttft"); + const decodeOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "decode"); + + // There should be 2 ttft opens (one per step) and 2 ttft closes + expect(ttftOpens).toHaveLength(2); + expect(ttftCloses).toHaveLength(2); + + // First step: tool-call-only, no first token + if (ttftCloses[0]?.kind === "span-close") { + expect(ttftCloses[0].attributes?.firstToken).toBe(false); + } + + // Second step: has text-delta, should have firstToken: true and decode span + if (ttftCloses[1]?.kind === "span-close") { + expect(ttftCloses[1].attributes?.firstToken).toBe(true); + } + + // Only one decode span (for the second step) + expect(decodeOpens).toHaveLength(1); + }); }); describe("provider logger threading", () => { diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index b722f3f..a8ee6c9 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -86,6 +86,12 @@ interface StepContext { readonly cwd: string | undefined; } +interface TimingState { + ttftSpan: Span | undefined; + decodeSpan: Span | undefined; + firstTokenSeen: boolean; +} + interface StepResult { readonly assistantMessage: ChatMessage | undefined; readonly toolCalls: ToolCall[]; @@ -101,13 +107,42 @@ function processEvent( dispatcher: StepDispatcher, ctx: StepContext, stepSpan: Span | undefined, + timing: TimingState, ): void { switch (event.type) { case "text-delta": + if (!timing.firstTokenSeen) { + timing.firstTokenSeen = true; + try { + timing.ttftSpan?.end({ attrs: { firstToken: true } }); + } catch { + // Swallow — D7. 
+ } + timing.ttftSpan = undefined; + try { + timing.decodeSpan = stepSpan?.child("decode"); + } catch { + // Swallow — D7. + } + } appendTextDelta(chunks, event.delta); ctx.emit(textDeltaEvent(ctx.conversationId, ctx.turnId, event.delta)); break; case "reasoning-delta": + if (!timing.firstTokenSeen) { + timing.firstTokenSeen = true; + try { + timing.ttftSpan?.end({ attrs: { firstToken: true } }); + } catch { + // Swallow — D7. + } + timing.ttftSpan = undefined; + try { + timing.decodeSpan = stepSpan?.child("decode"); + } catch { + // Swallow — D7. + } + } appendThinkingDelta(chunks, event.delta); ctx.emit(reasoningDeltaEvent(ctx.conversationId, ctx.turnId, event.delta)); break; @@ -211,6 +246,21 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { ctx.cwd, ); + const timing: TimingState = { + ttftSpan: undefined, + decodeSpan: undefined, + firstTokenSeen: false, + }; + + // Open TTFT span when spans are enabled + try { + if (stepSpan !== undefined) { + timing.ttftSpan = stepSpan.child("ttft"); + } + } catch { + // Swallow — D7. + } + try { const opts = { ...(ctx.turnSpan !== undefined && stepSpan !== undefined ? 
{ logger: stepSpan.log } : {}), @@ -218,7 +268,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts); for await (const event of stream) { if (ctx.signal.aborted) break; - processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan); + processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing); if (event.type === "usage") { stepUsage = addUsage(stepUsage, event.usage); } @@ -240,6 +290,21 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { stepSpan = undefined; } + // Close timing spans: if no first token was seen, end ttft with firstToken: false + // If decode span is open, close it + try { + if (timing.ttftSpan !== undefined) { + timing.ttftSpan.end({ attrs: { firstToken: false } }); + timing.ttftSpan = undefined; + } + if (timing.decodeSpan !== undefined) { + timing.decodeSpan.end(); + timing.decodeSpan = undefined; + } + } catch { + // Swallow — D7. + } + if (!ctx.dispatch.eager) { for (const call of toolCalls) { dispatcher.submit(call); |
