diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/metrics/format.test.ts | 59 | ||||
| -rw-r--r-- | src/core/metrics/format.ts | 29 | ||||
| -rw-r--r-- | src/core/metrics/index.ts | 2 | ||||
| -rw-r--r-- | src/core/metrics/place.test.ts | 13 | ||||
| -rw-r--r-- | src/core/metrics/place.ts | 13 | ||||
| -rw-r--r-- | src/core/metrics/types.ts | 5 | ||||
| -rw-r--r-- | src/core/protocol/index.ts | 11 | ||||
| -rw-r--r-- | src/core/protocol/reducer.test.ts | 110 | ||||
| -rw-r--r-- | src/core/protocol/reducer.ts | 101 | ||||
| -rw-r--r-- | src/core/protocol/types.ts | 19 |
10 files changed, 328 insertions, 34 deletions
diff --git a/src/core/metrics/format.test.ts b/src/core/metrics/format.test.ts index 77c5204..3eec93d 100644 --- a/src/core/metrics/format.test.ts +++ b/src/core/metrics/format.test.ts @@ -2,8 +2,10 @@ import type { StepId, StepMetrics, TurnMetrics } from "@dispatch/wire"; import { describe, expect, it } from "vitest"; import { computeCachePct, + computeExpectedCachePct, computeTps, viewCacheRate, + viewExpectedCache, viewStepMetrics, viewTurnMetrics, } from "./format"; @@ -249,3 +251,60 @@ describe("viewCacheRate", () => { expect(miss.isHit).toBe(false); }); }); + +describe("computeExpectedCachePct", () => { + it("null when there is no prior turn (first turn has no baseline)", () => { + expect(computeExpectedCachePct({ inputTokens: 100, outputTokens: 0 }, null)).toBeNull(); + }); + + it("null when the prior turn cached nothing (denominator 0)", () => { + const prev = { inputTokens: 100, outputTokens: 0 }; + const current = { inputTokens: 200, outputTokens: 0, cacheReadTokens: 50 }; + expect(computeExpectedCachePct(current, prev)).toBeNull(); + }); + + it("100% when the whole prior cached prefix was read back (backend worked example)", () => { + // turn 1: cacheRead 0, cacheWrite 5146 → prefix 5146; turn 2 reads 5146 back. + const prev = { inputTokens: 5149, outputTokens: 0, cacheReadTokens: 0, cacheWriteTokens: 5146 }; + const current = { + inputTokens: 8462, + outputTokens: 0, + cacheReadTokens: 5146, + cacheWriteTokens: 3313, + }; + expect(computeExpectedCachePct(current, prev)).toBe(100); + }); + + it("drops below 100% when the cache busted (read < prior prefix)", () => { + const prev = { + inputTokens: 1000, + outputTokens: 0, + cacheReadTokens: 100, + cacheWriteTokens: 900, + }; + const current = { inputTokens: 1000, outputTokens: 0, cacheReadTokens: 500 }; + // 500 / (100 + 900) = 50% + expect(computeExpectedCachePct(current, prev)).toBe(50); + }); + + it("clamps to 100 if read somehow exceeds the prior prefix", () => { + const prev = { inputTokens: 100, outputTokens: 0, cacheWriteTokens: 100 }; + const current = { inputTokens: 100, outputTokens: 0, cacheReadTokens: 250 }; + expect(computeExpectedCachePct(current, prev)).toBe(100); + }); +}); + +describe("viewExpectedCache", () => { + it("null view when it cannot be derived (no prior turn)", () => { + expect(viewExpectedCache({ inputTokens: 100, outputTokens: 0 }, null)).toBeNull(); + }); + + it("success level + hit flag for full retention", () => { + const prev = { inputTokens: 5149, outputTokens: 0, cacheWriteTokens: 5146 }; + const current = { inputTokens: 8462, outputTokens: 0, cacheReadTokens: 5146 }; + const v = viewExpectedCache(current, prev); + expect(v?.pct).toBe(100); + expect(v?.level).toBe("success"); + expect(v?.isHit).toBe(true); + }); +}); diff --git a/src/core/metrics/format.ts b/src/core/metrics/format.ts index cc86976..ee8db60 100644 --- a/src/core/metrics/format.ts +++ b/src/core/metrics/format.ts @@ -75,6 +75,35 @@ export function viewCacheRate(u: Usage): CacheRateView { return { pct, level: cacheLevel(pct), isHit: (u.cacheReadTokens ?? 0) > 0 }; } +/** + * Expected cache (retention): of the cache that existed going INTO this turn, how + * much was read back — `clamp01(cacheRead_N / (cacheRead_{N-1} + cacheWrite_{N-1}))`. + * The denominator is the PRIOR turn's cached prefix (what it read + what it wrote). + * Ideally ~100% on every turn after the first; <100% = the cache busted/expired. + * + * Returns `null` when it cannot be derived: no prior turn (`prev === null`) or the + * prior turn cached nothing (denominator <= 0) — distinct from a real 0%. + */ +export function computeExpectedCachePct(current: Usage, prev: Usage | null): number | null { + if (prev === null) return null; + const denom = (prev.cacheReadTokens ?? 0) + (prev.cacheWriteTokens ?? 0); + if (denom <= 0) return null; + const read = current.cacheReadTokens ?? 0; + const rate = read / denom; + const clamped = rate < 0 ? 0 : rate > 1 ? 1 : rate; + return Math.round(clamped * 100); +} + +/** + * Build a view of the cross-turn retention (percentage + colour level + hit flag), + * or `null` when it can't be derived (see `computeExpectedCachePct`). + */ +export function viewExpectedCache(current: Usage, prev: Usage | null): CacheRateView | null { + const pct = computeExpectedCachePct(current, prev); + if (pct === null) return null; + return { pct, level: cacheLevel(pct), isHit: (current.cacheReadTokens ?? 0) > 0 }; +} + /** Build a formatted view of a turn's aggregate metrics. */ export function viewTurnMetrics(turn: TurnMetrics): TurnMetricsView { const total = totalTokens(turn.usage); diff --git a/src/core/metrics/index.ts b/src/core/metrics/index.ts index 6997ab9..8822159 100644 --- a/src/core/metrics/index.ts +++ b/src/core/metrics/index.ts @@ -1,7 +1,9 @@ export { computeCachePct, + computeExpectedCachePct, computeTps, viewCacheRate, + viewExpectedCache, viewStepMetrics, viewTurnMetrics, } from "./format"; diff --git a/src/core/metrics/place.test.ts b/src/core/metrics/place.test.ts index d94882d..0b9c0ec 100644 --- a/src/core/metrics/place.test.ts +++ b/src/core/metrics/place.test.ts @@ -526,4 +526,17 @@ describe("interleaveTurnMetrics — cumulative usage (cache total)", () => { expect(tm[0]?.cumulativeUsage.inputTokens).toBe(1000); expect(tm[0]?.cumulativeUsage.cacheReadTokens).toBe(500); }); + + it("carries the prior finalized turn's usage as the retention baseline", () => { + const rows = interleaveTurnMetrics( + [userGroup(1, "q1"), assistantGroup(2, "a1"), userGroup(3, "q2"), assistantGroup(4, "a2")], + [cacheEntry("t1", 2669, 10, 384), cacheEntry("t2", 2737, 10, 2560)], + ); + const tm = turnMetricsRows(rows); + // first finalized turn has no earlier baseline + expect(tm[0]?.prevTurnUsage).toBeNull(); + // second turn's baseline is the first turn's usage + expect(tm[1]?.prevTurnUsage?.inputTokens).toBe(2669); + expect(tm[1]?.prevTurnUsage?.cacheReadTokens).toBe(384); + }); }); diff --git a/src/core/metrics/place.ts b/src/core/metrics/place.ts index fc30df0..afeb84b 100644 --- a/src/core/metrics/place.ts +++ b/src/core/metrics/place.ts @@ -79,11 +79,19 @@ export function interleaveTurnMetrics( } // Running cumulative usage across finalized turns (conversation total at each - // entry index), for the per-turn "chat total" cache rate. + // entry index), for the per-turn "chat total" cache rate. Alongside it, the + // previous finalized turn's usage at each index — the baseline for cross-turn + // retention (expected cache). const cumulativeByEntry: Usage[] = []; + const prevUsageByEntry: (Usage | null)[] = []; let runningUsage: Usage = { inputTokens: 0, outputTokens: 0 }; + let lastFinalizedUsage: Usage | null = null; for (const e of entries) { - if (e.total !== null) runningUsage = addUsage(runningUsage, e.total.usage); + prevUsageByEntry.push(lastFinalizedUsage); + if (e.total !== null) { + runningUsage = addUsage(runningUsage, e.total.usage); + lastFinalizedUsage = e.total.usage; + } cumulativeByEntry.push(runningUsage); } @@ -170,6 +178,7 @@ export function interleaveTurnMetrics( kind: "turn-metrics", turn: entry.total, cumulativeUsage: cumulativeByEntry[seg] ?? entry.total.usage, + prevTurnUsage: prevUsageByEntry[seg] ?? null, }); } } diff --git a/src/core/metrics/types.ts b/src/core/metrics/types.ts index cf2511c..f5557f7 100644 --- a/src/core/metrics/types.ts +++ b/src/core/metrics/types.ts @@ -52,6 +52,11 @@ export type MetricsRow = readonly turn: TurnMetrics; /** Cumulative usage across all finalized turns up to and including this one. */ readonly cumulativeUsage: Usage; + /** + * Usage of the most recent EARLIER finalized turn, or `null` when this is the + * first finalized turn. The baseline for cross-turn retention (expected cache). + */ + readonly prevTurnUsage: Usage | null; }; /** Formatted cache hit-rate view: percentage + colour severity + hit flag. */ diff --git a/src/core/protocol/index.ts b/src/core/protocol/index.ts index 25174ea..e7fd161 100644 --- a/src/core/protocol/index.ts +++ b/src/core/protocol/index.ts @@ -1,2 +1,9 @@ -export { applyServerMessage, initialState, invoke, subscribe, unsubscribe } from "./reducer"; -export type { ProtocolResult, ProtocolState } from "./types"; +export { + applyServerMessage, + getSurfaceSpec, + initialState, + invoke, + subscribe, + unsubscribe, +} from "./reducer"; +export type { ProtocolResult, ProtocolState, Subscription } from "./types"; diff --git a/src/core/protocol/reducer.test.ts b/src/core/protocol/reducer.test.ts index 57e12f2..c8e517a 100644 --- a/src/core/protocol/reducer.test.ts +++ b/src/core/protocol/reducer.test.ts @@ -1,5 +1,12 @@ import { describe, expect, it } from "vitest"; -import { applyServerMessage, initialState, invoke, subscribe, unsubscribe } from "./reducer"; +import { + applyServerMessage, + getSurfaceSpec, + initialState, + invoke, + subscribe, + unsubscribe, +} from "./reducer"; const makeSpec = (id: string, title = id) => ({ id, @@ -32,11 +39,10 @@ describe("applyServerMessage — catalog", () => { describe("applyServerMessage — surface", () => { it("sets the spec for a subscribed surface", () => { let s = initialState(); - const result = subscribe(s, "s1"); - s = result.state; + s = subscribe(s, "s1").state; const spec = makeSpec("s1", "Surface 1"); const next = applyServerMessage(s, { type: "surface", spec }); - expect(next.subscriptions.get("s1")).toEqual(spec); + expect(getSurfaceSpec(next, "s1")).toEqual(spec); }); it("ignores a surface message for a non-subscribed surface", () => { @@ -56,7 +62,7 @@ describe("applyServerMessage — update", () => { type: "update", update: { surfaceId: "s1", spec: makeSpec("s1", "V2") }, }); - expect(next.subscriptions.get("s1")?.title).toBe("V2"); + expect(getSurfaceSpec(next, "s1")?.title).toBe("V2"); }); it("ignores an update for a non-subscribed surface", () => { @@ -86,7 +92,7 @@ describe("applyServerMessage — error", () => { }); describe("subscribe", () => { - it("emits exactly one subscribe message", () => { + it("emits exactly one subscribe message (global, no conversationId)", () => { const s = initialState(); const result = subscribe(s, "s1"); expect(result.outgoing).toEqual([{ type: "subscribe", surfaceId: "s1" }]); @@ -96,10 +102,14 @@ describe("subscribe", () => { it("adds the surface to subscriptions with null spec", () => { const s = initialState(); const result = subscribe(s, "s1"); - expect(result.state.subscriptions.get("s1")).toBeNull(); + expect(result.state.subscriptions.get("s1")).toEqual({ + conversationId: undefined, + spec: null, + }); + expect(getSurfaceSpec(result.state, "s1")).toBeNull(); }); - it("is idempotent — second subscribe is a no-op", () => { + it("is idempotent — second subscribe with the same scope is a no-op", () => { let s = initialState(); s = subscribe(s, "s1").state; const result = subscribe(s, "s1"); @@ -108,6 +118,67 @@ describe("subscribe", () => { }); }); +describe("subscribe — conversation-scoped", () => { + it("includes conversationId in the subscribe message", () => { + const s = initialState(); + const result = subscribe(s, "cache-warming", "conv-A"); + expect(result.outgoing).toEqual([ + { type: "subscribe", surfaceId: "cache-warming", conversationId: "conv-A" }, + ]); + expect(result.state.subscriptions.get("cache-warming")?.conversationId).toBe("conv-A"); + }); + + it("re-scopes on conversation switch: unsubscribe old pair then subscribe new", () => { + let s = initialState(); + s = subscribe(s, "cw", "conv-A").state; + s = applyServerMessage(s, { + type: "surface", + spec: makeSpec("cw", "A-spec"), + conversationId: "conv-A", + }); + const result = subscribe(s, "cw", "conv-B"); + expect(result.outgoing).toEqual([ + { type: "unsubscribe", surfaceId: "cw", conversationId: "conv-A" }, + { type: "subscribe", surfaceId: "cw", conversationId: "conv-B" }, + ]); + // previous spec retained until the new one arrives (no flicker) + expect(getSurfaceSpec(result.state, "cw")?.title).toBe("A-spec"); + expect(result.state.subscriptions.get("cw")?.conversationId).toBe("conv-B"); + }); + + it("drops a stale update echoing the previous conversationId", () => { + let s = initialState(); + s = subscribe(s, "cw", "conv-A").state; + s = subscribe(s, "cw", "conv-B").state; // re-scoped to B + const next = applyServerMessage(s, { + type: "update", + update: { surfaceId: "cw", spec: makeSpec("cw", "STALE-A"), conversationId: "conv-A" }, + }); + expect(getSurfaceSpec(next, "cw")).toBeNull(); // stale ignored, no spec yet for B + }); + + it("accepts an update echoing the current conversationId", () => { + let s = initialState(); + s = subscribe(s, "cw", "conv-B").state; + const next = applyServerMessage(s, { + type: "update", + update: { surfaceId: "cw", spec: makeSpec("cw", "B-spec"), conversationId: "conv-B" }, + }); + expect(getSurfaceSpec(next, "cw")?.title).toBe("B-spec"); + }); + + it("accepts a global (no-echo) surface message even when subscribed with a conversationId", () => { + // loaded-extensions is global: server ignores our conversationId and echoes none. + let s = initialState(); + s = subscribe(s, "loaded-extensions", "conv-A").state; + const next = applyServerMessage(s, { + type: "surface", + spec: makeSpec("loaded-extensions", "Ext"), + }); + expect(getSurfaceSpec(next, "loaded-extensions")?.title).toBe("Ext"); + }); +}); + describe("unsubscribe", () => { it("emits unsubscribe and drops the spec", () => { let s = initialState(); @@ -118,6 +189,15 @@ describe("unsubscribe", () => { expect(result.state.subscriptions.has("s1")).toBe(false); }); + it("includes conversationId for a scoped subscription", () => { + let s = initialState(); + s = subscribe(s, "cw", "conv-A").state; + const result = unsubscribe(s, "cw"); + expect(result.outgoing).toEqual([ + { type: "unsubscribe", surfaceId: "cw", conversationId: "conv-A" }, + ]); + }); + it("is a no-op if not subscribed", () => { const s = initialState(); const result = unsubscribe(s, "nope"); @@ -143,6 +223,20 @@ describe("invoke", () => { ]); }); + it("includes conversationId when provided", () => { + const s = initialState(); + const result = invoke(s, "cw", "cache-warming/set-interval", 120, "conv-A"); + expect(result.outgoing).toEqual([ + { + type: "invoke", + surfaceId: "cw", + actionId: "cache-warming/set-interval", + payload: 120, + conversationId: "conv-A", + }, + ]); + }); + it("does not mutate state", () => { const s = initialState(); const result = invoke(s, "s1", "a1"); diff --git a/src/core/protocol/reducer.ts b/src/core/protocol/reducer.ts index 992a918..3d6b1c8 100644 --- a/src/core/protocol/reducer.ts +++ b/src/core/protocol/reducer.ts @@ -2,6 +2,7 @@ import type { InvokeMessage, SubscribeMessage, SurfaceServerMessage, + SurfaceSpec, UnsubscribeMessage, } from "@dispatch/ui-contract"; import type { ProtocolResult, ProtocolState } from "./types"; @@ -15,6 +16,31 @@ export function initialState(): ProtocolState { }; } +// ── Message builders (respect exactOptionalPropertyTypes: omit `conversationId` +// entirely for a global subscription rather than setting it to `undefined`). ── + +function subMsg(surfaceId: string, conversationId: string | undefined): SubscribeMessage { + return conversationId === undefined + ? { type: "subscribe", surfaceId } + : { type: "subscribe", surfaceId, conversationId }; +} + +function unsubMsg(surfaceId: string, conversationId: string | undefined): UnsubscribeMessage { + return conversationId === undefined + ? { type: "unsubscribe", surfaceId } + : { type: "unsubscribe", surfaceId, conversationId }; +} + +/** + * Is an inbound spec/update (which echoes `echoedId`) current for the + * subscription whose desired scope is `desiredId`? A scoped surface echoes its + * conversationId, so it must match the one we last subscribed with; a GLOBAL + * surface echoes nothing (`undefined`) and is always current. + */ +function isCurrent(desiredId: string | undefined, echoedId: string | undefined): boolean { + return echoedId === undefined || echoedId === desiredId; +} + /** Fold an inbound server message into the next protocol state. */ export function applyServerMessage(state: ProtocolState, msg: SurfaceServerMessage): ProtocolState { switch (msg.type) { @@ -22,18 +48,21 @@ export function applyServerMessage(state: ProtocolState, msg: SurfaceServerMessa return { ...state, catalog: msg.catalog }; case "surface": { - const surfaceId = msg.spec.id; - if (!state.subscriptions.has(surfaceId)) return state; + const sub = state.subscriptions.get(msg.spec.id); + if (sub === undefined) return state; + if (!isCurrent(sub.conversationId, msg.conversationId)) return state; const subs = new Map(state.subscriptions); - subs.set(surfaceId, msg.spec); + subs.set(msg.spec.id, { conversationId: sub.conversationId, spec: msg.spec }); return { ...state, subscriptions: subs }; } case "update": { - const surfaceId = msg.update.surfaceId; - if (!state.subscriptions.has(surfaceId)) return state; + const { surfaceId, spec, conversationId } = msg.update; + const sub = state.subscriptions.get(surfaceId); + if (sub === undefined) return state; + if (!isCurrent(sub.conversationId, conversationId)) return state; const subs = new Map(state.subscriptions); - subs.set(surfaceId, msg.update.spec); + subs.set(surfaceId, { conversationId: sub.conversationId, spec }); return { ...state, subscriptions: subs }; } @@ -43,40 +72,72 @@ export function applyServerMessage(state: ProtocolState, msg: SurfaceServerMessa } /** - * Subscribe to a surface. Idempotent: if already subscribed, returns the same - * state with no outgoing message. + * Subscribe to a surface for a given conversation (omit `conversationId` for a + * GLOBAL surface / when no conversation is focused). + * + * - Not yet subscribed → emits one `subscribe`. + * - Already subscribed with the SAME scope → idempotent no-op. + * - Already subscribed with a DIFFERENT conversation (a re-scope on conversation + * switch) → emits `unsubscribe` for the old pair then `subscribe` for the new + * one, retaining the previous spec until the new one arrives (no flicker). */ -export function subscribe(state: ProtocolState, surfaceId: string): ProtocolResult { - if (state.subscriptions.has(surfaceId)) { +export function subscribe( + state: ProtocolState, + surfaceId: string, + conversationId?: string, +): ProtocolResult { + const existing = state.subscriptions.get(surfaceId); + if (existing !== undefined && existing.conversationId === conversationId) { return { state, outgoing: [] }; } const subs = new Map(state.subscriptions); - subs.set(surfaceId, null); - const outgoing: SubscribeMessage = { type: "subscribe", surfaceId }; - return { state: { ...state, subscriptions: subs }, outgoing: [outgoing] }; + const outgoing: (SubscribeMessage | UnsubscribeMessage)[] = []; + const priorSpec: SurfaceSpec | null = existing?.spec ?? null; + if (existing !== undefined) { + outgoing.push(unsubMsg(surfaceId, existing.conversationId)); + } + subs.set(surfaceId, { conversationId, spec: priorSpec }); + outgoing.push(subMsg(surfaceId, conversationId)); + return { state: { ...state, subscriptions: subs }, outgoing }; } /** - * Unsubscribe from a surface. Drops the local spec and emits one unsubscribe. - * If not subscribed, returns the same state with no outgoing. + * Unsubscribe from a surface. Drops the local subscription and emits one + * `unsubscribe` (for the conversation pair it was subscribed under). No-op if + * not subscribed. */ export function unsubscribe(state: ProtocolState, surfaceId: string): ProtocolResult { - if (!state.subscriptions.has(surfaceId)) { + const existing = state.subscriptions.get(surfaceId); + if (existing === undefined) { return { state, outgoing: [] }; } const subs = new Map(state.subscriptions); subs.delete(surfaceId); - const outgoing: UnsubscribeMessage = { type: "unsubscribe", surfaceId }; - return { state: { ...state, subscriptions: subs }, outgoing: [outgoing] }; + return { + state: { ...state, subscriptions: subs }, + outgoing: [unsubMsg(surfaceId, existing.conversationId)], + }; } -/** Invoke a field's action on a surface. Emits an InvokeMessage; no state change. */ +/** + * Invoke a field's action on a surface. Emits an InvokeMessage (carrying + * `conversationId` for a scoped surface); no state change. + */ export function invoke( state: ProtocolState, surfaceId: string, actionId: string, payload?: unknown, + conversationId?: string, ): ProtocolResult { - const outgoing: InvokeMessage = { type: "invoke", surfaceId, actionId, payload }; + const outgoing: InvokeMessage = + conversationId === undefined + ? { type: "invoke", surfaceId, actionId, payload } + : { type: "invoke", surfaceId, actionId, payload, conversationId }; return { state, outgoing: [outgoing] }; } + +/** The current spec for a subscribed surface, or `null` if absent/unsubscribed. */ +export function getSurfaceSpec(state: ProtocolState, surfaceId: string): SurfaceSpec | null { + return state.subscriptions.get(surfaceId)?.spec ?? null; +} diff --git a/src/core/protocol/types.ts b/src/core/protocol/types.ts index effec0d..db8886a 100644 --- a/src/core/protocol/types.ts +++ b/src/core/protocol/types.ts @@ -5,12 +5,27 @@ import type { SurfaceSpec, } from "@dispatch/ui-contract"; +/** + * One surface subscription's local state. + * + * `conversationId` is the conversation we last subscribed this surface WITH + * (`undefined` = subscribed globally, no conversation in focus). It is the + * "desired" scope: an inbound `surface`/`update` that echoes a DIFFERENT + * conversation is stale (we have since re-scoped) and is dropped. A GLOBAL + * surface ignores the id server-side and echoes none — that (`undefined` echo) + * is always accepted. `spec` is `null` until the first `surface` arrives. + */ +export interface Subscription { + readonly conversationId: string | undefined; + readonly spec: SurfaceSpec | null; +} + /** The client-side view of the surface protocol state. */ export interface ProtocolState { /** The latest catalog received from the server (empty until first CatalogMessage). */ readonly catalog: SurfaceCatalog; - /** Surfaces the client intends to be subscribed to; null = subscribed but no spec yet. */ - readonly subscriptions: ReadonlyMap<string, SurfaceSpec | null>; + /** Surfaces the client intends to be subscribed to, keyed by surfaceId. */ + readonly subscriptions: ReadonlyMap<string, Subscription>; /** The last error received from the server, if any. */ readonly lastError: SurfaceErrorMessage | null; } |
