summaryrefslogtreecommitdiffhomepage
path: root/packages/kernel/src/runtime
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-07 17:07:47 +0900
committerAdam Malczewski <[email protected]>2026-06-07 17:07:47 +0900
commit3ecc9778fe278d6665b1e9a918f44c16f6992b87 (patch)
tree57e14412d782469d2a744ce9da9eded8bb2a5bd6 /packages/kernel/src/runtime
parentd2ac57045e9884e5f948b95014e853111cd6bc3d (diff)
downloaddispatch-3ecc9778fe278d6665b1e9a918f44c16f6992b87.tar.gz
dispatch-3ecc9778fe278d6665b1e9a918f44c16f6992b87.zip
feat(kernel-runtime): per-step TTFT + decode timing spans (observability)
Split each step's generation into a ttft span (stream start -> first text|reasoning token) and a decode span (first token -> stream end), children of the step span. decode = generation total - TTFT; both retrievable from the trace-store. First token counts reasoning deltas; a step with no content token ends ttft with firstToken:false (no misleading decode). Span-based (no clock injection), no wire/contract change. +3 runtime tests. GLOSSARY: TTFT + decode time. typecheck clean; 512 vitest; biome 0/0.
Diffstat (limited to 'packages/kernel/src/runtime')
-rw-r--r--packages/kernel/src/runtime/run-turn.test.ts145
-rw-r--r--packages/kernel/src/runtime/run-turn.ts67
2 files changed, 211 insertions, 1 deletions
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index 42a846b..089f65b 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -1161,6 +1161,151 @@ describe("runTurn", () => {
);
expect(logRecords).toHaveLength(0);
});
+
+ it("emits ttft and decode spans for a generating step", async () => {
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "Hello" },
+ { type: "text-delta", delta: " world" },
+ { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { logger, sink } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ const ttftOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "ttft");
+ const ttftCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "ttft");
+ const decodeOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "decode");
+ const decodeCloses = sink.records.filter(
+ (r) => r.kind === "span-close" && r.name === "decode",
+ );
+
+ expect(ttftOpens).toHaveLength(1);
+ expect(ttftCloses).toHaveLength(1);
+ expect(decodeOpens).toHaveLength(1);
+ expect(decodeCloses).toHaveLength(1);
+
+ const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step");
+ expect(stepOpen).toBeDefined();
+
+ if (
+ ttftOpens[0]?.kind === "span-open" &&
+ ttftCloses[0]?.kind === "span-close" &&
+ decodeOpens[0]?.kind === "span-open" &&
+ decodeCloses[0]?.kind === "span-close" &&
+ stepOpen?.kind === "span-open"
+ ) {
+ // ttft and decode are children of step
+ expect(ttftOpens[0].parentSpanId).toBe(stepOpen.spanId);
+ expect(decodeOpens[0].parentSpanId).toBe(stepOpen.spanId);
+
+ // ttft closes before decode opens (in order)
+ const ttftCloseIdx = sink.records.indexOf(ttftCloses[0]);
+ const decodeOpenIdx = sink.records.indexOf(decodeOpens[0]);
+ expect(ttftCloseIdx).toBeLessThan(decodeOpenIdx);
+
+ // ttft has firstToken: true
+ expect(ttftCloses[0].attributes?.firstToken).toBe(true);
+
+ // durations from fake clock
+ expect(ttftCloses[0].durationMs).toBeGreaterThanOrEqual(0);
+ expect(decodeCloses[0].durationMs).toBeGreaterThanOrEqual(0);
+ }
+ });
+
+ it("first token counts a reasoning delta", async () => {
+ const provider = createFakeProvider([
+ [
+ { type: "reasoning-delta", delta: "thinking..." },
+ { type: "text-delta", delta: "Hello" },
+ { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { logger, sink } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ const ttftCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "ttft");
+ expect(ttftCloses).toHaveLength(1);
+
+ // The ttft span should close at the reasoning delta, not at the text delta
+ if (ttftCloses[0]?.kind === "span-close") {
+ expect(ttftCloses[0].attributes?.firstToken).toBe(true);
+ }
+ });
+
+ it("a step with no content token does not emit a misleading decode", async () => {
+ const provider = createFakeProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ const { logger, sink } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ // First step (tool-call-only) should have ttft with firstToken: false and no decode
+ const ttftOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "ttft");
+ const ttftCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "ttft");
+ const decodeOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "decode");
+
+ // There should be 2 ttft opens (one per step) and 2 ttft closes
+ expect(ttftOpens).toHaveLength(2);
+ expect(ttftCloses).toHaveLength(2);
+
+ // First step: tool-call-only, no first token
+ if (ttftCloses[0]?.kind === "span-close") {
+ expect(ttftCloses[0].attributes?.firstToken).toBe(false);
+ }
+
+ // Second step: has text-delta, should have firstToken: true and decode span
+ if (ttftCloses[1]?.kind === "span-close") {
+ expect(ttftCloses[1].attributes?.firstToken).toBe(true);
+ }
+
+ // Only one decode span (for the second step)
+ expect(decodeOpens).toHaveLength(1);
+ });
});
describe("provider logger threading", () => {
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index b722f3f..a8ee6c9 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -86,6 +86,12 @@ interface StepContext {
readonly cwd: string | undefined;
}
+interface TimingState {
+ ttftSpan: Span | undefined;
+ decodeSpan: Span | undefined;
+ firstTokenSeen: boolean;
+}
+
interface StepResult {
readonly assistantMessage: ChatMessage | undefined;
readonly toolCalls: ToolCall[];
@@ -101,13 +107,42 @@ function processEvent(
dispatcher: StepDispatcher,
ctx: StepContext,
stepSpan: Span | undefined,
+ timing: TimingState,
): void {
switch (event.type) {
case "text-delta":
+ if (!timing.firstTokenSeen) {
+ timing.firstTokenSeen = true;
+ try {
+ timing.ttftSpan?.end({ attrs: { firstToken: true } });
+ } catch {
+ // Swallow — D7.
+ }
+ timing.ttftSpan = undefined;
+ try {
+ timing.decodeSpan = stepSpan?.child("decode");
+ } catch {
+ // Swallow — D7.
+ }
+ }
appendTextDelta(chunks, event.delta);
ctx.emit(textDeltaEvent(ctx.conversationId, ctx.turnId, event.delta));
break;
case "reasoning-delta":
+ if (!timing.firstTokenSeen) {
+ timing.firstTokenSeen = true;
+ try {
+ timing.ttftSpan?.end({ attrs: { firstToken: true } });
+ } catch {
+ // Swallow — D7.
+ }
+ timing.ttftSpan = undefined;
+ try {
+ timing.decodeSpan = stepSpan?.child("decode");
+ } catch {
+ // Swallow — D7.
+ }
+ }
appendThinkingDelta(chunks, event.delta);
ctx.emit(reasoningDeltaEvent(ctx.conversationId, ctx.turnId, event.delta));
break;
@@ -211,6 +246,21 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
ctx.cwd,
);
+ const timing: TimingState = {
+ ttftSpan: undefined,
+ decodeSpan: undefined,
+ firstTokenSeen: false,
+ };
+
+ // Open TTFT span when spans are enabled
+ try {
+ if (stepSpan !== undefined) {
+ timing.ttftSpan = stepSpan.child("ttft");
+ }
+ } catch {
+ // Swallow — D7.
+ }
+
try {
const opts = {
...(ctx.turnSpan !== undefined && stepSpan !== undefined ? { logger: stepSpan.log } : {}),
@@ -218,7 +268,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts);
for await (const event of stream) {
if (ctx.signal.aborted) break;
- processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan);
+ processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing);
if (event.type === "usage") {
stepUsage = addUsage(stepUsage, event.usage);
}
@@ -240,6 +290,21 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
stepSpan = undefined;
}
+ // Close timing spans: if no first token was seen, end ttft with firstToken: false
+ // If decode span is open, close it
+ try {
+ if (timing.ttftSpan !== undefined) {
+ timing.ttftSpan.end({ attrs: { firstToken: false } });
+ timing.ttftSpan = undefined;
+ }
+ if (timing.decodeSpan !== undefined) {
+ timing.decodeSpan.end();
+ timing.decodeSpan = undefined;
+ }
+ } catch {
+ // Swallow — D7.
+ }
+
if (!ctx.dispatch.eager) {
for (const call of toolCalls) {
dispatcher.submit(call);