diff options
| author | Adam Malczewski <[email protected]> | 2026-06-07 17:46:53 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-07 17:46:53 +0900 |
| commit | 7c459c7d919d1e08a228e8abc56129be174d8abe (patch) | |
| tree | 93011125c001945723ac9b9358c4ddd450f87f72 | |
| parent | 5746cf4e545cd5b0d7faf0595554f273f236f3a9 (diff) | |
| download | dispatch-7c459c7d919d1e08a228e8abc56129be174d8abe.tar.gz dispatch-7c459c7d919d1e08a228e8abc56129be174d8abe.zip | |
feat(wire,kernel,session-orchestrator): live turn metrics on the stream
Expose the backend's authoritative token+timing metrics on the live AgentEvent
stream (observability-only -> now also client-facing). All additive/optional.
- [email protected]: new TurnStepCompleteEvent (type:step-complete) with per-step
ttftMs/decodeMs/genTotalMs; usage += stepId; tool-result += durationMs (exec);
done += durationMs (turn wall-clock) + usage (turn total). RunTurnInput += now?.
[email protected] (re-export bump).
- kernel-runtime: when now injected, measures + emits the above (reuses the
ttft/decode first-token detection); omits timing gracefully without a clock.
- session-orchestrator: adds now? to deps, threads into RunTurnInput; extension
activate injects () => Date.now().
- transport/cli/host-bin: untouched (verbatim pass-through; additive fields).
FE handoff: frontend-metrics-handoff.md. typecheck clean; 520 vitest + 89 bun;
biome 0/0. Replay/persistence = deferred Pass 2 (documented in tasks.md).
| -rw-r--r-- | frontend-metrics-handoff.md | 121 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/events.ts | 1 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/index.ts | 1 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/runtime.ts | 10 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/events.ts | 80 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.test.ts | 330 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 65 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/extension.ts | 1 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.test.ts | 47 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 3 | ||||
| -rw-r--r-- | packages/transport-contract/package.json | 2 | ||||
| -rw-r--r-- | packages/wire/package.json | 2 | ||||
| -rw-r--r-- | packages/wire/src/index.ts | 52 | ||||
| -rw-r--r-- | tasks.md | 47 |
14 files changed, 753 insertions, 9 deletions
diff --git a/frontend-metrics-handoff.md b/frontend-metrics-handoff.md new file mode 100644 index 0000000..8da45ad --- /dev/null +++ b/frontend-metrics-handoff.md @@ -0,0 +1,121 @@ +# Frontend handoff — live turn metrics (tokens + timing) + +> From: arch-rewrite (backend) orchestrator · For: the dispatch-web FE team. +> Status: **LIVE on the stream now** (backend committed + live-verified). Consume via the pinned +> contracts `@dispatch/[email protected]` + `@dispatch/[email protected]` (reference snapshots +> regenerated in `dispatch-web/.dispatch/{wire,transport-contract}.reference.md`). + +## 1. What you can now access +The backend's **authoritative** token + timing metrics are now on the live turn stream: + +| Metric | Where | Field(s) | +|---|---|---| +| Per-step tokens | `usage` event | `usage` (`inputTokens`/`outputTokens`/`cacheReadTokens?`/`cacheWriteTokens?`) + new `stepId?` | +| Per-step **TTFT** | new `step-complete` event | `ttftMs?` | +| Per-step **decode** time | new `step-complete` event | `decodeMs?` | +| Per-step total generation | new `step-complete` event | `genTotalMs?` | +| **Tool execution** time | `tool-result` event | `durationMs?` | +| **Turn** wall-clock | `done` event | `durationMs?` | +| **Turn** total tokens | `done` event | `usage?` | +| **Tokens/sec** (TPS) | derive | `usage.outputTokens / (step-complete.decodeMs / 1000)` | +| Context-size proxy | `usage` event | `usage.inputTokens` (size the model counted; `cacheReadTokens` = cached portion) | + +"Authoritative" = measured by the backend runtime, not client wall-clock. They differ from +anything you'd time in the browser (no network/buffering in them). + +## 2. How they're delivered +**Inline, in the same chat stream you already consume** — WS `chat.delta` frames (and the +`POST /chat` NDJSON stream) carry the `AgentEvent` union; metrics are additional event types / +fields in that union. **No new endpoint, no subscription/negotiation.** You already `switch` on +`event.type`; route the metric events to a telemetry handler and ignore any you don't render +(zero cost). They do **not** appear in message content — keep your transcript rendering as-is. + +These events are **low-frequency** (one `step-complete` per step, one `done` per turn, a +`durationMs` per tool result) — not per-token — so there's no stream-volume concern. + +## 3. The new/changed events (shapes) +All new fields are **optional** — see §5. Every event still carries `conversationId` + `turnId`. + +```ts +// NEW variant in AgentEvent — emitted once per step, AT STEP END (timing is final here) +interface TurnStepCompleteEvent { + type: "step-complete"; + conversationId: string; + turnId: string; + stepId: StepId; // join key to the step's `usage` event + tool events + ttftMs?: number; // time to first token (stream start → first text|reasoning delta) + decodeMs?: number; // first token → stream end (== genTotalMs - ttftMs) + genTotalMs?: number; // whole-step generation (present even if no first token was seen) +} + +// usage event — now labeled by step +interface TurnUsageEvent { + type: "usage"; + conversationId: string; turnId: string; + stepId?: StepId; // NEW — attribute tokens to a step / join to step-complete + usage: Usage; // { inputTokens, outputTokens, cacheReadTokens?, cacheWriteTokens? } +} + +// tool-result — now carries execution time +interface TurnToolResultEvent { + type: "tool-result"; + conversationId: string; turnId: string; + stepId: StepId; toolCallId: string; toolName: string; + content: string; isError: boolean; + durationMs?: number; // NEW — tool execution time (dispatch → result) +} + +// done — now carries turn totals +interface TurnDoneEvent { + type: "done"; + conversationId: string; turnId: string; + reason: string; + durationMs?: number; // NEW — whole-turn wall-clock + usage?: Usage; // NEW — aggregate turn tokens (so you needn't sum the usage events) +} +``` + +## 4. Correlation & derived metrics +Keys: `turnId` groups a turn; `stepId` groups a step within it; `toolCallId` pairs a tool call +with its result. A turn has **one `step-complete` (and usually one `usage`) per step**. + +- **Per-step TPS** = `usage.outputTokens / (step-complete.decodeMs / 1000)` — join `usage` and + `step-complete` by `stepId`. (Use `decodeMs`, not `genTotalMs`, for decode-rate TPS; it excludes + first-token latency. See "which TPS" caveat below.) +- **Turn TPS** = `done.usage.outputTokens / (Σ step-complete.decodeMs / 1000)`. +- **Generation total per step** = `genTotalMs` (or `ttftMs + decodeMs`). +- **Turn-visible first-token latency** = the `ttftMs` of **step 0** (the first `step-complete`). +- **Total prefill overhead** = `Σ ttftMs` across steps; **pure generation** = `Σ decodeMs`. +- **Tool time** = `tool-result.durationMs` per call; sum per `stepId` for a batch. + +"Which TPS": `decodeMs` is first-token → end, so TPS over it is the decode rate (first-token +latency removed). If you want end-to-end rate including the wait, use `ttftMs + decodeMs`. + +## 5. Optionality — you MUST tolerate absence +- `step-complete` is always emitted per step, but its **timing fields are present only when the + server runs with a clock** (it does in normal operation). `ttftMs`/`decodeMs` are additionally + absent for a step that produced **no text/reasoning token** (e.g. a tool-call-only step) — + `genTotalMs` is still present in that case. +- `usage.stepId`, `tool-result.durationMs`, `done.durationMs`, `done.usage` are all optional. +- Render gracefully when a value is missing (omit the figure; don't show `NaN`/`undefined`). + +## 6. What is NOT available yet (deferred — Pass 2) +**Metrics are LIVE-ONLY.** They are **not persisted**, so: +- `GET /conversations/:id` (history) returns messages/chunks but **no tokens/timing**. Reopening a + past conversation will show content without metrics. +- If you need historical metrics (e.g. show TPS on a reloaded conversation), that's the planned + **Pass 2** (persist per-turn metrics + a read path) — see `tasks.md` "Pass 2 — DEFERRED". Tell + us if you need it and we'll prioritize. +- TPS is not sent pre-computed (derive it, §4). No per-token timing (metrics are per-step/per-turn). + +## 7. Integration checklist +1. Refresh deps: `bun run typecheck` in dispatch-web (picks up `[email protected]` / `[email protected]`). +2. Extend your `chat.delta` event handler: add a `case "step-complete"` and read the new optional + fields on `usage`/`tool-result`/`done`. (No exhaustive-switch break — these are additive.) +3. Keep a per-turn (and per-step, keyed by `stepId`) telemetry accumulator alongside the transcript + store; fold metric events into it; render where you want (e.g. a turn footer / per-step badges). +4. Treat every metric field as optional (§5). + +## 8. Carrier facts (unchanged) +HTTP 24203 (`POST /chat` NDJSON, `GET /conversations/:id`, `GET /models`), WS 24205 (one socket, +`chat.delta` carries each `AgentEvent`), CORS `*`. Same events on both carriers. diff --git a/packages/kernel/src/contracts/events.ts b/packages/kernel/src/contracts/events.ts index 8737b02..be09066 100644 --- a/packages/kernel/src/contracts/events.ts +++ b/packages/kernel/src/contracts/events.ts @@ -13,6 +13,7 @@ export type { TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, + TurnStepCompleteEvent, TurnTextDeltaEvent, TurnToolCallEvent, TurnToolOutputEvent, diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts index 1698486..38f1442 100644 --- a/packages/kernel/src/contracts/index.ts +++ b/packages/kernel/src/contracts/index.ts @@ -35,6 +35,7 @@ export type { TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, + TurnStepCompleteEvent, TurnTextDeltaEvent, TurnToolCallEvent, TurnToolOutputEvent, diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts index 8917709..b7fe23c 100644 --- a/packages/kernel/src/contracts/runtime.ts +++ b/packages/kernel/src/contracts/runtime.ts @@ -90,6 +90,16 @@ export interface RunTurnInput { * emitted (backward-compatible with callers that don't yet pass a logger). */ readonly logger?: Logger; + + /** + * Optional monotonic-ish clock (milliseconds) for emitting wall-clock timing + * on outward events: per-step `step-complete` (ttft/decode/genTotal), tool + * execution `durationMs` on `tool-result`, and turn `durationMs` on `done`. + * Injected (not ambient) so the runtime stays pure and deterministic in tests. + * If omitted, the runtime emits no such timing (the optional fields stay + * absent) — backward-compatible with callers that don't provide a clock. + */ + readonly now?: () => number; } /** diff --git a/packages/kernel/src/runtime/events.ts b/packages/kernel/src/runtime/events.ts index deeb012..300e711 100644 --- a/packages/kernel/src/runtime/events.ts +++ b/packages/kernel/src/runtime/events.ts @@ -33,9 +33,10 @@ export function toolResultEvent( toolName: string, content: string, isError: boolean, + durationMs?: number, ): AgentEvent { - return { - type: "tool-result", + const base = { + type: "tool-result" as const, conversationId, turnId, stepId, @@ -44,6 +45,10 @@ export function toolResultEvent( content, isError, }; + if (durationMs !== undefined) { + return { ...base, durationMs }; + } + return base; } export function toolOutputEvent( @@ -56,7 +61,15 @@ export function toolOutputEvent( return { type: "tool-output", conversationId, turnId, toolCallId, data, stream }; } -export function usageEvent(conversationId: string, turnId: string, usage: Usage): AgentEvent { +export function usageEvent( + conversationId: string, + turnId: string, + usage: Usage, + stepId?: StepId, +): AgentEvent { + if (stepId !== undefined) { + return { type: "usage", conversationId, turnId, usage, stepId }; + } return { type: "usage", conversationId, turnId, usage }; } @@ -64,7 +77,66 @@ export function turnStartEvent(conversationId: string, turnId: string): AgentEve return { type: "turn-start", conversationId, turnId }; } -export function doneEvent(conversationId: string, turnId: string, reason: string): AgentEvent { +export function stepCompleteEvent( + conversationId: string, + turnId: string, + stepId: StepId, + timing?: { ttftMs?: number; decodeMs?: number; genTotalMs?: number }, +): AgentEvent { + if (timing !== undefined) { + if (timing.ttftMs !== undefined) { + if (timing.decodeMs !== undefined && timing.genTotalMs !== undefined) { + return { + type: "step-complete", + conversationId, + turnId, + stepId, + ttftMs: timing.ttftMs, + decodeMs: timing.decodeMs, + genTotalMs: timing.genTotalMs, + }; + } + if (timing.genTotalMs !== undefined) { + return { + type: "step-complete", + conversationId, + turnId, + stepId, + ttftMs: timing.ttftMs, + genTotalMs: timing.genTotalMs, + }; + } + return { type: "step-complete", conversationId, turnId, stepId, ttftMs: timing.ttftMs }; + } + if (timing.genTotalMs !== undefined) { + return { + type: "step-complete", + conversationId, + turnId, + stepId, + genTotalMs: timing.genTotalMs, + }; + } + } + return { type: "step-complete", conversationId, turnId, stepId }; +} + +export function doneEvent( + conversationId: string, + turnId: string, + reason: string, + durationMs?: number, + usage?: Usage, +): AgentEvent { + if (durationMs !== undefined && usage !== undefined) { + return { type: "done", conversationId, turnId, reason, durationMs, usage }; + } + if (durationMs !== undefined) { + return { type: "done", conversationId, turnId, reason, durationMs }; + } + if (usage !== undefined) { + return { type: "done", conversationId, turnId, reason, usage }; + } return { type: "done", conversationId, turnId, reason }; } diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts index 089f65b..ce654d5 100644 --- a/packages/kernel/src/runtime/run-turn.test.ts +++ b/packages/kernel/src/runtime/run-turn.test.ts @@ -125,6 +125,7 @@ describe("runTurn", () => { "text-delta", "reasoning-delta", "usage", + "step-complete", "done", ]); }); @@ -1970,4 +1971,333 @@ describe("runTurn", () => { } }); }); + + describe("timing events (now provided)", () => { + function createCounterNow(): { now: () => number; tick: (ms: number) => void } { + let current = 0; + return { + now: () => current, + tick: (ms: number) => { + current += ms; + }, + }; + } + + it("emits step-complete per step with timing when now provided", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const { events, emit } = createCollectingEmit(); + + // Advance clock during stream: first token at +50ms, stream ends at +200ms + let streamCallCount = 0; + const wrappedProvider: ProviderContract = { + id: "fake", + stream(_messages, _tools) { + const idx = streamCallCount++; + return (async function* () { + if (idx === 0) { + clock.tick(50); // stream starts + yield { type: "text-delta", delta: "Hello" } as ProviderEvent; + // first token seen at 150 (100+50) + clock.tick(100); + yield { type: "text-delta", delta: " world" } as ProviderEvent; + clock.tick(50); + yield { + type: "usage", + usage: { inputTokens: 10, outputTokens: 5 }, + } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + } + })(); + }, + }; + + await runTurn({ + provider: wrappedProvider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const stepCompleteEvts = events.filter((e) => e.type === "step-complete"); + expect(stepCompleteEvts).toHaveLength(1); + + const sc = stepCompleteEvts[0]; + if (sc?.type === "step-complete") { + expect(sc.conversationId).toBe("conv-1"); + expect(sc.turnId).toBe("turn-1"); + expect(sc.stepId).toBeDefined(); + expect(sc.genTotalMs).toBe(200); // 50+100+50 + expect(sc.ttftMs).toBe(50); // stream start → first text-delta + expect(sc.decodeMs).toBe(150); // first token → stream end + const ttft = sc.ttftMs; + const decode = sc.decodeMs; + const genTotal = sc.genTotalMs; + if (ttft !== undefined && decode !== undefined && genTotal !== undefined) { + expect(genTotal).toBe(ttft + decode); + } + } + }); + + it("step-complete omits ttft/decode but keeps genTotalMs for a no-content step", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + let streamCallCount = 0; + const wrappedProvider: ProviderContract = { + id: "fake", + stream(_messages, _tools) { + const idx = streamCallCount++; + return (async function* () { + if (idx === 0) { + clock.tick(80); // stream starts at 180 + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "echo", + input: {}, + } as ProviderEvent; + clock.tick(20); + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + } else { + clock.tick(50); + yield { type: "text-delta", delta: "done" } as ProviderEvent; + clock.tick(50); + yield { type: "finish", reason: "stop" } as ProviderEvent; + } + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider: wrappedProvider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const stepCompleteEvts = events.filter((e) => e.type === "step-complete"); + expect(stepCompleteEvts).toHaveLength(2); + + // First step: tool-call-only, no content token + const sc0 = stepCompleteEvts[0]; + if (sc0?.type === "step-complete") { + expect(sc0.stepId).toBeDefined(); + expect(sc0.genTotalMs).toBe(100); // 80+20 + expect(sc0.ttftMs).toBeUndefined(); + expect(sc0.decodeMs).toBeUndefined(); + } + + // Second step: has text-delta + const sc1 = stepCompleteEvts[1]; + if (sc1?.type === "step-complete") { + expect(sc1.stepId).toBeDefined(); + expect(sc1.genTotalMs).toBe(100); // 50+50 + expect(sc1.ttftMs).toBe(50); + expect(sc1.decodeMs).toBe(50); + } + }); + + it("usage event carries stepId", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const usageEvts = events.filter((e) => e.type === "usage"); + expect(usageEvts).toHaveLength(1); + const ue = usageEvts[0]; + if (ue?.type === "usage") { + expect(ue.stepId).toBeDefined(); + } + }); + + it("tool-result carries durationMs (execution time) when now provided", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const tool = createFakeTool("slow", async () => { + clock.tick(200); // tool takes 200ms to execute + return { content: "done" }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "slow", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "ok" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const toolResultEvts = events.filter((e) => e.type === "tool-result"); + expect(toolResultEvts).toHaveLength(1); + const tr = toolResultEvts[0]; + if (tr?.type === "tool-result") { + expect(tr.durationMs).toBeDefined(); + expect(tr.durationMs).toBe(200); + } + }); + + it("done carries durationMs and aggregate usage when now provided", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const wrappedProvider: ProviderContract = { + id: "fake", + stream(_messages, _tools) { + return (async function* () { + clock.tick(80); // stream duration + yield { type: "text-delta", delta: "hi" } as ProviderEvent; + yield { + type: "usage", + usage: { inputTokens: 10, outputTokens: 5 }, + } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider: wrappedProvider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const doneEvts = events.filter((e) => e.type === "done"); + expect(doneEvts).toHaveLength(1); + const d = doneEvts[0]; + if (d?.type === "done") { + expect(d.durationMs).toBeDefined(); + expect(d.durationMs).toBeGreaterThan(0); + expect(d.usage).toBeDefined(); + if (d.usage !== undefined) { + expect(d.usage.inputTokens).toBe(10); + expect(d.usage.outputTokens).toBe(5); + } + } + }); + + it("no now → timing fields absent", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + // no now + }); + + // step-complete still emitted (with stepId, no timing) + const stepCompleteEvts = events.filter((e) => e.type === "step-complete"); + expect(stepCompleteEvts).toHaveLength(2); + for (const sc of stepCompleteEvts) { + if (sc?.type === "step-complete") { + expect(sc.stepId).toBeDefined(); + expect(sc.ttftMs).toBeUndefined(); + expect(sc.decodeMs).toBeUndefined(); + expect(sc.genTotalMs).toBeUndefined(); + } + } + + // usage still carries stepId + const usageEvts = events.filter((e) => e.type === "usage"); + for (const ue of usageEvts) { + if (ue?.type === "usage") { + expect(ue.stepId).toBeDefined(); + } + } + + // no durationMs on tool-result + const toolResultEvts = events.filter((e) => e.type === "tool-result"); + for (const tr of toolResultEvts) { + if (tr?.type === "tool-result") { + expect(tr.durationMs).toBeUndefined(); + } + } + + // no durationMs on done, but usage is present (independent of now) + const doneEvts = events.filter((e) => e.type === "done"); + expect(doneEvts).toHaveLength(1); + const d = doneEvts[0]; + if (d?.type === "done") { + expect(d.durationMs).toBeUndefined(); + expect(d.usage).toBeDefined(); + if (d.usage !== undefined) { + expect(d.usage.inputTokens).toBe(15); + expect(d.usage.outputTokens).toBe(8); + } + } + }); + }); }); diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index a8ee6c9..06069a2 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -8,6 +8,7 @@ import { doneEvent, errorEvent, reasoningDeltaEvent, + stepCompleteEvent, textDeltaEvent, toolCallEvent, toolResultEvent, @@ -84,12 +85,15 @@ interface StepContext { readonly turnSpan: Span | undefined; readonly toolSpans: Map<string, Span>; readonly cwd: string | undefined; + readonly now: (() => number) | undefined; } interface TimingState { ttftSpan: Span | undefined; decodeSpan: Span | undefined; firstTokenSeen: boolean; + streamStartMs: number | undefined; + firstTokenMs: number | undefined; } interface StepResult { @@ -108,11 +112,15 @@ function processEvent( ctx: StepContext, stepSpan: Span | undefined, timing: TimingState, + toolDispatchTimes: Map<string, number>, ): void { switch (event.type) { case "text-delta": if (!timing.firstTokenSeen) { timing.firstTokenSeen = true; + if (ctx.now !== undefined) { + timing.firstTokenMs = ctx.now(); + } try { timing.ttftSpan?.end({ attrs: { firstToken: true } }); } catch { @@ -131,6 +139,9 @@ function processEvent( case "reasoning-delta": if (!timing.firstTokenSeen) { timing.firstTokenSeen = true; + if (ctx.now !== undefined) { + timing.firstTokenMs = ctx.now(); + } try { timing.ttftSpan?.end({ attrs: { firstToken: true } }); } catch { @@ -171,6 +182,11 @@ function processEvent( ), ); + // Capture dispatch time for tool-call durationMs + if (ctx.now !== undefined) { + toolDispatchTimes.set(event.toolCallId, ctx.now()); + } + // Open a tool-call span as a child of the step span (attrs: name, toolCallId) try { const tcSpan = @@ -194,7 +210,7 @@ function processEvent( break; } case "usage": - ctx.emit(usageEvent(ctx.conversationId, ctx.turnId, event.usage)); + ctx.emit(usageEvent(ctx.conversationId, ctx.turnId, event.usage, ctx.stepId)); break; case "finish": break; @@ -212,6 +228,7 @@ function processEvent( async function executeStep(ctx: StepContext): Promise<StepResult> { const chunks: Chunk[] = []; const toolCalls: ToolCall[] = []; + const toolDispatchTimes = new Map<string, number>(); let stepUsage = zeroUsage(); let finishReason = "stop"; @@ -250,6 +267,8 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { ttftSpan: undefined, decodeSpan: undefined, firstTokenSeen: false, + streamStartMs: ctx.now !== undefined ? ctx.now() : undefined, + firstTokenMs: undefined, }; // Open TTFT span when spans are enabled @@ -268,7 +287,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts); for await (const event of stream) { if (ctx.signal.aborted) break; - processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing); + processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing, toolDispatchTimes); if (event.type === "usage") { stepUsage = addUsage(stepUsage, event.usage); } @@ -305,6 +324,22 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { // Swallow — D7. } + // Emit step-complete event with timing + const streamEndMs = ctx.now !== undefined ? ctx.now() : undefined; + if (timing.streamStartMs !== undefined && streamEndMs !== undefined) { + const genTotalMs = streamEndMs - timing.streamStartMs; + const stepTiming: { ttftMs?: number; decodeMs?: number; genTotalMs?: number } = { + genTotalMs, + }; + if (timing.firstTokenMs !== undefined) { + stepTiming.ttftMs = timing.firstTokenMs - timing.streamStartMs; + stepTiming.decodeMs = streamEndMs - timing.firstTokenMs; + } + ctx.emit(stepCompleteEvent(ctx.conversationId, ctx.turnId, ctx.stepId, stepTiming)); + } else { + ctx.emit(stepCompleteEvent(ctx.conversationId, ctx.turnId, ctx.stepId)); + } + if (!ctx.dispatch.eager) { for (const call of toolCalls) { dispatcher.submit(call); @@ -337,6 +372,9 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { const result = results.get(call.id); if (result !== undefined) { const isError = result.isError ?? false; + const dispatchTime = toolDispatchTimes.get(call.id); + const toolDurationMs = + ctx.now !== undefined && dispatchTime !== undefined ? ctx.now() - dispatchTime : undefined; ctx.emit( toolResultEvent( ctx.conversationId, @@ -346,6 +384,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { call.name, result.content, isError, + toolDurationMs, ), ); toolMessages.push({ @@ -400,6 +439,10 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { const turnId = input.turnId; const signal = input.signal ?? new AbortController().signal; const logger = input.logger; + const now = input.now; + + // Record turn start time for durationMs on done + const turnStartMs = now !== undefined ? now() : undefined; // Open a turn span (attrs: conversationId, turnId, model) let turnSpan: Span | undefined; @@ -444,6 +487,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { turnSpan, toolSpans, cwd: input.cwd, + now, }); totalUsage = addUsage(totalUsage, stepResult.usage); @@ -499,7 +543,22 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { } } - input.emit(doneEvent(conversationId, turnId, finishReason)); + const turnDurationMs = + turnStartMs !== undefined && now !== undefined ? now() - turnStartMs : undefined; + const hasUsage = + totalUsage.inputTokens > 0 || + totalUsage.outputTokens > 0 || + totalUsage.cacheReadTokens !== undefined || + totalUsage.cacheWriteTokens !== undefined; + input.emit( + doneEvent( + conversationId, + turnId, + finishReason, + turnDurationMs, + hasUsage ? totalUsage : undefined, + ), + ); return { messages: resultMessages, usage: totalUsage, finishReason }; } diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts index af7d6e0..fbb7c15 100644 --- a/packages/session-orchestrator/src/extension.ts +++ b/packages/session-orchestrator/src/extension.ts @@ -38,6 +38,7 @@ export function activate(host: HostAPI): void { }, runTurn, logger: host.logger, + now: () => Date.now(), }); host.provideService(sessionOrchestratorHandle, orchestrator); diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts index b648d42..3954ffe 100644 --- a/packages/session-orchestrator/src/orchestrator.test.ts +++ b/packages/session-orchestrator/src/orchestrator.test.ts @@ -363,6 +363,53 @@ describe("handleMessage model resolution", () => { expect(captured).toHaveLength(2); expect(captured[1]?.cwd).toBeUndefined(); }); + + it("forwards an injected now into the RunTurnInput passed to runTurn", async () => { + const store = createInMemoryStore(); + const provider: ProviderContract = { id: "p", stream: async function* () {} }; + const { captured, captureRunTurn } = createCapturingRunTurn(); + const fakeNow = () => 42; + + const orchestrator = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + runTurn: captureRunTurn, + now: fakeNow, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-now", + text: "hi", + onEvent: () => {}, + }); + + expect(captured).toHaveLength(1); + expect(captured[0]?.now).toBe(fakeNow); + expect(captured[0]?.now?.()).toBe(42); + }); + + it("omits now from RunTurnInput when deps.now is not provided", async () => { + const store = createInMemoryStore(); + const provider: ProviderContract = { id: "p", stream: async function* () {} }; + const { captured, captureRunTurn } = createCapturingRunTurn(); + + const orchestrator = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + runTurn: captureRunTurn, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-no-now", + text: "hi", + onEvent: () => {}, + }); + + expect(captured).toHaveLength(1); + expect(captured[0]?.now).toBeUndefined(); + }); }); describe("turn-sealed event", () => { diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index 311b620..04f6ad2 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -39,6 +39,8 @@ export interface SessionOrchestratorDeps { readonly runTurn: (input: RunTurnInput) => Promise<RunTurnResult>; /** Base logger (auto-scoped to this extension); childed per turn for span capture. */ readonly logger?: Logger; + /** Injected monotonic-ish clock (ms) forwarded to RunTurnInput for timing events. */ + readonly now?: () => number; } export function createSessionOrchestrator(deps: SessionOrchestratorDeps): SessionOrchestrator { @@ -86,6 +88,7 @@ export function createSessionOrchestrator(deps: SessionOrchestratorDeps): Sessio ...(turnLogger !== undefined ? { logger: turnLogger } : {}), ...(signal !== undefined ? { signal } : {}), ...(cwd !== undefined ? { cwd } : {}), + ...(deps.now !== undefined ? { now: deps.now } : {}), }; const result = await deps.runTurn(opts); diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json index 4e5f382..a2711af 100644 --- a/packages/transport-contract/package.json +++ b/packages/transport-contract/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/transport-contract", - "version": "0.2.0", + "version": "0.3.0", "type": "module", "private": true, "main": "dist/index.js", diff --git a/packages/wire/package.json b/packages/wire/package.json index 6098703..762c06e 100644 --- a/packages/wire/package.json +++ b/packages/wire/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/wire", - "version": "0.2.0", + "version": "0.3.0", "type": "module", "private": true, "main": "dist/index.js", diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts index 90213b4..a4790de 100644 --- a/packages/wire/src/index.ts +++ b/packages/wire/src/index.ts @@ -168,6 +168,7 @@ export type AgentEvent = | TurnToolResultEvent | TurnToolOutputEvent | TurnUsageEvent + | TurnStepCompleteEvent | TurnErrorEvent | TurnDoneEvent | TurnSealedEvent; @@ -236,6 +237,12 @@ export interface TurnToolResultEvent { readonly toolName: string; readonly content: string; readonly isError: boolean; + /** + * How long the tool took to execute (dispatch → result), in milliseconds — + * the backend's authoritative execution time, distinct from any client-side + * wall-clock. Optional: present only when the runtime was given a clock. + */ + readonly durationMs?: number; } /** Streaming output from a tool execution (e.g. shell stdout/stderr). */ @@ -253,9 +260,43 @@ export interface TurnUsageEvent { readonly type: "usage"; readonly conversationId: string; readonly turnId: string; + /** + * The step this usage report belongs to, so a consumer can attribute tokens + * per step (and join with the matching `step-complete` timing by `stepId`). + * Optional: absent when the runtime had no step context, and on usage emitted + * before this field existed. + */ + readonly stepId?: StepId; readonly usage: Usage; } +/** + * A step (one LLM round-trip) has completed — the authoritative per-step metrics + * packet, emitted once at the step's end (after the generation stream finishes), + * so its timing is final (unlike `usage`, which may arrive mid-stream). Carries + * the step's generation timing; join to the step's tokens via `stepId` on the + * `usage` event. All timing fields are optional: present only when the runtime + * was given a clock, and `ttftMs`/`decodeMs` additionally require that a first + * content token (text or reasoning) was observed this step. + */ +export interface TurnStepCompleteEvent { + readonly type: "step-complete"; + readonly conversationId: string; + readonly turnId: string; + readonly stepId: StepId; + /** Time to first token: stream start → first text/reasoning delta. */ + readonly ttftMs?: number; + /** Decode time: first token → stream end (generation total − TTFT). */ + readonly decodeMs?: number; + /** + * Total generation time for the step: stream start → stream end. Present + * whenever a clock was available, even if no first token was seen (in which + * case `ttftMs`/`decodeMs` are absent). When a first token was seen, + * `genTotalMs === ttftMs + decodeMs`. + */ + readonly genTotalMs?: number; +} + /** An error occurred during the turn. */ export interface TurnErrorEvent { readonly type: "error"; @@ -271,6 +312,17 @@ export interface TurnDoneEvent { readonly conversationId: string; readonly turnId: string; readonly reason: string; + /** + * Total wall-clock duration of the turn (turn start → turn end), in + * milliseconds. Optional: present only when the runtime was given a clock. + */ + readonly durationMs?: number; + /** + * Aggregate token usage across all steps in the turn — a convenience total so + * a consumer need not sum the per-step `usage` events. Optional (absent if the + * provider reported no usage). + */ + readonly usage?: Usage; } /** @@ -580,6 +580,53 @@ measured** (provider-agnostic, self-consistent); **first token = first text OR r spans with valid `durationMs` (ttft 1090ms, decode 1673ms) + `firstToken:true`. GLOSSARY: TTFT, decode time. NOT on the wire (clients don't receive it) — a future wire+FE step if desired. +### Expose backend metrics to clients (timing/tokens) — IN PROGRESS +User ask: surface the backend's authoritative metrics (tokens, TTFT/decode, TPS, tool-exec + +turn durations) to clients (CLI/web FE). Decisions (user, §5.2): +- **Delivery = inline in the existing chat stream**, as distinct `AgentEvent` types/fields — + push-everything, FE routes by `type` and ignores what it doesn't render (NOT a separate live + endpoint, NOT a negotiation handshake — events are tiny/low-frequency, not per-token). +- **Carrier = a new `step-complete` event** (per-step end, ordering-safe timing) + additive fields. +- **Scope = LIVE now**; persisted-for-replay is a documented fast-follow (below). + +#### Pass 1 — LIVE stream metrics [~] IN PROGRESS +Metric set (all wire fields additive/optional): per-step tokens (`usage` += `stepId`), per-step +`ttftMs`/`decodeMs`/`genTotalMs` (new `step-complete` event), tool-exec `durationMs` (`tool-result`), +turn `durationMs` + aggregate `usage` (`done`). TPS derived FE-side (`outputTokens/decodeMs`); +context-size proxy = `usage.inputTokens` (already present). +- [x] **Contracts (orchestrator):** `@dispatch/wire` — new `TurnStepCompleteEvent` (+ union); + `stepId?` on `TurnUsageEvent`; `durationMs?` on `TurnToolResultEvent`; `durationMs?`+`usage?` on + `TurnDoneEvent`. Kernel re-export shims (events.ts/index.ts) updated. `RunTurnInput.now?: () => + number` (additive optional clock — runtime has no clock today; needed to put real numbers on the + wire). **Whole-graph typecheck clean** → new variant breaks NO consumer (no exhaustive switches; + cli/transport unaffected). GLOSSARY: TTFT/decode already added. +- [ ] **Build wave (2 disjoint owner-agents, parallel, mimo-v2.5-pro):** + - **kernel-runtime:** when `now` provided, measure + emit timing — `step-complete` per step + (ttft/decode/genTotal, reusing the just-built first-token detection), `stepId` on `usage`, + `durationMs` on `tool-result`, `durationMs`+`usage` on `done`. Keep the trace spans (may unify + span timing with the numeric measurement). Omit timing gracefully when `now` absent. +tests. + - **session-orchestrator:** add `now?` to `SessionOrchestratorDeps`, thread into the + `RunTurnInput` it builds; provide `() => Date.now()` from the extension's `activate` (shell + edge). +test asserting forwarding. + - **NOT touched (typecheck-confirmed):** transport-http/ws (verbatim pass-through), cli (optional + fields, non-exhaustive switch), host-bin (orchestrator self-injects the clock in activate). +- [ ] **Post-wave (orchestrator):** full typecheck/test/biome; bump `wire`+`transport-contract` + minor (`0.2.0→0.3.0`); regen FE `.dispatch/{wire,transport-contract}.reference.md`; courier reply. + Live: confirm `step-complete`/durations/`usage.stepId` on the real stream. + +#### Pass 2 — DEFERRED: persisted metrics for REPLAY +Today usage/timing are NOT persisted (conversation-store stores chunks only), so reopening a past +conversation (`GET /conversations/:id`) shows messages but no metrics. To show historical +tokens/timing: +- Persist a per-turn (and/or per-step) metrics record in `conversation-store` (new storage shape — + metrics are not chunks; e.g. a `conv:<id>:turn:<turnId>:metrics` key or a sibling namespace). +- Expose on the read path: a `metrics` field on `ConversationHistoryResponse`, or a sibling + `GET /conversations/:id/metrics` (read endpoint = the right home for historical metrics, vs the + live stream). New contract + transport-http route + conversation-store schema + FE. +- The trace-store already holds rich per-span timing durably; Pass 2 could alternatively expose a + read slice of THAT rather than duplicating into conversation-store (decide at Pass-2 design time). +Sequenced after Pass 1 lands (and can ride alongside the FE-Slice work that consumes Pass 1). + ### 3. dedup / storage growth (after frontend) The deferred trace-body de-duplication + rotation/compression (D5 volume-control + `prefix.fingerprint` + §6 retention strategy) — already designed in |
