summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-07 17:46:53 +0900
committerAdam Malczewski <[email protected]>2026-06-07 17:46:53 +0900
commit7c459c7d919d1e08a228e8abc56129be174d8abe (patch)
tree93011125c001945723ac9b9358c4ddd450f87f72
parent5746cf4e545cd5b0d7faf0595554f273f236f3a9 (diff)
downloaddispatch-7c459c7d919d1e08a228e8abc56129be174d8abe.tar.gz
dispatch-7c459c7d919d1e08a228e8abc56129be174d8abe.zip
feat(wire,kernel,session-orchestrator): live turn metrics on the stream
Expose the backend's authoritative token+timing metrics on the live AgentEvent stream (observability-only -> now also client-facing). All additive/optional. - [email protected]: new TurnStepCompleteEvent (type:step-complete) with per-step ttftMs/decodeMs/genTotalMs; usage += stepId; tool-result += durationMs (exec); done += durationMs (turn wall-clock) + usage (turn total). RunTurnInput += now?. [email protected] (re-export bump). - kernel-runtime: when now injected, measures + emits the above (reuses the ttft/decode first-token detection); omits timing gracefully without a clock. - session-orchestrator: adds now? to deps, threads into RunTurnInput; extension activate injects () => Date.now(). - transport/cli/host-bin: untouched (verbatim pass-through; additive fields). FE handoff: frontend-metrics-handoff.md. typecheck clean; 520 vitest + 89 bun; biome 0/0. Replay/persistence = deferred Pass 2 (documented in tasks.md).
-rw-r--r--frontend-metrics-handoff.md121
-rw-r--r--packages/kernel/src/contracts/events.ts1
-rw-r--r--packages/kernel/src/contracts/index.ts1
-rw-r--r--packages/kernel/src/contracts/runtime.ts10
-rw-r--r--packages/kernel/src/runtime/events.ts80
-rw-r--r--packages/kernel/src/runtime/run-turn.test.ts330
-rw-r--r--packages/kernel/src/runtime/run-turn.ts65
-rw-r--r--packages/session-orchestrator/src/extension.ts1
-rw-r--r--packages/session-orchestrator/src/orchestrator.test.ts47
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts3
-rw-r--r--packages/transport-contract/package.json2
-rw-r--r--packages/wire/package.json2
-rw-r--r--packages/wire/src/index.ts52
-rw-r--r--tasks.md47
14 files changed, 753 insertions, 9 deletions
diff --git a/frontend-metrics-handoff.md b/frontend-metrics-handoff.md
new file mode 100644
index 0000000..8da45ad
--- /dev/null
+++ b/frontend-metrics-handoff.md
@@ -0,0 +1,121 @@
+# Frontend handoff — live turn metrics (tokens + timing)
+
+> From: arch-rewrite (backend) orchestrator · For: the dispatch-web FE team.
+> Status: **LIVE on the stream now** (backend committed + live-verified). Consume via the pinned
+> contracts `@dispatch/[email protected]` + `@dispatch/[email protected]` (reference snapshots
+> regenerated in `dispatch-web/.dispatch/{wire,transport-contract}.reference.md`).
+
+## 1. What you can now access
+The backend's **authoritative** token + timing metrics are now on the live turn stream:
+
+| Metric | Where | Field(s) |
+|---|---|---|
+| Per-step tokens | `usage` event | `usage` (`inputTokens`/`outputTokens`/`cacheReadTokens?`/`cacheWriteTokens?`) + new `stepId?` |
+| Per-step **TTFT** | new `step-complete` event | `ttftMs?` |
+| Per-step **decode** time | new `step-complete` event | `decodeMs?` |
+| Per-step total generation | new `step-complete` event | `genTotalMs?` |
+| **Tool execution** time | `tool-result` event | `durationMs?` |
+| **Turn** wall-clock | `done` event | `durationMs?` |
+| **Turn** total tokens | `done` event | `usage?` |
+| **Tokens/sec** (TPS) | derive | `usage.outputTokens / (step-complete.decodeMs / 1000)` |
+| Context-size proxy | `usage` event | `usage.inputTokens` (size the model counted; `cacheReadTokens` = cached portion) |
+
+"Authoritative" = measured by the backend runtime, not client wall-clock. They differ from
+anything you'd time in the browser (no network/buffering in them).
+
+## 2. How they're delivered
+**Inline, in the same chat stream you already consume** — WS `chat.delta` frames (and the
+`POST /chat` NDJSON stream) carry the `AgentEvent` union; metrics are additional event types /
+fields in that union. **No new endpoint, no subscription/negotiation.** You already `switch` on
+`event.type`; route the metric events to a telemetry handler and ignore any you don't render
+(zero cost). They do **not** appear in message content — keep your transcript rendering as-is.
+
+These events are **low-frequency** (one `step-complete` per step, one `done` per turn, a
+`durationMs` per tool result) — not per-token — so there's no stream-volume concern.
+
+## 3. The new/changed events (shapes)
+All new fields are **optional** — see §5. Every event still carries `conversationId` + `turnId`.
+
+```ts
+// NEW variant in AgentEvent — emitted once per step, AT STEP END (timing is final here)
+interface TurnStepCompleteEvent {
+ type: "step-complete";
+ conversationId: string;
+ turnId: string;
+ stepId: StepId; // join key to the step's `usage` event + tool events
+ ttftMs?: number; // time to first token (stream start → first text|reasoning delta)
+ decodeMs?: number; // first token → stream end (== genTotalMs - ttftMs)
+ genTotalMs?: number; // whole-step generation (present even if no first token was seen)
+}
+
+// usage event — now labeled by step
+interface TurnUsageEvent {
+ type: "usage";
+ conversationId: string; turnId: string;
+ stepId?: StepId; // NEW — attribute tokens to a step / join to step-complete
+ usage: Usage; // { inputTokens, outputTokens, cacheReadTokens?, cacheWriteTokens? }
+}
+
+// tool-result — now carries execution time
+interface TurnToolResultEvent {
+ type: "tool-result";
+ conversationId: string; turnId: string;
+ stepId: StepId; toolCallId: string; toolName: string;
+ content: string; isError: boolean;
+ durationMs?: number; // NEW — tool execution time (dispatch → result)
+}
+
+// done — now carries turn totals
+interface TurnDoneEvent {
+ type: "done";
+ conversationId: string; turnId: string;
+ reason: string;
+ durationMs?: number; // NEW — whole-turn wall-clock
+ usage?: Usage; // NEW — aggregate turn tokens (so you needn't sum the usage events)
+}
+```
+
+## 4. Correlation & derived metrics
+Keys: `turnId` groups a turn; `stepId` groups a step within it; `toolCallId` pairs a tool call
+with its result. A turn has **one `step-complete` (and usually one `usage`) per step**.
+
+- **Per-step TPS** = `usage.outputTokens / (step-complete.decodeMs / 1000)` — join `usage` and
+ `step-complete` by `stepId`. (Use `decodeMs`, not `genTotalMs`, for decode-rate TPS; it excludes
+ first-token latency. See "which TPS" caveat below.)
+- **Turn TPS** = `done.usage.outputTokens / (Σ step-complete.decodeMs / 1000)`.
+- **Generation total per step** = `genTotalMs` (or `ttftMs + decodeMs`).
+- **Turn-visible first-token latency** = the `ttftMs` of **step 0** (the first `step-complete`).
+- **Total prefill overhead** = `Σ ttftMs` across steps; **pure generation** = `Σ decodeMs`.
+- **Tool time** = `tool-result.durationMs` per call; sum per `stepId` for a batch.
+
+"Which TPS": `decodeMs` is first-token → end, so TPS over it is the decode rate (first-token
+latency removed). If you want end-to-end rate including the wait, use `ttftMs + decodeMs`.
+
+## 5. Optionality — you MUST tolerate absence
+- `step-complete` is always emitted per step, but its **timing fields are present only when the
+ server runs with a clock** (it does in normal operation). `ttftMs`/`decodeMs` are additionally
+ absent for a step that produced **no text/reasoning token** (e.g. a tool-call-only step) —
+ `genTotalMs` is still present in that case.
+- `usage.stepId`, `tool-result.durationMs`, `done.durationMs`, `done.usage` are all optional.
+- Render gracefully when a value is missing (omit the figure; don't show `NaN`/`undefined`).
+
+## 6. What is NOT available yet (deferred — Pass 2)
+**Metrics are LIVE-ONLY.** They are **not persisted**, so:
+- `GET /conversations/:id` (history) returns messages/chunks but **no tokens/timing**. Reopening a
+ past conversation will show content without metrics.
+- If you need historical metrics (e.g. show TPS on a reloaded conversation), that's the planned
+ **Pass 2** (persist per-turn metrics + a read path) — see `tasks.md` "Pass 2 — DEFERRED". Tell
+ us if you need it and we'll prioritize.
+- TPS is not sent pre-computed (derive it, §4). No per-token timing (metrics are per-step/per-turn).
+
+## 7. Integration checklist
+1. Refresh deps: `bun run typecheck` in dispatch-web (picks up `[email protected]` / `[email protected]`).
+2. Extend your `chat.delta` event handler: add a `case "step-complete"` and read the new optional
+ fields on `usage`/`tool-result`/`done`. (No exhaustive-switch break — these are additive.)
+3. Keep a per-turn (and per-step, keyed by `stepId`) telemetry accumulator alongside the transcript
+ store; fold metric events into it; render where you want (e.g. a turn footer / per-step badges).
+4. Treat every metric field as optional (§5).
+
+## 8. Carrier facts (unchanged)
+HTTP 24203 (`POST /chat` NDJSON, `GET /conversations/:id`, `GET /models`), WS 24205 (one socket,
+`chat.delta` carries each `AgentEvent`), CORS `*`. Same events on both carriers.
diff --git a/packages/kernel/src/contracts/events.ts b/packages/kernel/src/contracts/events.ts
index 8737b02..be09066 100644
--- a/packages/kernel/src/contracts/events.ts
+++ b/packages/kernel/src/contracts/events.ts
@@ -13,6 +13,7 @@ export type {
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
+ TurnStepCompleteEvent,
TurnTextDeltaEvent,
TurnToolCallEvent,
TurnToolOutputEvent,
diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts
index 1698486..38f1442 100644
--- a/packages/kernel/src/contracts/index.ts
+++ b/packages/kernel/src/contracts/index.ts
@@ -35,6 +35,7 @@ export type {
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
+ TurnStepCompleteEvent,
TurnTextDeltaEvent,
TurnToolCallEvent,
TurnToolOutputEvent,
diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts
index 8917709..b7fe23c 100644
--- a/packages/kernel/src/contracts/runtime.ts
+++ b/packages/kernel/src/contracts/runtime.ts
@@ -90,6 +90,16 @@ export interface RunTurnInput {
* emitted (backward-compatible with callers that don't yet pass a logger).
*/
readonly logger?: Logger;
+
+ /**
+ * Optional monotonic-ish clock (milliseconds) for emitting wall-clock timing
+ * on outward events: per-step `step-complete` (ttft/decode/genTotal), tool
+ * execution `durationMs` on `tool-result`, and turn `durationMs` on `done`.
+ * Injected (not ambient) so the runtime stays pure and deterministic in tests.
+ * If omitted, the runtime emits no such timing (the optional fields stay
+ * absent) — backward-compatible with callers that don't provide a clock.
+ */
+ readonly now?: () => number;
}
/**
diff --git a/packages/kernel/src/runtime/events.ts b/packages/kernel/src/runtime/events.ts
index deeb012..300e711 100644
--- a/packages/kernel/src/runtime/events.ts
+++ b/packages/kernel/src/runtime/events.ts
@@ -33,9 +33,10 @@ export function toolResultEvent(
toolName: string,
content: string,
isError: boolean,
+ durationMs?: number,
): AgentEvent {
- return {
- type: "tool-result",
+ const base = {
+ type: "tool-result" as const,
conversationId,
turnId,
stepId,
@@ -44,6 +45,10 @@ export function toolResultEvent(
content,
isError,
};
+ if (durationMs !== undefined) {
+ return { ...base, durationMs };
+ }
+ return base;
}
export function toolOutputEvent(
@@ -56,7 +61,15 @@ export function toolOutputEvent(
return { type: "tool-output", conversationId, turnId, toolCallId, data, stream };
}
-export function usageEvent(conversationId: string, turnId: string, usage: Usage): AgentEvent {
+export function usageEvent(
+ conversationId: string,
+ turnId: string,
+ usage: Usage,
+ stepId?: StepId,
+): AgentEvent {
+ if (stepId !== undefined) {
+ return { type: "usage", conversationId, turnId, usage, stepId };
+ }
return { type: "usage", conversationId, turnId, usage };
}
@@ -64,7 +77,66 @@ export function turnStartEvent(conversationId: string, turnId: string): AgentEve
return { type: "turn-start", conversationId, turnId };
}
-export function doneEvent(conversationId: string, turnId: string, reason: string): AgentEvent {
+export function stepCompleteEvent(
+ conversationId: string,
+ turnId: string,
+ stepId: StepId,
+ timing?: { ttftMs?: number; decodeMs?: number; genTotalMs?: number },
+): AgentEvent {
+ if (timing !== undefined) {
+ if (timing.ttftMs !== undefined) {
+ if (timing.decodeMs !== undefined && timing.genTotalMs !== undefined) {
+ return {
+ type: "step-complete",
+ conversationId,
+ turnId,
+ stepId,
+ ttftMs: timing.ttftMs,
+ decodeMs: timing.decodeMs,
+ genTotalMs: timing.genTotalMs,
+ };
+ }
+ if (timing.genTotalMs !== undefined) {
+ return {
+ type: "step-complete",
+ conversationId,
+ turnId,
+ stepId,
+ ttftMs: timing.ttftMs,
+ genTotalMs: timing.genTotalMs,
+ };
+ }
+ return { type: "step-complete", conversationId, turnId, stepId, ttftMs: timing.ttftMs };
+ }
+ if (timing.genTotalMs !== undefined) {
+ return {
+ type: "step-complete",
+ conversationId,
+ turnId,
+ stepId,
+ genTotalMs: timing.genTotalMs,
+ };
+ }
+ }
+ return { type: "step-complete", conversationId, turnId, stepId };
+}
+
+export function doneEvent(
+ conversationId: string,
+ turnId: string,
+ reason: string,
+ durationMs?: number,
+ usage?: Usage,
+): AgentEvent {
+ if (durationMs !== undefined && usage !== undefined) {
+ return { type: "done", conversationId, turnId, reason, durationMs, usage };
+ }
+ if (durationMs !== undefined) {
+ return { type: "done", conversationId, turnId, reason, durationMs };
+ }
+ if (usage !== undefined) {
+ return { type: "done", conversationId, turnId, reason, usage };
+ }
return { type: "done", conversationId, turnId, reason };
}
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index 089f65b..ce654d5 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -125,6 +125,7 @@ describe("runTurn", () => {
"text-delta",
"reasoning-delta",
"usage",
+ "step-complete",
"done",
]);
});
@@ -1970,4 +1971,333 @@ describe("runTurn", () => {
}
});
});
+
+ describe("timing events (now provided)", () => {
+ function createCounterNow(): { now: () => number; tick: (ms: number) => void } {
+ let current = 0;
+ return {
+ now: () => current,
+ tick: (ms: number) => {
+ current += ms;
+ },
+ };
+ }
+
+ it("emits step-complete per step with timing when now provided", async () => {
+ const clock = createCounterNow();
+ clock.tick(100); // turn starts at 100
+
+ const { events, emit } = createCollectingEmit();
+
+ // Advance clock during stream: first token at +50ms, stream ends at +200ms
+ let streamCallCount = 0;
+ const wrappedProvider: ProviderContract = {
+ id: "fake",
+ stream(_messages, _tools) {
+ const idx = streamCallCount++;
+ return (async function* () {
+ if (idx === 0) {
+ clock.tick(50); // stream starts
+ yield { type: "text-delta", delta: "Hello" } as ProviderEvent;
+ // first token seen at 150 (100+50)
+ clock.tick(100);
+ yield { type: "text-delta", delta: " world" } as ProviderEvent;
+ clock.tick(50);
+ yield {
+ type: "usage",
+ usage: { inputTokens: 10, outputTokens: 5 },
+ } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ }
+ })();
+ },
+ };
+
+ await runTurn({
+ provider: wrappedProvider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ now: clock.now,
+ });
+
+ const stepCompleteEvts = events.filter((e) => e.type === "step-complete");
+ expect(stepCompleteEvts).toHaveLength(1);
+
+ const sc = stepCompleteEvts[0];
+ if (sc?.type === "step-complete") {
+ expect(sc.conversationId).toBe("conv-1");
+ expect(sc.turnId).toBe("turn-1");
+ expect(sc.stepId).toBeDefined();
+ expect(sc.genTotalMs).toBe(200); // 50+100+50
+ expect(sc.ttftMs).toBe(50); // stream start → first text-delta
+ expect(sc.decodeMs).toBe(150); // first token → stream end
+ const ttft = sc.ttftMs;
+ const decode = sc.decodeMs;
+ const genTotal = sc.genTotalMs;
+ if (ttft !== undefined && decode !== undefined && genTotal !== undefined) {
+ expect(genTotal).toBe(ttft + decode);
+ }
+ }
+ });
+
+ it("step-complete omits ttft/decode but keeps genTotalMs for a no-content step", async () => {
+ const clock = createCounterNow();
+ clock.tick(100); // turn starts at 100
+
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ let streamCallCount = 0;
+ const wrappedProvider: ProviderContract = {
+ id: "fake",
+ stream(_messages, _tools) {
+ const idx = streamCallCount++;
+ return (async function* () {
+ if (idx === 0) {
+ clock.tick(80); // stream starts at 180
+ yield {
+ type: "tool-call",
+ toolCallId: "tc1",
+ toolName: "echo",
+ input: {},
+ } as ProviderEvent;
+ clock.tick(20);
+ yield { type: "finish", reason: "tool-calls" } as ProviderEvent;
+ } else {
+ clock.tick(50);
+ yield { type: "text-delta", delta: "done" } as ProviderEvent;
+ clock.tick(50);
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ }
+ })();
+ },
+ };
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider: wrappedProvider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ now: clock.now,
+ });
+
+ const stepCompleteEvts = events.filter((e) => e.type === "step-complete");
+ expect(stepCompleteEvts).toHaveLength(2);
+
+ // First step: tool-call-only, no content token
+ const sc0 = stepCompleteEvts[0];
+ if (sc0?.type === "step-complete") {
+ expect(sc0.stepId).toBeDefined();
+ expect(sc0.genTotalMs).toBe(100); // 80+20
+ expect(sc0.ttftMs).toBeUndefined();
+ expect(sc0.decodeMs).toBeUndefined();
+ }
+
+ // Second step: has text-delta
+ const sc1 = stepCompleteEvts[1];
+ if (sc1?.type === "step-complete") {
+ expect(sc1.stepId).toBeDefined();
+ expect(sc1.genTotalMs).toBe(100); // 50+50
+ expect(sc1.ttftMs).toBe(50);
+ expect(sc1.decodeMs).toBe(50);
+ }
+ });
+
+ it("usage event carries stepId", async () => {
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "hi" },
+ { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ });
+
+ const usageEvts = events.filter((e) => e.type === "usage");
+ expect(usageEvts).toHaveLength(1);
+ const ue = usageEvts[0];
+ if (ue?.type === "usage") {
+ expect(ue.stepId).toBeDefined();
+ }
+ });
+
+ it("tool-result carries durationMs (execution time) when now provided", async () => {
+ const clock = createCounterNow();
+ clock.tick(100); // turn starts at 100
+
+ const tool = createFakeTool("slow", async () => {
+ clock.tick(200); // tool takes 200ms to execute
+ return { content: "done" };
+ });
+
+ const provider = createFakeProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "slow", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "ok" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ now: clock.now,
+ });
+
+ const toolResultEvts = events.filter((e) => e.type === "tool-result");
+ expect(toolResultEvts).toHaveLength(1);
+ const tr = toolResultEvts[0];
+ if (tr?.type === "tool-result") {
+ expect(tr.durationMs).toBeDefined();
+ expect(tr.durationMs).toBe(200);
+ }
+ });
+
+ it("done carries durationMs and aggregate usage when now provided", async () => {
+ const clock = createCounterNow();
+ clock.tick(100); // turn starts at 100
+
+ const wrappedProvider: ProviderContract = {
+ id: "fake",
+ stream(_messages, _tools) {
+ return (async function* () {
+ clock.tick(80); // stream duration
+ yield { type: "text-delta", delta: "hi" } as ProviderEvent;
+ yield {
+ type: "usage",
+ usage: { inputTokens: 10, outputTokens: 5 },
+ } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ })();
+ },
+ };
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider: wrappedProvider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ now: clock.now,
+ });
+
+ const doneEvts = events.filter((e) => e.type === "done");
+ expect(doneEvts).toHaveLength(1);
+ const d = doneEvts[0];
+ if (d?.type === "done") {
+ expect(d.durationMs).toBeDefined();
+ expect(d.durationMs).toBeGreaterThan(0);
+ expect(d.usage).toBeDefined();
+ if (d.usage !== undefined) {
+ expect(d.usage.inputTokens).toBe(10);
+ expect(d.usage.outputTokens).toBe(5);
+ }
+ }
+ });
+
+ it("no now → timing fields absent", async () => {
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "hi" },
+ { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } },
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ // no now
+ });
+
+ // step-complete still emitted (with stepId, no timing)
+ const stepCompleteEvts = events.filter((e) => e.type === "step-complete");
+ expect(stepCompleteEvts).toHaveLength(2);
+ for (const sc of stepCompleteEvts) {
+ if (sc?.type === "step-complete") {
+ expect(sc.stepId).toBeDefined();
+ expect(sc.ttftMs).toBeUndefined();
+ expect(sc.decodeMs).toBeUndefined();
+ expect(sc.genTotalMs).toBeUndefined();
+ }
+ }
+
+ // usage still carries stepId
+ const usageEvts = events.filter((e) => e.type === "usage");
+ for (const ue of usageEvts) {
+ if (ue?.type === "usage") {
+ expect(ue.stepId).toBeDefined();
+ }
+ }
+
+ // no durationMs on tool-result
+ const toolResultEvts = events.filter((e) => e.type === "tool-result");
+ for (const tr of toolResultEvts) {
+ if (tr?.type === "tool-result") {
+ expect(tr.durationMs).toBeUndefined();
+ }
+ }
+
+ // no durationMs on done, but usage is present (independent of now)
+ const doneEvts = events.filter((e) => e.type === "done");
+ expect(doneEvts).toHaveLength(1);
+ const d = doneEvts[0];
+ if (d?.type === "done") {
+ expect(d.durationMs).toBeUndefined();
+ expect(d.usage).toBeDefined();
+ if (d.usage !== undefined) {
+ expect(d.usage.inputTokens).toBe(15);
+ expect(d.usage.outputTokens).toBe(8);
+ }
+ }
+ });
+ });
});
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index a8ee6c9..06069a2 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -8,6 +8,7 @@ import {
doneEvent,
errorEvent,
reasoningDeltaEvent,
+ stepCompleteEvent,
textDeltaEvent,
toolCallEvent,
toolResultEvent,
@@ -84,12 +85,15 @@ interface StepContext {
readonly turnSpan: Span | undefined;
readonly toolSpans: Map<string, Span>;
readonly cwd: string | undefined;
+ readonly now: (() => number) | undefined;
}
interface TimingState {
ttftSpan: Span | undefined;
decodeSpan: Span | undefined;
firstTokenSeen: boolean;
+ streamStartMs: number | undefined;
+ firstTokenMs: number | undefined;
}
interface StepResult {
@@ -108,11 +112,15 @@ function processEvent(
ctx: StepContext,
stepSpan: Span | undefined,
timing: TimingState,
+ toolDispatchTimes: Map<string, number>,
): void {
switch (event.type) {
case "text-delta":
if (!timing.firstTokenSeen) {
timing.firstTokenSeen = true;
+ if (ctx.now !== undefined) {
+ timing.firstTokenMs = ctx.now();
+ }
try {
timing.ttftSpan?.end({ attrs: { firstToken: true } });
} catch {
@@ -131,6 +139,9 @@ function processEvent(
case "reasoning-delta":
if (!timing.firstTokenSeen) {
timing.firstTokenSeen = true;
+ if (ctx.now !== undefined) {
+ timing.firstTokenMs = ctx.now();
+ }
try {
timing.ttftSpan?.end({ attrs: { firstToken: true } });
} catch {
@@ -171,6 +182,11 @@ function processEvent(
),
);
+ // Capture dispatch time for tool-call durationMs
+ if (ctx.now !== undefined) {
+ toolDispatchTimes.set(event.toolCallId, ctx.now());
+ }
+
// Open a tool-call span as a child of the step span (attrs: name, toolCallId)
try {
const tcSpan =
@@ -194,7 +210,7 @@ function processEvent(
break;
}
case "usage":
- ctx.emit(usageEvent(ctx.conversationId, ctx.turnId, event.usage));
+ ctx.emit(usageEvent(ctx.conversationId, ctx.turnId, event.usage, ctx.stepId));
break;
case "finish":
break;
@@ -212,6 +228,7 @@ function processEvent(
async function executeStep(ctx: StepContext): Promise<StepResult> {
const chunks: Chunk[] = [];
const toolCalls: ToolCall[] = [];
+ const toolDispatchTimes = new Map<string, number>();
let stepUsage = zeroUsage();
let finishReason = "stop";
@@ -250,6 +267,8 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
ttftSpan: undefined,
decodeSpan: undefined,
firstTokenSeen: false,
+ streamStartMs: ctx.now !== undefined ? ctx.now() : undefined,
+ firstTokenMs: undefined,
};
// Open TTFT span when spans are enabled
@@ -268,7 +287,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts);
for await (const event of stream) {
if (ctx.signal.aborted) break;
- processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing);
+ processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing, toolDispatchTimes);
if (event.type === "usage") {
stepUsage = addUsage(stepUsage, event.usage);
}
@@ -305,6 +324,22 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
// Swallow — D7.
}
+ // Emit step-complete event with timing
+ const streamEndMs = ctx.now !== undefined ? ctx.now() : undefined;
+ if (timing.streamStartMs !== undefined && streamEndMs !== undefined) {
+ const genTotalMs = streamEndMs - timing.streamStartMs;
+ const stepTiming: { ttftMs?: number; decodeMs?: number; genTotalMs?: number } = {
+ genTotalMs,
+ };
+ if (timing.firstTokenMs !== undefined) {
+ stepTiming.ttftMs = timing.firstTokenMs - timing.streamStartMs;
+ stepTiming.decodeMs = streamEndMs - timing.firstTokenMs;
+ }
+ ctx.emit(stepCompleteEvent(ctx.conversationId, ctx.turnId, ctx.stepId, stepTiming));
+ } else {
+ ctx.emit(stepCompleteEvent(ctx.conversationId, ctx.turnId, ctx.stepId));
+ }
+
if (!ctx.dispatch.eager) {
for (const call of toolCalls) {
dispatcher.submit(call);
@@ -337,6 +372,9 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
const result = results.get(call.id);
if (result !== undefined) {
const isError = result.isError ?? false;
+ const dispatchTime = toolDispatchTimes.get(call.id);
+ const toolDurationMs =
+ ctx.now !== undefined && dispatchTime !== undefined ? ctx.now() - dispatchTime : undefined;
ctx.emit(
toolResultEvent(
ctx.conversationId,
@@ -346,6 +384,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
call.name,
result.content,
isError,
+ toolDurationMs,
),
);
toolMessages.push({
@@ -400,6 +439,10 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
const turnId = input.turnId;
const signal = input.signal ?? new AbortController().signal;
const logger = input.logger;
+ const now = input.now;
+
+ // Record turn start time for durationMs on done
+ const turnStartMs = now !== undefined ? now() : undefined;
// Open a turn span (attrs: conversationId, turnId, model)
let turnSpan: Span | undefined;
@@ -444,6 +487,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
turnSpan,
toolSpans,
cwd: input.cwd,
+ now,
});
totalUsage = addUsage(totalUsage, stepResult.usage);
@@ -499,7 +543,22 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
}
}
- input.emit(doneEvent(conversationId, turnId, finishReason));
+ const turnDurationMs =
+ turnStartMs !== undefined && now !== undefined ? now() - turnStartMs : undefined;
+ const hasUsage =
+ totalUsage.inputTokens > 0 ||
+ totalUsage.outputTokens > 0 ||
+ totalUsage.cacheReadTokens !== undefined ||
+ totalUsage.cacheWriteTokens !== undefined;
+ input.emit(
+ doneEvent(
+ conversationId,
+ turnId,
+ finishReason,
+ turnDurationMs,
+ hasUsage ? totalUsage : undefined,
+ ),
+ );
return { messages: resultMessages, usage: totalUsage, finishReason };
}
diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts
index af7d6e0..fbb7c15 100644
--- a/packages/session-orchestrator/src/extension.ts
+++ b/packages/session-orchestrator/src/extension.ts
@@ -38,6 +38,7 @@ export function activate(host: HostAPI): void {
},
runTurn,
logger: host.logger,
+ now: () => Date.now(),
});
host.provideService(sessionOrchestratorHandle, orchestrator);
diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts
index b648d42..3954ffe 100644
--- a/packages/session-orchestrator/src/orchestrator.test.ts
+++ b/packages/session-orchestrator/src/orchestrator.test.ts
@@ -363,6 +363,53 @@ describe("handleMessage model resolution", () => {
expect(captured).toHaveLength(2);
expect(captured[1]?.cwd).toBeUndefined();
});
+
+ it("forwards an injected now into the RunTurnInput passed to runTurn", async () => {
+ const store = createInMemoryStore();
+ const provider: ProviderContract = { id: "p", stream: async function* () {} };
+ const { captured, captureRunTurn } = createCapturingRunTurn();
+ const fakeNow = () => 42;
+
+ const orchestrator = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ runTurn: captureRunTurn,
+ now: fakeNow,
+ });
+
+ await orchestrator.handleMessage({
+ conversationId: "conv-now",
+ text: "hi",
+ onEvent: () => {},
+ });
+
+ expect(captured).toHaveLength(1);
+ expect(captured[0]?.now).toBe(fakeNow);
+ expect(captured[0]?.now?.()).toBe(42);
+ });
+
+ it("omits now from RunTurnInput when deps.now is not provided", async () => {
+ const store = createInMemoryStore();
+ const provider: ProviderContract = { id: "p", stream: async function* () {} };
+ const { captured, captureRunTurn } = createCapturingRunTurn();
+
+ const orchestrator = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ runTurn: captureRunTurn,
+ });
+
+ await orchestrator.handleMessage({
+ conversationId: "conv-no-now",
+ text: "hi",
+ onEvent: () => {},
+ });
+
+ expect(captured).toHaveLength(1);
+ expect(captured[0]?.now).toBeUndefined();
+ });
});
describe("turn-sealed event", () => {
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index 311b620..04f6ad2 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -39,6 +39,8 @@ export interface SessionOrchestratorDeps {
readonly runTurn: (input: RunTurnInput) => Promise<RunTurnResult>;
/** Base logger (auto-scoped to this extension); childed per turn for span capture. */
readonly logger?: Logger;
+ /** Injected monotonic-ish clock (ms) forwarded to RunTurnInput for timing events. */
+ readonly now?: () => number;
}
export function createSessionOrchestrator(deps: SessionOrchestratorDeps): SessionOrchestrator {
@@ -86,6 +88,7 @@ export function createSessionOrchestrator(deps: SessionOrchestratorDeps): Sessio
...(turnLogger !== undefined ? { logger: turnLogger } : {}),
...(signal !== undefined ? { signal } : {}),
...(cwd !== undefined ? { cwd } : {}),
+ ...(deps.now !== undefined ? { now: deps.now } : {}),
};
const result = await deps.runTurn(opts);
diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json
index 4e5f382..a2711af 100644
--- a/packages/transport-contract/package.json
+++ b/packages/transport-contract/package.json
@@ -1,6 +1,6 @@
{
"name": "@dispatch/transport-contract",
- "version": "0.2.0",
+ "version": "0.3.0",
"type": "module",
"private": true,
"main": "dist/index.js",
diff --git a/packages/wire/package.json b/packages/wire/package.json
index 6098703..762c06e 100644
--- a/packages/wire/package.json
+++ b/packages/wire/package.json
@@ -1,6 +1,6 @@
{
"name": "@dispatch/wire",
- "version": "0.2.0",
+ "version": "0.3.0",
"type": "module",
"private": true,
"main": "dist/index.js",
diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts
index 90213b4..a4790de 100644
--- a/packages/wire/src/index.ts
+++ b/packages/wire/src/index.ts
@@ -168,6 +168,7 @@ export type AgentEvent =
| TurnToolResultEvent
| TurnToolOutputEvent
| TurnUsageEvent
+ | TurnStepCompleteEvent
| TurnErrorEvent
| TurnDoneEvent
| TurnSealedEvent;
@@ -236,6 +237,12 @@ export interface TurnToolResultEvent {
readonly toolName: string;
readonly content: string;
readonly isError: boolean;
+ /**
+ * How long the tool took to execute (dispatch → result), in milliseconds —
+ * the backend's authoritative execution time, distinct from any client-side
+ * wall-clock. Optional: present only when the runtime was given a clock.
+ */
+ readonly durationMs?: number;
}
/** Streaming output from a tool execution (e.g. shell stdout/stderr). */
@@ -253,9 +260,43 @@ export interface TurnUsageEvent {
readonly type: "usage";
readonly conversationId: string;
readonly turnId: string;
+ /**
+ * The step this usage report belongs to, so a consumer can attribute tokens
+ * per step (and join with the matching `step-complete` timing by `stepId`).
+ * Optional: absent when the runtime had no step context, and on usage emitted
+ * before this field existed.
+ */
+ readonly stepId?: StepId;
readonly usage: Usage;
}
+/**
+ * A step (one LLM round-trip) has completed — the authoritative per-step metrics
+ * packet, emitted once at the step's end (after the generation stream finishes),
+ * so its timing is final (unlike `usage`, which may arrive mid-stream). Carries
+ * the step's generation timing; join to the step's tokens via `stepId` on the
+ * `usage` event. All timing fields are optional: present only when the runtime
+ * was given a clock, and `ttftMs`/`decodeMs` additionally require that a first
+ * content token (text or reasoning) was observed this step.
+ */
+export interface TurnStepCompleteEvent {
+ readonly type: "step-complete";
+ readonly conversationId: string;
+ readonly turnId: string;
+ readonly stepId: StepId;
+ /** Time to first token: stream start → first text/reasoning delta. */
+ readonly ttftMs?: number;
+ /** Decode time: first token → stream end (generation total − TTFT). */
+ readonly decodeMs?: number;
+ /**
+ * Total generation time for the step: stream start → stream end. Present
+ * whenever a clock was available, even if no first token was seen (in which
+ * case `ttftMs`/`decodeMs` are absent). When a first token was seen,
+ * `genTotalMs === ttftMs + decodeMs`.
+ */
+ readonly genTotalMs?: number;
+}
+
/** An error occurred during the turn. */
export interface TurnErrorEvent {
readonly type: "error";
@@ -271,6 +312,17 @@ export interface TurnDoneEvent {
readonly conversationId: string;
readonly turnId: string;
readonly reason: string;
+ /**
+ * Total wall-clock duration of the turn (turn start → turn end), in
+ * milliseconds. Optional: present only when the runtime was given a clock.
+ */
+ readonly durationMs?: number;
+ /**
+ * Aggregate token usage across all steps in the turn — a convenience total so
+ * a consumer need not sum the per-step `usage` events. Optional (absent if the
+ * provider reported no usage).
+ */
+ readonly usage?: Usage;
}
/**
diff --git a/tasks.md b/tasks.md
index 89662b9..46849cb 100644
--- a/tasks.md
+++ b/tasks.md
@@ -580,6 +580,53 @@ measured** (provider-agnostic, self-consistent); **first token = first text OR r
spans with valid `durationMs` (ttft 1090ms, decode 1673ms) + `firstToken:true`. GLOSSARY: TTFT,
decode time. NOT on the wire (clients don't receive it) — a future wire+FE step if desired.
+### Expose backend metrics to clients (timing/tokens) — IN PROGRESS
+User ask: surface the backend's authoritative metrics (tokens, TTFT/decode, TPS, tool-exec +
+turn durations) to clients (CLI/web FE). Decisions (user, §5.2):
+- **Delivery = inline in the existing chat stream**, as distinct `AgentEvent` types/fields —
+ push-everything, FE routes by `type` and ignores what it doesn't render (NOT a separate live
+ endpoint, NOT a negotiation handshake — events are tiny/low-frequency, not per-token).
+- **Carrier = a new `step-complete` event** (per-step end, ordering-safe timing) + additive fields.
+- **Scope = LIVE now**; persisted-for-replay is a documented fast-follow (below).
+
+#### Pass 1 — LIVE stream metrics [~] IN PROGRESS
+Metric set (all wire fields additive/optional): per-step tokens (`usage` += `stepId`), per-step
+`ttftMs`/`decodeMs`/`genTotalMs` (new `step-complete` event), tool-exec `durationMs` (`tool-result`),
+turn `durationMs` + aggregate `usage` (`done`). TPS derived FE-side (`outputTokens/decodeMs`);
+context-size proxy = `usage.inputTokens` (already present).
+- [x] **Contracts (orchestrator):** `@dispatch/wire` — new `TurnStepCompleteEvent` (+ union);
+ `stepId?` on `TurnUsageEvent`; `durationMs?` on `TurnToolResultEvent`; `durationMs?`+`usage?` on
+ `TurnDoneEvent`. Kernel re-export shims (events.ts/index.ts) updated. `RunTurnInput.now?: () =>
+ number` (additive optional clock — runtime has no clock today; needed to put real numbers on the
+ wire). **Whole-graph typecheck clean** → new variant breaks NO consumer (no exhaustive switches;
+ cli/transport unaffected). GLOSSARY: TTFT/decode already added.
+- [ ] **Build wave (2 disjoint owner-agents, parallel, mimo-v2.5-pro):**
+ - **kernel-runtime:** when `now` provided, measure + emit timing — `step-complete` per step
+ (ttft/decode/genTotal, reusing the just-built first-token detection), `stepId` on `usage`,
+ `durationMs` on `tool-result`, `durationMs`+`usage` on `done`. Keep the trace spans (may unify
+ span timing with the numeric measurement). Omit timing gracefully when `now` absent. +tests.
+ - **session-orchestrator:** add `now?` to `SessionOrchestratorDeps`, thread into the
+ `RunTurnInput` it builds; provide `() => Date.now()` from the extension's `activate` (shell
+ edge). +test asserting forwarding.
+ - **NOT touched (typecheck-confirmed):** transport-http/ws (verbatim pass-through), cli (optional
+ fields, non-exhaustive switch), host-bin (orchestrator self-injects the clock in activate).
+- [ ] **Post-wave (orchestrator):** full typecheck/test/biome; bump `wire`+`transport-contract`
+ minor (`0.2.0→0.3.0`); regen FE `.dispatch/{wire,transport-contract}.reference.md`; courier reply.
+ Live: confirm `step-complete`/durations/`usage.stepId` on the real stream.
+
+#### Pass 2 — DEFERRED: persisted metrics for REPLAY
+Today usage/timing are NOT persisted (conversation-store stores chunks only), so reopening a past
+conversation (`GET /conversations/:id`) shows messages but no metrics. To show historical
+tokens/timing:
+- Persist a per-turn (and/or per-step) metrics record in `conversation-store` (new storage shape —
+ metrics are not chunks; e.g. a `conv:<id>:turn:<turnId>:metrics` key or a sibling namespace).
+- Expose on the read path: a `metrics` field on `ConversationHistoryResponse`, or a sibling
+ `GET /conversations/:id/metrics` (read endpoint = the right home for historical metrics, vs the
+ live stream). New contract + transport-http route + conversation-store schema + FE.
+- The trace-store already holds rich per-span timing durably; Pass 2 could alternatively expose a
+ read slice of THAT rather than duplicating into conversation-store (decide at Pass-2 design time).
+Sequenced after Pass 1 lands (and can ride alongside the FE-Slice work that consumes Pass 1).
+
### 3. dedup / storage growth (after frontend)
The deferred trace-body de-duplication + rotation/compression (D5 volume-control +
`prefix.fingerprint` + §6 retention strategy) — already designed in