diff options
Diffstat (limited to 'packages/kernel/src/runtime')
| -rw-r--r-- | packages/kernel/src/runtime/events.ts | 80 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.test.ts | 330 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 65 |
3 files changed, 468 insertions, 7 deletions
diff --git a/packages/kernel/src/runtime/events.ts b/packages/kernel/src/runtime/events.ts index deeb012..300e711 100644 --- a/packages/kernel/src/runtime/events.ts +++ b/packages/kernel/src/runtime/events.ts @@ -33,9 +33,10 @@ export function toolResultEvent( toolName: string, content: string, isError: boolean, + durationMs?: number, ): AgentEvent { - return { - type: "tool-result", + const base = { + type: "tool-result" as const, conversationId, turnId, stepId, @@ -44,6 +45,10 @@ export function toolResultEvent( content, isError, }; + if (durationMs !== undefined) { + return { ...base, durationMs }; + } + return base; } export function toolOutputEvent( @@ -56,7 +61,15 @@ export function toolOutputEvent( return { type: "tool-output", conversationId, turnId, toolCallId, data, stream }; } -export function usageEvent(conversationId: string, turnId: string, usage: Usage): AgentEvent { +export function usageEvent( + conversationId: string, + turnId: string, + usage: Usage, + stepId?: StepId, +): AgentEvent { + if (stepId !== undefined) { + return { type: "usage", conversationId, turnId, usage, stepId }; + } return { type: "usage", conversationId, turnId, usage }; } @@ -64,7 +77,66 @@ export function turnStartEvent(conversationId: string, turnId: string): AgentEve return { type: "turn-start", conversationId, turnId }; } -export function doneEvent(conversationId: string, turnId: string, reason: string): AgentEvent { +export function stepCompleteEvent( + conversationId: string, + turnId: string, + stepId: StepId, + timing?: { ttftMs?: number; decodeMs?: number; genTotalMs?: number }, +): AgentEvent { + if (timing !== undefined) { + if (timing.ttftMs !== undefined) { + if (timing.decodeMs !== undefined && timing.genTotalMs !== undefined) { + return { + type: "step-complete", + conversationId, + turnId, + stepId, + ttftMs: timing.ttftMs, + decodeMs: timing.decodeMs, + genTotalMs: timing.genTotalMs, + }; + } + if (timing.genTotalMs !== undefined) { + return { + type: "step-complete", + conversationId, + turnId, + stepId, + ttftMs: timing.ttftMs, + genTotalMs: timing.genTotalMs, + }; + } + return { type: "step-complete", conversationId, turnId, stepId, ttftMs: timing.ttftMs }; + } + if (timing.genTotalMs !== undefined) { + return { + type: "step-complete", + conversationId, + turnId, + stepId, + genTotalMs: timing.genTotalMs, + }; + } + } + return { type: "step-complete", conversationId, turnId, stepId }; +} + +export function doneEvent( + conversationId: string, + turnId: string, + reason: string, + durationMs?: number, + usage?: Usage, +): AgentEvent { + if (durationMs !== undefined && usage !== undefined) { + return { type: "done", conversationId, turnId, reason, durationMs, usage }; + } + if (durationMs !== undefined) { + return { type: "done", conversationId, turnId, reason, durationMs }; + } + if (usage !== undefined) { + return { type: "done", conversationId, turnId, reason, usage }; + } return { type: "done", conversationId, turnId, reason }; } diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts index 089f65b..ce654d5 100644 --- a/packages/kernel/src/runtime/run-turn.test.ts +++ b/packages/kernel/src/runtime/run-turn.test.ts @@ -125,6 +125,7 @@ describe("runTurn", () => { "text-delta", "reasoning-delta", "usage", + "step-complete", "done", ]); }); @@ -1970,4 +1971,333 @@ describe("runTurn", () => { } }); }); + + describe("timing events (now provided)", () => { + function createCounterNow(): { now: () => number; tick: (ms: number) => void } { + let current = 0; + return { + now: () => current, + tick: (ms: number) => { + current += ms; + }, + }; + } + + it("emits step-complete per step with timing when now provided", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const { events, emit } = createCollectingEmit(); + + // Advance clock during stream: first token at +50ms, stream ends at +200ms + let streamCallCount = 0; + const wrappedProvider: ProviderContract = { + id: "fake", + stream(_messages, _tools) { + const idx = streamCallCount++; + return (async function* () { + if (idx === 0) { + clock.tick(50); // stream starts + yield { type: "text-delta", delta: "Hello" } as ProviderEvent; + // first token seen at 150 (100+50) + clock.tick(100); + yield { type: "text-delta", delta: " world" } as ProviderEvent; + clock.tick(50); + yield { + type: "usage", + usage: { inputTokens: 10, outputTokens: 5 }, + } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + } + })(); + }, + }; + + await runTurn({ + provider: wrappedProvider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const stepCompleteEvts = events.filter((e) => e.type === "step-complete"); + expect(stepCompleteEvts).toHaveLength(1); + + const sc = stepCompleteEvts[0]; + if (sc?.type === "step-complete") { + expect(sc.conversationId).toBe("conv-1"); + expect(sc.turnId).toBe("turn-1"); + expect(sc.stepId).toBeDefined(); + expect(sc.genTotalMs).toBe(200); // 50+100+50 + expect(sc.ttftMs).toBe(50); // stream start → first text-delta + expect(sc.decodeMs).toBe(150); // first token → stream end + const ttft = sc.ttftMs; + const decode = sc.decodeMs; + const genTotal = sc.genTotalMs; + if (ttft !== undefined && decode !== undefined && genTotal !== undefined) { + expect(genTotal).toBe(ttft + decode); + } + } + }); + + it("step-complete omits ttft/decode but keeps genTotalMs for a no-content step", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + let streamCallCount = 0; + const wrappedProvider: ProviderContract = { + id: "fake", + stream(_messages, _tools) { + const idx = streamCallCount++; + return (async function* () { + if (idx === 0) { + clock.tick(80); // stream starts at 180 + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "echo", + input: {}, + } as ProviderEvent; + clock.tick(20); + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + } else { + clock.tick(50); + yield { type: "text-delta", delta: "done" } as ProviderEvent; + clock.tick(50); + yield { type: "finish", reason: "stop" } as ProviderEvent; + } + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider: wrappedProvider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const stepCompleteEvts = events.filter((e) => e.type === "step-complete"); + expect(stepCompleteEvts).toHaveLength(2); + + // First step: tool-call-only, no content token + const sc0 = stepCompleteEvts[0]; + if (sc0?.type === "step-complete") { + expect(sc0.stepId).toBeDefined(); + expect(sc0.genTotalMs).toBe(100); // 80+20 + expect(sc0.ttftMs).toBeUndefined(); + expect(sc0.decodeMs).toBeUndefined(); + } + + // Second step: has text-delta + const sc1 = stepCompleteEvts[1]; + if (sc1?.type === "step-complete") { + expect(sc1.stepId).toBeDefined(); + expect(sc1.genTotalMs).toBe(100); // 50+50 + expect(sc1.ttftMs).toBe(50); + expect(sc1.decodeMs).toBe(50); + } + }); + + it("usage event carries stepId", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const usageEvts = events.filter((e) => e.type === "usage"); + expect(usageEvts).toHaveLength(1); + const ue = usageEvts[0]; + if (ue?.type === "usage") { + expect(ue.stepId).toBeDefined(); + } + }); + + it("tool-result carries durationMs (execution time) when now provided", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const tool = createFakeTool("slow", async () => { + clock.tick(200); // tool takes 200ms to execute + return { content: "done" }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "slow", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "ok" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const toolResultEvts = events.filter((e) => e.type === "tool-result"); + expect(toolResultEvts).toHaveLength(1); + const tr = toolResultEvts[0]; + if (tr?.type === "tool-result") { + expect(tr.durationMs).toBeDefined(); + expect(tr.durationMs).toBe(200); + } + }); + + it("done carries durationMs and aggregate usage when now provided", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const wrappedProvider: ProviderContract = { + id: "fake", + stream(_messages, _tools) { + return (async function* () { + clock.tick(80); // stream duration + yield { type: "text-delta", delta: "hi" } as ProviderEvent; + yield { + type: "usage", + usage: { inputTokens: 10, outputTokens: 5 }, + } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider: wrappedProvider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const doneEvts = events.filter((e) => e.type === "done"); + expect(doneEvts).toHaveLength(1); + const d = doneEvts[0]; + if (d?.type === "done") { + expect(d.durationMs).toBeDefined(); + expect(d.durationMs).toBeGreaterThan(0); + expect(d.usage).toBeDefined(); + if (d.usage !== undefined) { + expect(d.usage.inputTokens).toBe(10); + expect(d.usage.outputTokens).toBe(5); + } + } + }); + + it("no now → timing fields absent", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + // no now + }); + + // step-complete still emitted (with stepId, no timing) + const stepCompleteEvts = events.filter((e) => e.type === "step-complete"); + expect(stepCompleteEvts).toHaveLength(2); + for (const sc of stepCompleteEvts) { + if (sc?.type === "step-complete") { + expect(sc.stepId).toBeDefined(); + expect(sc.ttftMs).toBeUndefined(); + expect(sc.decodeMs).toBeUndefined(); + expect(sc.genTotalMs).toBeUndefined(); + } + } + + // usage still carries stepId + const usageEvts = events.filter((e) => e.type === "usage"); + for (const ue of usageEvts) { + if (ue?.type === "usage") { + expect(ue.stepId).toBeDefined(); + } + } + + // no durationMs on tool-result + const toolResultEvts = events.filter((e) => e.type === "tool-result"); + for (const tr of toolResultEvts) { + if (tr?.type === "tool-result") { + expect(tr.durationMs).toBeUndefined(); + } + } + + // no durationMs on done, but usage is present (independent of now) + const doneEvts = events.filter((e) => e.type === "done"); + expect(doneEvts).toHaveLength(1); + const d = doneEvts[0]; + if (d?.type === "done") { + expect(d.durationMs).toBeUndefined(); + expect(d.usage).toBeDefined(); + if (d.usage !== undefined) { + expect(d.usage.inputTokens).toBe(15); + expect(d.usage.outputTokens).toBe(8); + } + } + }); + }); }); diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index a8ee6c9..06069a2 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -8,6 +8,7 @@ import { doneEvent, errorEvent, reasoningDeltaEvent, + stepCompleteEvent, textDeltaEvent, toolCallEvent, toolResultEvent, @@ -84,12 +85,15 @@ interface StepContext { readonly turnSpan: Span | undefined; readonly toolSpans: Map<string, Span>; readonly cwd: string | undefined; + readonly now: (() => number) | undefined; } interface TimingState { ttftSpan: Span | undefined; decodeSpan: Span | undefined; firstTokenSeen: boolean; + streamStartMs: number | undefined; + firstTokenMs: number | undefined; } interface StepResult { @@ -108,11 +112,15 @@ function processEvent( ctx: StepContext, stepSpan: Span | undefined, timing: TimingState, + toolDispatchTimes: Map<string, number>, ): void { switch (event.type) { case "text-delta": if (!timing.firstTokenSeen) { timing.firstTokenSeen = true; + if (ctx.now !== undefined) { + timing.firstTokenMs = ctx.now(); + } try { timing.ttftSpan?.end({ attrs: { firstToken: true } }); } catch { @@ -131,6 +139,9 @@ function processEvent( case "reasoning-delta": if (!timing.firstTokenSeen) { timing.firstTokenSeen = true; + if (ctx.now !== undefined) { + timing.firstTokenMs = ctx.now(); + } try { timing.ttftSpan?.end({ attrs: { firstToken: true } }); } catch { @@ -171,6 +182,11 @@ function processEvent( ), ); + // Capture dispatch time for tool-call durationMs + if (ctx.now !== undefined) { + toolDispatchTimes.set(event.toolCallId, ctx.now()); + } + // Open a tool-call span as a child of the step span (attrs: name, toolCallId) try { const tcSpan = @@ -194,7 +210,7 @@ function processEvent( break; } case "usage": - ctx.emit(usageEvent(ctx.conversationId, ctx.turnId, event.usage)); + ctx.emit(usageEvent(ctx.conversationId, ctx.turnId, event.usage, ctx.stepId)); break; case "finish": break; @@ -212,6 +228,7 @@ function processEvent( async function executeStep(ctx: StepContext): Promise<StepResult> { const chunks: Chunk[] = []; const toolCalls: ToolCall[] = []; + const toolDispatchTimes = new Map<string, number>(); let stepUsage = zeroUsage(); let finishReason = "stop"; @@ -250,6 +267,8 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { ttftSpan: undefined, decodeSpan: undefined, firstTokenSeen: false, + streamStartMs: ctx.now !== undefined ? ctx.now() : undefined, + firstTokenMs: undefined, }; // Open TTFT span when spans are enabled @@ -268,7 +287,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts); for await (const event of stream) { if (ctx.signal.aborted) break; - processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing); + processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing, toolDispatchTimes); if (event.type === "usage") { stepUsage = addUsage(stepUsage, event.usage); } @@ -305,6 +324,22 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { // Swallow — D7. } + // Emit step-complete event with timing + const streamEndMs = ctx.now !== undefined ? ctx.now() : undefined; + if (timing.streamStartMs !== undefined && streamEndMs !== undefined) { + const genTotalMs = streamEndMs - timing.streamStartMs; + const stepTiming: { ttftMs?: number; decodeMs?: number; genTotalMs?: number } = { + genTotalMs, + }; + if (timing.firstTokenMs !== undefined) { + stepTiming.ttftMs = timing.firstTokenMs - timing.streamStartMs; + stepTiming.decodeMs = streamEndMs - timing.firstTokenMs; + } + ctx.emit(stepCompleteEvent(ctx.conversationId, ctx.turnId, ctx.stepId, stepTiming)); + } else { + ctx.emit(stepCompleteEvent(ctx.conversationId, ctx.turnId, ctx.stepId)); + } + if (!ctx.dispatch.eager) { for (const call of toolCalls) { dispatcher.submit(call); @@ -337,6 +372,9 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { const result = results.get(call.id); if (result !== undefined) { const isError = result.isError ?? false; + const dispatchTime = toolDispatchTimes.get(call.id); + const toolDurationMs = + ctx.now !== undefined && dispatchTime !== undefined ? ctx.now() - dispatchTime : undefined; ctx.emit( toolResultEvent( ctx.conversationId, @@ -346,6 +384,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { call.name, result.content, isError, + toolDurationMs, ), ); toolMessages.push({ @@ -400,6 +439,10 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { const turnId = input.turnId; const signal = input.signal ?? new AbortController().signal; const logger = input.logger; + const now = input.now; + + // Record turn start time for durationMs on done + const turnStartMs = now !== undefined ? now() : undefined; // Open a turn span (attrs: conversationId, turnId, model) let turnSpan: Span | undefined; @@ -444,6 +487,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { turnSpan, toolSpans, cwd: input.cwd, + now, }); totalUsage = addUsage(totalUsage, stepResult.usage); @@ -499,7 +543,22 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { } } - input.emit(doneEvent(conversationId, turnId, finishReason)); + const turnDurationMs = + turnStartMs !== undefined && now !== undefined ? now() - turnStartMs : undefined; + const hasUsage = + totalUsage.inputTokens > 0 || + totalUsage.outputTokens > 0 || + totalUsage.cacheReadTokens !== undefined || + totalUsage.cacheWriteTokens !== undefined; + input.emit( + doneEvent( + conversationId, + turnId, + finishReason, + turnDurationMs, + hasUsage ? totalUsage : undefined, + ), + ); return { messages: resultMessages, usage: totalUsage, finishReason }; } |
