diff options
| author | Adam Malczewski <[email protected]> | 2026-06-07 17:46:53 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-07 17:46:53 +0900 |
| commit | 7c459c7d919d1e08a228e8abc56129be174d8abe (patch) | |
| tree | 93011125c001945723ac9b9358c4ddd450f87f72 /packages | |
| parent | 5746cf4e545cd5b0d7faf0595554f273f236f3a9 (diff) | |
| download | dispatch-7c459c7d919d1e08a228e8abc56129be174d8abe.tar.gz dispatch-7c459c7d919d1e08a228e8abc56129be174d8abe.zip | |
feat(wire,kernel,session-orchestrator): live turn metrics on the stream
Expose the backend's authoritative token+timing metrics on the live AgentEvent
stream (observability-only -> now also client-facing). All additive/optional.
- [email protected]: new TurnStepCompleteEvent (type:step-complete) with per-step
ttftMs/decodeMs/genTotalMs; usage += stepId; tool-result += durationMs (exec);
done += durationMs (turn wall-clock) + usage (turn total). RunTurnInput += now?.
[email protected] (re-export bump).
- kernel-runtime: when now injected, measures + emits the above (reuses the
ttft/decode first-token detection); omits timing gracefully without a clock.
- session-orchestrator: adds now? to deps, threads into RunTurnInput; extension
activate injects () => Date.now().
- transport/cli/host-bin: untouched (verbatim pass-through; additive fields).
FE handoff: frontend-metrics-handoff.md. typecheck clean; 520 vitest + 89 bun;
biome 0/0. Replay/persistence = deferred Pass 2 (documented in tasks.md).
Diffstat (limited to 'packages')
| -rw-r--r-- | packages/kernel/src/contracts/events.ts | 1 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/index.ts | 1 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/runtime.ts | 10 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/events.ts | 80 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.test.ts | 330 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 65 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/extension.ts | 1 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.test.ts | 47 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 3 | ||||
| -rw-r--r-- | packages/transport-contract/package.json | 2 | ||||
| -rw-r--r-- | packages/wire/package.json | 2 | ||||
| -rw-r--r-- | packages/wire/src/index.ts | 52 |
12 files changed, 585 insertions, 9 deletions
diff --git a/packages/kernel/src/contracts/events.ts b/packages/kernel/src/contracts/events.ts index 8737b02..be09066 100644 --- a/packages/kernel/src/contracts/events.ts +++ b/packages/kernel/src/contracts/events.ts @@ -13,6 +13,7 @@ export type { TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, + TurnStepCompleteEvent, TurnTextDeltaEvent, TurnToolCallEvent, TurnToolOutputEvent, diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts index 1698486..38f1442 100644 --- a/packages/kernel/src/contracts/index.ts +++ b/packages/kernel/src/contracts/index.ts @@ -35,6 +35,7 @@ export type { TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, + TurnStepCompleteEvent, TurnTextDeltaEvent, TurnToolCallEvent, TurnToolOutputEvent, diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts index 8917709..b7fe23c 100644 --- a/packages/kernel/src/contracts/runtime.ts +++ b/packages/kernel/src/contracts/runtime.ts @@ -90,6 +90,16 @@ export interface RunTurnInput { * emitted (backward-compatible with callers that don't yet pass a logger). */ readonly logger?: Logger; + + /** + * Optional monotonic-ish clock (milliseconds) for emitting wall-clock timing + * on outward events: per-step `step-complete` (ttft/decode/genTotal), tool + * execution `durationMs` on `tool-result`, and turn `durationMs` on `done`. + * Injected (not ambient) so the runtime stays pure and deterministic in tests. + * If omitted, the runtime emits no such timing (the optional fields stay + * absent) — backward-compatible with callers that don't provide a clock. + */ + readonly now?: () => number; } /** diff --git a/packages/kernel/src/runtime/events.ts b/packages/kernel/src/runtime/events.ts index deeb012..300e711 100644 --- a/packages/kernel/src/runtime/events.ts +++ b/packages/kernel/src/runtime/events.ts @@ -33,9 +33,10 @@ export function toolResultEvent( toolName: string, content: string, isError: boolean, + durationMs?: number, ): AgentEvent { - return { - type: "tool-result", + const base = { + type: "tool-result" as const, conversationId, turnId, stepId, @@ -44,6 +45,10 @@ export function toolResultEvent( content, isError, }; + if (durationMs !== undefined) { + return { ...base, durationMs }; + } + return base; } export function toolOutputEvent( @@ -56,7 +61,15 @@ export function toolOutputEvent( return { type: "tool-output", conversationId, turnId, toolCallId, data, stream }; } -export function usageEvent(conversationId: string, turnId: string, usage: Usage): AgentEvent { +export function usageEvent( + conversationId: string, + turnId: string, + usage: Usage, + stepId?: StepId, +): AgentEvent { + if (stepId !== undefined) { + return { type: "usage", conversationId, turnId, usage, stepId }; + } return { type: "usage", conversationId, turnId, usage }; } @@ -64,7 +77,66 @@ export function turnStartEvent(conversationId: string, turnId: string): AgentEve return { type: "turn-start", conversationId, turnId }; } -export function doneEvent(conversationId: string, turnId: string, reason: string): AgentEvent { +export function stepCompleteEvent( + conversationId: string, + turnId: string, + stepId: StepId, + timing?: { ttftMs?: number; decodeMs?: number; genTotalMs?: number }, +): AgentEvent { + if (timing !== undefined) { + if (timing.ttftMs !== undefined) { + if (timing.decodeMs !== undefined && timing.genTotalMs !== undefined) { + return { + type: "step-complete", + conversationId, + turnId, + stepId, + ttftMs: timing.ttftMs, + decodeMs: timing.decodeMs, + genTotalMs: timing.genTotalMs, + }; + } + if (timing.genTotalMs !== undefined) { + return { + type: "step-complete", + conversationId, + turnId, + stepId, + ttftMs: timing.ttftMs, + genTotalMs: timing.genTotalMs, + }; + } + return { type: "step-complete", conversationId, turnId, stepId, ttftMs: timing.ttftMs }; + } + if (timing.genTotalMs !== undefined) { + return { + type: "step-complete", + conversationId, + turnId, + stepId, + genTotalMs: timing.genTotalMs, + }; + } + } + return { type: "step-complete", conversationId, turnId, stepId }; +} + +export function doneEvent( + conversationId: string, + turnId: string, + reason: string, + durationMs?: number, + usage?: Usage, +): AgentEvent { + if (durationMs !== undefined && usage !== undefined) { + return { type: "done", conversationId, turnId, reason, durationMs, usage }; + } + if (durationMs !== undefined) { + return { type: "done", conversationId, turnId, reason, durationMs }; + } + if (usage !== undefined) { + return { type: "done", conversationId, turnId, reason, usage }; + } return { type: "done", conversationId, turnId, reason }; } diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts index 089f65b..ce654d5 100644 --- a/packages/kernel/src/runtime/run-turn.test.ts +++ b/packages/kernel/src/runtime/run-turn.test.ts @@ -125,6 +125,7 @@ describe("runTurn", () => { "text-delta", "reasoning-delta", "usage", + "step-complete", "done", ]); }); @@ -1970,4 +1971,333 @@ describe("runTurn", () => { } }); }); + + describe("timing events (now provided)", () => { + function createCounterNow(): { now: () => number; tick: (ms: number) => void } { + let current = 0; + return { + now: () => current, + tick: (ms: number) => { + current += ms; + }, + }; + } + + it("emits step-complete per step with timing when now provided", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const { events, emit } = createCollectingEmit(); + + // Advance clock during stream: first token at +50ms, stream ends at +200ms + let streamCallCount = 0; + const wrappedProvider: ProviderContract = { + id: "fake", + stream(_messages, _tools) { + const idx = streamCallCount++; + return (async function* () { + if (idx === 0) { + clock.tick(50); // stream starts + yield { type: "text-delta", delta: "Hello" } as ProviderEvent; + // first token seen at 150 (100+50) + clock.tick(100); + yield { type: "text-delta", delta: " world" } as ProviderEvent; + clock.tick(50); + yield { + type: "usage", + usage: { inputTokens: 10, outputTokens: 5 }, + } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + } + })(); + }, + }; + + await runTurn({ + provider: wrappedProvider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const stepCompleteEvts = events.filter((e) => e.type === "step-complete"); + expect(stepCompleteEvts).toHaveLength(1); + + const sc = stepCompleteEvts[0]; + if (sc?.type === "step-complete") { + expect(sc.conversationId).toBe("conv-1"); + expect(sc.turnId).toBe("turn-1"); + expect(sc.stepId).toBeDefined(); + expect(sc.genTotalMs).toBe(200); // 50+100+50 + expect(sc.ttftMs).toBe(50); // stream start → first text-delta + expect(sc.decodeMs).toBe(150); // first token → stream end + const ttft = sc.ttftMs; + const decode = sc.decodeMs; + const genTotal = sc.genTotalMs; + if (ttft !== undefined && decode !== undefined && genTotal !== undefined) { + expect(genTotal).toBe(ttft + decode); + } + } + }); + + it("step-complete omits ttft/decode but keeps genTotalMs for a no-content step", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + let streamCallCount = 0; + const wrappedProvider: ProviderContract = { + id: "fake", + stream(_messages, _tools) { + const idx = streamCallCount++; + return (async function* () { + if (idx === 0) { + clock.tick(80); // stream starts at 180 + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "echo", + input: {}, + } as ProviderEvent; + clock.tick(20); + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + } else { + clock.tick(50); + yield { type: "text-delta", delta: "done" } as ProviderEvent; + clock.tick(50); + yield { type: "finish", reason: "stop" } as ProviderEvent; + } + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider: wrappedProvider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const stepCompleteEvts = events.filter((e) => e.type === "step-complete"); + expect(stepCompleteEvts).toHaveLength(2); + + // First step: tool-call-only, no content token + const sc0 = stepCompleteEvts[0]; + if (sc0?.type === "step-complete") { + expect(sc0.stepId).toBeDefined(); + expect(sc0.genTotalMs).toBe(100); // 80+20 + expect(sc0.ttftMs).toBeUndefined(); + expect(sc0.decodeMs).toBeUndefined(); + } + + // Second step: has text-delta + const sc1 = stepCompleteEvts[1]; + if (sc1?.type === "step-complete") { + expect(sc1.stepId).toBeDefined(); + expect(sc1.genTotalMs).toBe(100); // 50+50 + expect(sc1.ttftMs).toBe(50); + expect(sc1.decodeMs).toBe(50); + } + }); + + it("usage event carries stepId", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const usageEvts = events.filter((e) => e.type === "usage"); + expect(usageEvts).toHaveLength(1); + const ue = usageEvts[0]; + if (ue?.type === "usage") { + expect(ue.stepId).toBeDefined(); + } + }); + + it("tool-result carries durationMs (execution time) when now provided", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const tool = createFakeTool("slow", async () => { + clock.tick(200); // tool takes 200ms to execute + return { content: "done" }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "slow", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "ok" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const toolResultEvts = events.filter((e) => e.type === "tool-result"); + expect(toolResultEvts).toHaveLength(1); + const tr = toolResultEvts[0]; + if (tr?.type === "tool-result") { + expect(tr.durationMs).toBeDefined(); + expect(tr.durationMs).toBe(200); + } + }); + + it("done carries durationMs and aggregate usage when now provided", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const wrappedProvider: ProviderContract = { + id: "fake", + stream(_messages, _tools) { + return (async function* () { + clock.tick(80); // stream duration + yield { type: "text-delta", delta: "hi" } as ProviderEvent; + yield { + type: "usage", + usage: { inputTokens: 10, outputTokens: 5 }, + } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider: wrappedProvider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const doneEvts = events.filter((e) => e.type === "done"); + expect(doneEvts).toHaveLength(1); + const d = doneEvts[0]; + if (d?.type === "done") { + expect(d.durationMs).toBeDefined(); + expect(d.durationMs).toBeGreaterThan(0); + expect(d.usage).toBeDefined(); + if (d.usage !== undefined) { + expect(d.usage.inputTokens).toBe(10); + expect(d.usage.outputTokens).toBe(5); + } + } + }); + + it("no now → timing fields absent", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + // no now + }); + + // step-complete still emitted (with stepId, no timing) + const stepCompleteEvts = events.filter((e) => e.type === "step-complete"); + expect(stepCompleteEvts).toHaveLength(2); + for (const sc of stepCompleteEvts) { + if (sc?.type === "step-complete") { + expect(sc.stepId).toBeDefined(); + expect(sc.ttftMs).toBeUndefined(); + expect(sc.decodeMs).toBeUndefined(); + expect(sc.genTotalMs).toBeUndefined(); + } + } + + // usage still carries stepId + const usageEvts = events.filter((e) => e.type === "usage"); + for (const ue of usageEvts) { + if (ue?.type === "usage") { + expect(ue.stepId).toBeDefined(); + } + } + + // no durationMs on tool-result + const toolResultEvts = events.filter((e) => e.type === "tool-result"); + for (const tr of toolResultEvts) { + if (tr?.type === "tool-result") { + expect(tr.durationMs).toBeUndefined(); + } + } + + // no durationMs on done, but usage is present (independent of now) + const doneEvts = events.filter((e) => e.type === "done"); + expect(doneEvts).toHaveLength(1); + const d = doneEvts[0]; + if (d?.type === "done") { + expect(d.durationMs).toBeUndefined(); + expect(d.usage).toBeDefined(); + if (d.usage !== undefined) { + expect(d.usage.inputTokens).toBe(15); + expect(d.usage.outputTokens).toBe(8); + } + } + }); + }); }); diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index a8ee6c9..06069a2 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -8,6 +8,7 @@ import { doneEvent, errorEvent, reasoningDeltaEvent, + stepCompleteEvent, textDeltaEvent, toolCallEvent, toolResultEvent, @@ -84,12 +85,15 @@ interface StepContext { readonly turnSpan: Span | undefined; readonly toolSpans: Map<string, Span>; readonly cwd: string | undefined; + readonly now: (() => number) | undefined; } interface TimingState { ttftSpan: Span | undefined; decodeSpan: Span | undefined; firstTokenSeen: boolean; + streamStartMs: number | undefined; + firstTokenMs: number | undefined; } interface StepResult { @@ -108,11 +112,15 @@ function processEvent( ctx: StepContext, stepSpan: Span | undefined, timing: TimingState, + toolDispatchTimes: Map<string, number>, ): void { switch (event.type) { case "text-delta": if (!timing.firstTokenSeen) { timing.firstTokenSeen = true; + if (ctx.now !== undefined) { + timing.firstTokenMs = ctx.now(); + } try { timing.ttftSpan?.end({ attrs: { firstToken: true } }); } catch { @@ -131,6 +139,9 @@ function processEvent( case "reasoning-delta": if (!timing.firstTokenSeen) { timing.firstTokenSeen = true; + if (ctx.now !== undefined) { + timing.firstTokenMs = ctx.now(); + } try { timing.ttftSpan?.end({ attrs: { firstToken: true } }); } catch { @@ -171,6 +182,11 @@ function processEvent( ), ); + // Capture dispatch time for tool-call durationMs + if (ctx.now !== undefined) { + toolDispatchTimes.set(event.toolCallId, ctx.now()); + } + // Open a tool-call span as a child of the step span (attrs: name, toolCallId) try { const tcSpan = @@ -194,7 +210,7 @@ function processEvent( break; } case "usage": - ctx.emit(usageEvent(ctx.conversationId, ctx.turnId, event.usage)); + ctx.emit(usageEvent(ctx.conversationId, ctx.turnId, event.usage, ctx.stepId)); break; case "finish": break; @@ -212,6 +228,7 @@ function processEvent( async function executeStep(ctx: StepContext): Promise<StepResult> { const chunks: Chunk[] = []; const toolCalls: ToolCall[] = []; + const toolDispatchTimes = new Map<string, number>(); let stepUsage = zeroUsage(); let finishReason = "stop"; @@ -250,6 +267,8 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { ttftSpan: undefined, decodeSpan: undefined, firstTokenSeen: false, + streamStartMs: ctx.now !== undefined ? ctx.now() : undefined, + firstTokenMs: undefined, }; // Open TTFT span when spans are enabled @@ -268,7 +287,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts); for await (const event of stream) { if (ctx.signal.aborted) break; - processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing); + processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing, toolDispatchTimes); if (event.type === "usage") { stepUsage = addUsage(stepUsage, event.usage); } @@ -305,6 +324,22 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { // Swallow — D7. } + // Emit step-complete event with timing + const streamEndMs = ctx.now !== undefined ? ctx.now() : undefined; + if (timing.streamStartMs !== undefined && streamEndMs !== undefined) { + const genTotalMs = streamEndMs - timing.streamStartMs; + const stepTiming: { ttftMs?: number; decodeMs?: number; genTotalMs?: number } = { + genTotalMs, + }; + if (timing.firstTokenMs !== undefined) { + stepTiming.ttftMs = timing.firstTokenMs - timing.streamStartMs; + stepTiming.decodeMs = streamEndMs - timing.firstTokenMs; + } + ctx.emit(stepCompleteEvent(ctx.conversationId, ctx.turnId, ctx.stepId, stepTiming)); + } else { + ctx.emit(stepCompleteEvent(ctx.conversationId, ctx.turnId, ctx.stepId)); + } + if (!ctx.dispatch.eager) { for (const call of toolCalls) { dispatcher.submit(call); @@ -337,6 +372,9 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { const result = results.get(call.id); if (result !== undefined) { const isError = result.isError ?? false; + const dispatchTime = toolDispatchTimes.get(call.id); + const toolDurationMs = + ctx.now !== undefined && dispatchTime !== undefined ? ctx.now() - dispatchTime : undefined; ctx.emit( toolResultEvent( ctx.conversationId, @@ -346,6 +384,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { call.name, result.content, isError, + toolDurationMs, ), ); toolMessages.push({ @@ -400,6 +439,10 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { const turnId = input.turnId; const signal = input.signal ?? new AbortController().signal; const logger = input.logger; + const now = input.now; + + // Record turn start time for durationMs on done + const turnStartMs = now !== undefined ? now() : undefined; // Open a turn span (attrs: conversationId, turnId, model) let turnSpan: Span | undefined; @@ -444,6 +487,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { turnSpan, toolSpans, cwd: input.cwd, + now, }); totalUsage = addUsage(totalUsage, stepResult.usage); @@ -499,7 +543,22 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { } } - input.emit(doneEvent(conversationId, turnId, finishReason)); + const turnDurationMs = + turnStartMs !== undefined && now !== undefined ? now() - turnStartMs : undefined; + const hasUsage = + totalUsage.inputTokens > 0 || + totalUsage.outputTokens > 0 || + totalUsage.cacheReadTokens !== undefined || + totalUsage.cacheWriteTokens !== undefined; + input.emit( + doneEvent( + conversationId, + turnId, + finishReason, + turnDurationMs, + hasUsage ? totalUsage : undefined, + ), + ); return { messages: resultMessages, usage: totalUsage, finishReason }; } diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts index af7d6e0..fbb7c15 100644 --- a/packages/session-orchestrator/src/extension.ts +++ b/packages/session-orchestrator/src/extension.ts @@ -38,6 +38,7 @@ export function activate(host: HostAPI): void { }, runTurn, logger: host.logger, + now: () => Date.now(), }); host.provideService(sessionOrchestratorHandle, orchestrator); diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts index b648d42..3954ffe 100644 --- a/packages/session-orchestrator/src/orchestrator.test.ts +++ b/packages/session-orchestrator/src/orchestrator.test.ts @@ -363,6 +363,53 @@ describe("handleMessage model resolution", () => { expect(captured).toHaveLength(2); expect(captured[1]?.cwd).toBeUndefined(); }); + + it("forwards an injected now into the RunTurnInput passed to runTurn", async () => { + const store = createInMemoryStore(); + const provider: ProviderContract = { id: "p", stream: async function* () {} }; + const { captured, captureRunTurn } = createCapturingRunTurn(); + const fakeNow = () => 42; + + const orchestrator = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + runTurn: captureRunTurn, + now: fakeNow, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-now", + text: "hi", + onEvent: () => {}, + }); + + expect(captured).toHaveLength(1); + expect(captured[0]?.now).toBe(fakeNow); + expect(captured[0]?.now?.()).toBe(42); + }); + + it("omits now from RunTurnInput when deps.now is not provided", async () => { + const store = createInMemoryStore(); + const provider: ProviderContract = { id: "p", stream: async function* () {} }; + const { captured, captureRunTurn } = createCapturingRunTurn(); + + const orchestrator = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + runTurn: captureRunTurn, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-no-now", + text: "hi", + onEvent: () => {}, + }); + + expect(captured).toHaveLength(1); + expect(captured[0]?.now).toBeUndefined(); + }); }); describe("turn-sealed event", () => { diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index 311b620..04f6ad2 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -39,6 +39,8 @@ export interface SessionOrchestratorDeps { readonly runTurn: (input: RunTurnInput) => Promise<RunTurnResult>; /** Base logger (auto-scoped to this extension); childed per turn for span capture. */ readonly logger?: Logger; + /** Injected monotonic-ish clock (ms) forwarded to RunTurnInput for timing events. */ + readonly now?: () => number; } export function createSessionOrchestrator(deps: SessionOrchestratorDeps): SessionOrchestrator { @@ -86,6 +88,7 @@ export function createSessionOrchestrator(deps: SessionOrchestratorDeps): Sessio ...(turnLogger !== undefined ? { logger: turnLogger } : {}), ...(signal !== undefined ? { signal } : {}), ...(cwd !== undefined ? { cwd } : {}), + ...(deps.now !== undefined ? { now: deps.now } : {}), }; const result = await deps.runTurn(opts); diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json index 4e5f382..a2711af 100644 --- a/packages/transport-contract/package.json +++ b/packages/transport-contract/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/transport-contract", - "version": "0.2.0", + "version": "0.3.0", "type": "module", "private": true, "main": "dist/index.js", diff --git a/packages/wire/package.json b/packages/wire/package.json index 6098703..762c06e 100644 --- a/packages/wire/package.json +++ b/packages/wire/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/wire", - "version": "0.2.0", + "version": "0.3.0", "type": "module", "private": true, "main": "dist/index.js", diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts index 90213b4..a4790de 100644 --- a/packages/wire/src/index.ts +++ b/packages/wire/src/index.ts @@ -168,6 +168,7 @@ export type AgentEvent = | TurnToolResultEvent | TurnToolOutputEvent | TurnUsageEvent + | TurnStepCompleteEvent | TurnErrorEvent | TurnDoneEvent | TurnSealedEvent; @@ -236,6 +237,12 @@ export interface TurnToolResultEvent { readonly toolName: string; readonly content: string; readonly isError: boolean; + /** + * How long the tool took to execute (dispatch → result), in milliseconds — + * the backend's authoritative execution time, distinct from any client-side + * wall-clock. Optional: present only when the runtime was given a clock. + */ + readonly durationMs?: number; } /** Streaming output from a tool execution (e.g. shell stdout/stderr). */ @@ -253,9 +260,43 @@ export interface TurnUsageEvent { readonly type: "usage"; readonly conversationId: string; readonly turnId: string; + /** + * The step this usage report belongs to, so a consumer can attribute tokens + * per step (and join with the matching `step-complete` timing by `stepId`). + * Optional: absent when the runtime had no step context, and on usage emitted + * before this field existed. + */ + readonly stepId?: StepId; readonly usage: Usage; } +/** + * A step (one LLM round-trip) has completed — the authoritative per-step metrics + * packet, emitted once at the step's end (after the generation stream finishes), + * so its timing is final (unlike `usage`, which may arrive mid-stream). Carries + * the step's generation timing; join to the step's tokens via `stepId` on the + * `usage` event. All timing fields are optional: present only when the runtime + * was given a clock, and `ttftMs`/`decodeMs` additionally require that a first + * content token (text or reasoning) was observed this step. + */ +export interface TurnStepCompleteEvent { + readonly type: "step-complete"; + readonly conversationId: string; + readonly turnId: string; + readonly stepId: StepId; + /** Time to first token: stream start → first text/reasoning delta. */ + readonly ttftMs?: number; + /** Decode time: first token → stream end (generation total − TTFT). */ + readonly decodeMs?: number; + /** + * Total generation time for the step: stream start → stream end. Present + * whenever a clock was available, even if no first token was seen (in which + * case `ttftMs`/`decodeMs` are absent). When a first token was seen, + * `genTotalMs === ttftMs + decodeMs`. + */ + readonly genTotalMs?: number; +} + /** An error occurred during the turn. */ export interface TurnErrorEvent { readonly type: "error"; @@ -271,6 +312,17 @@ export interface TurnDoneEvent { readonly conversationId: string; readonly turnId: string; readonly reason: string; + /** + * Total wall-clock duration of the turn (turn start → turn end), in + * milliseconds. Optional: present only when the runtime was given a clock. + */ + readonly durationMs?: number; + /** + * Aggregate token usage across all steps in the turn — a convenience total so + * a consumer need not sum the per-step `usage` events. Optional (absent if the + * provider reported no usage). + */ + readonly usage?: Usage; } /** |
