diff options
| author | Adam Malczewski <[email protected]> | 2026-06-11 14:48:30 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-11 14:48:30 +0900 |
| commit | bfbad3af79cab23f52be0f6388311a5798b7fd04 (patch) | |
| tree | 7bda9843aca8d7fad589ba0b7b5579e2627a3585 | |
| parent | 58e2ad559cccc8b35c513818e253b04e60af69b8 (diff) | |
| download | dispatch-bfbad3af79cab23f52be0f6388311a5798b7fd04.tar.gz dispatch-bfbad3af79cab23f52be0f6388311a5798b7fd04.zip | |
feat(cache-warming): CR-3 — manual warm resets timer + nextWarmAt/lastWarmAt surface
FE CR-3 (backend-handoff-cache-warming-timer.md). The inversion: session-orchestrator's
warm() (the single chokepoint for manual /chat/warm AND the automatic timer) emits a
warmCompleted bus event; cache-warming subscribes and does ALL post-warm handling. So a
manual warm now re-arms the timer + refreshes the surface with NO transport-http change
(core can't depend on the standard cache-warming ext).
- session-orchestrator: warmCompleted event hook + emit from warm() on success
- cache-warming: warmCompleted subscriber unifies result handling (manual + automatic);
adds nextWarmAt/lastWarmAt state + a custom 'cache-warming-timer' surface field
- fix: createWarmService was missing the emit dep (deps.emit?. silently no-oped) →
wired it + made emit REQUIRED so it can't regress
Live-verified vs claude haiku: manual POST /chat/warm now logs cache-warming 'warm
complete' ~2s after the turn (not the 4-min timer) → manual warm reaches the warmer.
800 vitest + 109 bun green; tsc -b 0; biome clean.
| -rw-r--r-- | packages/cache-warming/src/extension.ts | 14 | ||||
| -rw-r--r-- | packages/cache-warming/src/pure.test.ts | 31 | ||||
| -rw-r--r-- | packages/cache-warming/src/pure.ts | 19 | ||||
| -rw-r--r-- | packages/cache-warming/src/warmer.test.ts | 413 | ||||
| -rw-r--r-- | packages/cache-warming/src/warmer.ts | 82 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/extension.ts | 7 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/index.ts | 3 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.test.ts | 108 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 21 |
9 files changed, 467 insertions, 231 deletions
diff --git a/packages/cache-warming/src/extension.ts b/packages/cache-warming/src/extension.ts index 802618a..19f5130 100644 --- a/packages/cache-warming/src/extension.ts +++ b/packages/cache-warming/src/extension.ts @@ -1,5 +1,10 @@ import type { Extension, HostAPI, Manifest } from "@dispatch/kernel"; -import { cacheWarmHandle, turnSettled, turnStarted } from "@dispatch/session-orchestrator"; +import { + cacheWarmHandle, + turnSettled, + turnStarted, + warmCompleted, +} from "@dispatch/session-orchestrator"; import type { SurfaceContext, SurfaceProvider } from "@dispatch/surface-registry"; import { surfaceRegistryHandle } from "@dispatch/surface-registry"; import type { SurfaceSpec } from "@dispatch/ui-contract"; @@ -53,6 +58,7 @@ export function activate(host: HostAPI): void { } }, }, + now: () => Date.now(), onSurfaceChange: () => { for (const notify of subscribers) { notify(); @@ -71,6 +77,10 @@ export function activate(host: HostAPI): void { }); }); + host.on(warmCompleted, (payload) => { + warmer.onWarmCompleted(payload); + }); + function getSpec(context?: SurfaceContext): SurfaceSpec { const convId = context?.conversationId; if (convId === undefined) { @@ -82,6 +92,8 @@ export function activate(host: HostAPI): void { state.intervalMs, state.lastPct, state.lastExpectedPct, + state.nextWarmAt, + state.lastWarmAt, ); } diff --git a/packages/cache-warming/src/pure.test.ts b/packages/cache-warming/src/pure.test.ts index f5e2f1d..a503798 100644 --- a/packages/cache-warming/src/pure.test.ts +++ b/packages/cache-warming/src/pure.test.ts @@ -52,6 +52,8 @@ describe("shouldWarm", () => { active: false, lastPct: null, lastExpectedPct: null, + lastWarmAt: null, + nextWarmAt: null, token: 5, }; expect(shouldWarm(state, 5)).toBe(true); @@ -64,6 +66,8 @@ describe("shouldWarm", () => { active: false, lastPct: null, lastExpectedPct: null, + lastWarmAt: null, + nextWarmAt: null, token: 5, }; expect(shouldWarm(state, 5)).toBe(false); @@ -76,6 +80,8 @@ describe("shouldWarm", () => { active: true, lastPct: null, lastExpectedPct: null, + lastWarmAt: null, + nextWarmAt: null, token: 5, }; expect(shouldWarm(state, 5)).toBe(false); @@ -88,6 +94,8 @@ describe("shouldWarm", () => { active: false, lastPct: null, lastExpectedPct: null, + lastWarmAt: null, + nextWarmAt: null, token: 5, }; expect(shouldWarm(state, 6)).toBe(false); @@ -181,12 +189,12 @@ describe("parseIntervalPayload", () => { }); describe("buildConversationSpec", () => { - it("builds a per-conversation spec with toggle + number(interval) + last-% + retention fields", () => { - const spec = buildConversationSpec(true, 240_000, 80, 95); + it("builds a per-conversation spec with toggle + number(interval) + last-% + retention + timer fields", () => { + const spec = buildConversationSpec(true, 240_000, 80, 95, 1000, 500); expect(spec.id).toBe("cache-warming"); expect(spec.region).toBe("side"); expect(spec.title).toBe("Cache Warming"); - expect(spec.fields).toHaveLength(4); + expect(spec.fields).toHaveLength(5); const toggle = spec.fields[0]; expect(toggle).toEqual({ @@ -220,10 +228,17 @@ describe("buildConversationSpec", () => { label: "Cache retention", value: "95%", }); + + const timer = spec.fields[4]; + expect(timer).toEqual({ + kind: "custom", + rendererId: "cache-warming-timer", + payload: { nextWarmAt: 1000, lastWarmAt: 500 }, + }); }); it("shows — when lastPct and lastExpectedPct are null", () => { - const spec = buildConversationSpec(true, 240_000, null, null); + const spec = buildConversationSpec(true, 240_000, null, null, null, null); const stat = spec.fields[2]; expect(stat).toEqual({ kind: "stat", @@ -236,10 +251,16 @@ describe("buildConversationSpec", () => { label: "Cache retention", value: "—", }); + const timer = spec.fields[4]; + expect(timer).toEqual({ + kind: "custom", + rendererId: "cache-warming-timer", + payload: { nextWarmAt: null, lastWarmAt: null }, + }); }); it("reflects disabled state", () => { - const spec = buildConversationSpec(false, 120_000, 50, 75); + const spec = buildConversationSpec(false, 120_000, 50, 75, null, null); const toggle = spec.fields[0]; expect(toggle).toEqual({ kind: "toggle", diff --git a/packages/cache-warming/src/pure.ts b/packages/cache-warming/src/pure.ts index ab6fc79..c4cbe8a 100644 --- a/packages/cache-warming/src/pure.ts +++ b/packages/cache-warming/src/pure.ts @@ -3,7 +3,13 @@ * Every function is input → output; testable without mocks. */ -import type { NumberField, StatField, SurfaceSpec, ToggleField } from "@dispatch/ui-contract"; +import type { + CustomField, + NumberField, + StatField, + SurfaceSpec, + ToggleField, +} from "@dispatch/ui-contract"; // --- Types --- @@ -18,6 +24,8 @@ export interface ConversationState extends ConversationSettings { readonly active: boolean; readonly lastPct: number | null; readonly lastExpectedPct: number | null; + readonly lastWarmAt: number | null; + readonly nextWarmAt: number | null; readonly token: number; } @@ -137,6 +145,8 @@ export function buildConversationSpec( intervalMs: number, lastPct: number | null, lastExpectedPct: number | null, + nextWarmAt: number | null, + lastWarmAt: number | null, ): SurfaceSpec { const pctDisplay = lastPct === null ? "—" : `${lastPct}%`; const retentionDisplay = lastExpectedPct === null ? "—" : `${lastExpectedPct}%`; @@ -165,11 +175,16 @@ export function buildConversationSpec( label: "Cache retention", value: retentionDisplay, }; + const timer: CustomField = { + kind: "custom", + rendererId: "cache-warming-timer", + payload: { nextWarmAt, lastWarmAt }, + }; return { id: "cache-warming", region: "side", title: "Cache Warming", - fields: [toggle, interval, stat, retentionStat], + fields: [toggle, interval, stat, retentionStat, timer], }; } diff --git a/packages/cache-warming/src/warmer.test.ts b/packages/cache-warming/src/warmer.test.ts index 86908a2..a389ccb 100644 --- a/packages/cache-warming/src/warmer.test.ts +++ b/packages/cache-warming/src/warmer.test.ts @@ -1,4 +1,4 @@ -import type { Logger, Span } from "@dispatch/kernel"; +import type { Logger, Span, StorageNamespace } from "@dispatch/kernel"; import type { WarmResult } from "@dispatch/session-orchestrator"; import { describe, expect, it } from "vitest"; import { MIN_INTERVAL_MS } from "./pure.js"; @@ -70,179 +70,86 @@ const WARM_RESULT: WarmResult = { cacheWriteTokens: 0, }; -import type { StorageNamespace } from "@dispatch/kernel"; +function makeDeps( + overrides: Partial<{ + warm: (conversationId: string) => Promise<WarmResult>; + onSurfaceChange: () => void; + now: () => number; + }> = {}, +) { + const timers = fakeTimers(); + let nowMs = 1000; + return { + timers, + warm: overrides.warm ?? (async () => WARM_RESULT), + storage: memStorage(), + logger: makeLogger(), + now: overrides.now ?? (() => nowMs), + onSurfaceChange: overrides.onSurfaceChange ?? (() => {}), + setNow(ms: number) { + nowMs = ms; + }, + }; +} describe("CacheWarmer", () => { it("arms a timer on turnSettled and warms when it fires (enabled)", async () => { - const timers = fakeTimers(); + const deps = makeDeps(); const warmCalls: string[] = []; - const warmer = createCacheWarmer({ - warm: async (convId) => { - warmCalls.push(convId); - return WARM_RESULT; - }, - storage: memStorage(), - logger: makeLogger(), - timers, - onSurfaceChange: () => {}, - }); + deps.warm = async (convId) => { + warmCalls.push(convId); + return WARM_RESULT; + }; + const warmer = createCacheWarmer(deps); warmer.onTurnSettled("conv-1", {}); - timers.flush(); + deps.timers.flush(); await new Promise((r) => setTimeout(r, 10)); expect(warmCalls).toContain("conv-1"); }); it("cancels the timer on turnStarted (no warm while generating)", () => { - const timers = fakeTimers(); + const deps = makeDeps(); const warmCalls: string[] = []; - const warmer = createCacheWarmer({ - warm: async (convId) => { - warmCalls.push(convId); - return WARM_RESULT; - }, - storage: memStorage(), - logger: makeLogger(), - timers, - onSurfaceChange: () => {}, - }); + deps.warm = async (convId) => { + warmCalls.push(convId); + return WARM_RESULT; + }; + const warmer = createCacheWarmer(deps); warmer.onTurnSettled("conv-1", {}); warmer.onTurnStarted("conv-1"); - timers.flush(); + deps.timers.flush(); expect(warmCalls).toHaveLength(0); }); - it("in-flight warm result is dropped when superseded (token mismatch)", async () => { - const timers = fakeTimers(); - let resolveWarm: (v: WarmResult) => void = () => {}; - const warmPromise = new Promise<WarmResult>((r) => { - resolveWarm = r; - }); - const warmer = createCacheWarmer({ - warm: () => warmPromise, - storage: memStorage(), - logger: makeLogger(), - timers, - onSurfaceChange: () => {}, - }); - - warmer.onTurnSettled("conv-1", {}); - timers.flush(); - - warmer.onTurnStarted("conv-1"); - warmer.onTurnSettled("conv-1", {}); - - resolveWarm?.(WARM_RESULT); - await new Promise((r) => setTimeout(r, 10)); - - const state = warmer.getState("conv-1"); - expect(state.lastPct).toBeNull(); - }); - it("disabled conversation does not warm", async () => { - const timers = fakeTimers(); + const deps = makeDeps(); const warmCalls: string[] = []; - const warmer = createCacheWarmer({ - warm: async (convId) => { - warmCalls.push(convId); - return WARM_RESULT; - }, - storage: memStorage(), - logger: makeLogger(), - timers, - onSurfaceChange: () => {}, - }); + deps.warm = async (convId) => { + warmCalls.push(convId); + return WARM_RESULT; + }; + const warmer = createCacheWarmer(deps); await warmer.setEnabled("conv-1", false); warmer.onTurnSettled("conv-1", {}); - timers.flush(); + deps.timers.flush(); await new Promise((r) => setTimeout(r, 10)); expect(warmCalls).toHaveLength(0); }); - it("stores lastPct from the warm result", async () => { - const timers = fakeTimers(); - const warmer = createCacheWarmer({ - warm: async () => WARM_RESULT, - storage: memStorage(), - logger: makeLogger(), - timers, - onSurfaceChange: () => {}, - }); - - warmer.onTurnSettled("conv-1", {}); - timers.flush(); - - await new Promise((r) => setTimeout(r, 10)); - const state = warmer.getState("conv-1"); - expect(state.lastPct).toBe(80); - }); - - it("a completed warm stores both lastPct (rate) and lastExpectedPct (retention)", async () => { - const timers = fakeTimers(); - const warmer = createCacheWarmer({ - warm: async () => ({ - inputTokens: 1000, - outputTokens: 10, - cacheReadTokens: 700, - cacheWriteTokens: 300, - }), - storage: memStorage(), - logger: makeLogger(), - timers, - onSurfaceChange: () => {}, - }); - - warmer.onTurnSettled("conv-1", {}); - timers.flush(); - - await new Promise((r) => setTimeout(r, 10)); - const state = warmer.getState("conv-1"); - expect(state.lastPct).toBe(70); - expect(state.lastExpectedPct).toBe(70); - }); - - it("re-arms timer after warm completes", async () => { - const timers = fakeTimers(); - let warmCount = 0; - const warmer = createCacheWarmer({ - warm: async () => { - warmCount++; - return WARM_RESULT; - }, - storage: memStorage(), - logger: makeLogger(), - timers, - onSurfaceChange: () => {}, - }); - - warmer.onTurnSettled("conv-1", {}); - timers.flush(); - await new Promise((r) => setTimeout(r, 10)); - - timers.flush(); - await new Promise((r) => setTimeout(r, 10)); - - expect(warmCount).toBe(2); - }); - it("setIntervalMs converts seconds→ms, floors at MIN_INTERVAL_MS, and re-arms", async () => { - const timers = fakeTimers(); + const deps = makeDeps(); const warmCalls: string[] = []; - const warmer = createCacheWarmer({ - warm: async (convId) => { - warmCalls.push(convId); - return WARM_RESULT; - }, - storage: memStorage(), - logger: makeLogger(), - timers, - onSurfaceChange: () => {}, - }); + deps.warm = async (convId) => { + warmCalls.push(convId); + return WARM_RESULT; + }; + const warmer = createCacheWarmer(deps); // Enable and settle to arm the timer warmer.onTurnSettled("conv-1", {}); @@ -255,20 +162,14 @@ describe("CacheWarmer", () => { expect(state.intervalMs).toBe(30_000); // Timer should still be armed — flush fires it - timers.flush(); + deps.timers.flush(); await new Promise((r) => setTimeout(r, 10)); expect(warmCalls).toContain("conv-1"); }); it("setIntervalMs clamps values below MIN_INTERVAL_MS", async () => { - const timers = fakeTimers(); - const warmer = createCacheWarmer({ - warm: async () => WARM_RESULT, - storage: memStorage(), - logger: makeLogger(), - timers, - onSurfaceChange: () => {}, - }); + const deps = makeDeps(); + const warmer = createCacheWarmer(deps); warmer.onTurnSettled("conv-1", {}); @@ -278,14 +179,8 @@ describe("CacheWarmer", () => { }); it("setIntervalMs ignores NaN / non-positive (clamps to MIN_INTERVAL_MS)", async () => { - const timers = fakeTimers(); - const warmer = createCacheWarmer({ - warm: async () => WARM_RESULT, - storage: memStorage(), - logger: makeLogger(), - timers, - onSurfaceChange: () => {}, - }); + const deps = makeDeps(); + const warmer = createCacheWarmer(deps); warmer.onTurnSettled("conv-1", {}); @@ -300,14 +195,8 @@ describe("CacheWarmer", () => { }); it("setEnabled flips enabled for a conversation", async () => { - const timers = fakeTimers(); - const warmer = createCacheWarmer({ - warm: async () => WARM_RESULT, - storage: memStorage(), - logger: makeLogger(), - timers, - onSurfaceChange: () => {}, - }); + const deps = makeDeps(); + const warmer = createCacheWarmer(deps); // Default is enabled expect(warmer.getState("conv-1").enabled).toBe(true); @@ -322,17 +211,13 @@ describe("CacheWarmer", () => { }); it("onSurfaceChange is called when settings change", async () => { - const timers = fakeTimers(); let changeCount = 0; - const warmer = createCacheWarmer({ - warm: async () => WARM_RESULT, - storage: memStorage(), - logger: makeLogger(), - timers, + const deps = makeDeps({ onSurfaceChange: () => { changeCount++; }, }); + const warmer = createCacheWarmer(deps); await warmer.setEnabled("conv-1", false); expect(changeCount).toBe(1); @@ -341,24 +226,176 @@ describe("CacheWarmer", () => { expect(changeCount).toBe(2); }); - it("the per-conversation spec includes a cache-retention stat", async () => { - const timers = fakeTimers(); - const warmer = createCacheWarmer({ - warm: async () => ({ - inputTokens: 1000, - outputTokens: 10, - cacheReadTokens: 900, - cacheWriteTokens: 100, - }), - storage: memStorage(), - logger: makeLogger(), - timers, - onSurfaceChange: () => {}, + it("warmCompleted updates lastPct/lastExpectedPct/lastWarmAt and re-arms (nextWarmAt set), pushes onSurfaceChange", () => { + let changeCount = 0; + let nowMs = 5000; + const deps = makeDeps({ + onSurfaceChange: () => { + changeCount++; + }, + now: () => nowMs, }); + const warmer = createCacheWarmer(deps); warmer.onTurnSettled("conv-1", {}); - timers.flush(); - await new Promise((r) => setTimeout(r, 10)); + const stateBefore = warmer.getState("conv-1"); + expect(stateBefore.lastPct).toBeNull(); + expect(stateBefore.lastExpectedPct).toBeNull(); + expect(stateBefore.lastWarmAt).toBeNull(); + + nowMs = 6000; + warmer.onWarmCompleted({ + conversationId: "conv-1", + usage: { inputTokens: 1000, outputTokens: 10, cacheReadTokens: 700, cacheWriteTokens: 300 }, + }); + + const state = warmer.getState("conv-1"); + expect(state.lastPct).toBe(70); + expect(state.lastExpectedPct).toBe(70); + expect(state.lastWarmAt).toBe(6000); + expect(state.nextWarmAt).not.toBeNull(); + expect(changeCount).toBe(1); + }); + + it("a warm that completes while the conversation is active is dropped (no update, no re-arm)", () => { + let changeCount = 0; + const deps = makeDeps({ + onSurfaceChange: () => { + changeCount++; + }, + now: () => 5000, + }); + const warmer = createCacheWarmer(deps); + + warmer.onTurnSettled("conv-1", {}); + warmer.onTurnStarted("conv-1"); + + warmer.onWarmCompleted({ + conversationId: "conv-1", + usage: { inputTokens: 1000, outputTokens: 10, cacheReadTokens: 800, cacheWriteTokens: 0 }, + }); + + const state = warmer.getState("conv-1"); + expect(state.lastPct).toBeNull(); + expect(state.lastWarmAt).toBeNull(); + expect(state.nextWarmAt).toBeNull(); + expect(changeCount).toBe(0); + }); + + it("nextWarmAt is set when armed and null when disabled or active", async () => { + let nowMs = 1000; + const deps = makeDeps({ now: () => nowMs }); + const warmer = createCacheWarmer(deps); + + // Before any event — not armed + expect(warmer.getState("conv-1").nextWarmAt).toBeNull(); + + // After turnSettled — armed with nextWarmAt + nowMs = 2000; + warmer.onTurnSettled("conv-1", {}); + const stateArmed = warmer.getState("conv-1"); + expect(stateArmed.nextWarmAt).toBe(2000 + 240_000); + + // After turnStarted — cancelled (null) + warmer.onTurnStarted("conv-1"); + expect(warmer.getState("conv-1").nextWarmAt).toBeNull(); + + // After disabling — null + warmer.onTurnSettled("conv-2", {}); + await warmer.setEnabled("conv-2", false); + expect(warmer.getState("conv-2").nextWarmAt).toBeNull(); + }); + + it("a manual warm (warmCompleted for a conversation) resets the timer + refreshes the surface", () => { + let changeCount = 0; + let nowMs = 5000; + const deps = makeDeps({ + onSurfaceChange: () => { + changeCount++; + }, + now: () => nowMs, + }); + const warmer = createCacheWarmer(deps); + + // Settle to arm the timer + warmer.onTurnSettled("conv-1", {}); + const armed = warmer.getState("conv-1"); + expect(armed.nextWarmAt).toBe(5000 + 240_000); + + // Simulate a manual warm completing at t=8000 + nowMs = 8000; + warmer.onWarmCompleted({ + conversationId: "conv-1", + usage: { inputTokens: 1000, outputTokens: 10, cacheReadTokens: 900, cacheWriteTokens: 100 }, + }); + + const after = warmer.getState("conv-1"); + expect(after.lastPct).toBe(90); + expect(after.lastExpectedPct).toBe(90); + expect(after.lastWarmAt).toBe(8000); + // Timer should be re-armed with new nextWarmAt + expect(after.nextWarmAt).toBe(8000 + 240_000); + expect(changeCount).toBe(1); + }); + + it("stores lastPct from the warmCompleted event", () => { + const deps = makeDeps({ now: () => 5000 }); + const warmer = createCacheWarmer(deps); + + warmer.onTurnSettled("conv-1", {}); + warmer.onWarmCompleted({ + conversationId: "conv-1", + usage: WARM_RESULT, + }); + + const state = warmer.getState("conv-1"); + expect(state.lastPct).toBe(80); + }); + + it("a completed warm stores both lastPct (rate) and lastExpectedPct (retention)", () => { + const deps = makeDeps({ now: () => 5000 }); + const warmer = createCacheWarmer(deps); + + warmer.onTurnSettled("conv-1", {}); + warmer.onWarmCompleted({ + conversationId: "conv-1", + usage: { inputTokens: 1000, outputTokens: 10, cacheReadTokens: 700, cacheWriteTokens: 300 }, + }); + + const state = warmer.getState("conv-1"); + expect(state.lastPct).toBe(70); + expect(state.lastExpectedPct).toBe(70); + }); + + it("re-arms timer after warmCompleted", () => { + let nowMs = 1000; + const deps = makeDeps({ now: () => nowMs }); + const warmer = createCacheWarmer(deps); + + warmer.onTurnSettled("conv-1", {}); + const firstNextWarmAt = warmer.getState("conv-1").nextWarmAt; + + nowMs = 5000; + warmer.onWarmCompleted({ + conversationId: "conv-1", + usage: WARM_RESULT, + }); + + const after = warmer.getState("conv-1"); + expect(after.nextWarmAt).not.toBeNull(); + expect(after.nextWarmAt).not.toBe(firstNextWarmAt); + expect(after.nextWarmAt).toBe(5000 + 240_000); + }); + + it("the per-conversation spec includes a cache-retention stat", () => { + const deps = makeDeps({ now: () => 5000 }); + const warmer = createCacheWarmer(deps); + + warmer.onTurnSettled("conv-1", {}); + warmer.onWarmCompleted({ + conversationId: "conv-1", + usage: { inputTokens: 1000, outputTokens: 10, cacheReadTokens: 900, cacheWriteTokens: 100 }, + }); const state = warmer.getState("conv-1"); expect(state.lastExpectedPct).toBe(90); diff --git a/packages/cache-warming/src/warmer.ts b/packages/cache-warming/src/warmer.ts index f50f346..d77bfe0 100644 --- a/packages/cache-warming/src/warmer.ts +++ b/packages/cache-warming/src/warmer.ts @@ -1,5 +1,5 @@ import type { Logger, StorageNamespace } from "@dispatch/kernel"; -import type { WarmService } from "@dispatch/session-orchestrator"; +import type { WarmCompletedPayload, WarmService } from "@dispatch/session-orchestrator"; import { type ConversationContext, type ConversationSettings, @@ -7,7 +7,6 @@ import { computeCachePct, computeExpectedCacheRate, DEFAULT_INTERVAL_MS, - isTokenCurrent, MIN_INTERVAL_MS, parseSettings, serializeSettings, @@ -31,6 +30,9 @@ export interface CacheWarmer { /** Handle a turnSettled event — mark idle, store context, arm timer if enabled. */ readonly onTurnSettled: (conversationId: string, ctx: ConversationContext) => void; + /** Handle a warmCompleted event — process warm result, update surface, re-arm timer. */ + readonly onWarmCompleted: (payload: WarmCompletedPayload) => void; + /** Get the current state for a conversation (for surface rendering). */ readonly getState: (conversationId: string) => ConversationState; @@ -55,6 +57,8 @@ export interface CacheWarmerDeps { readonly storage: StorageNamespace; readonly logger: Logger; readonly timers: TimerDeps; + /** Injected clock — returns epoch-ms. */ + readonly now: () => number; /** Called when surface subscribers should re-fetch the spec. */ readonly onSurfaceChange: () => void; } @@ -65,6 +69,8 @@ const DEFAULT_STATE: ConversationState = { active: false, lastPct: null, lastExpectedPct: null, + lastWarmAt: null, + nextWarmAt: null, token: 0, }; @@ -96,6 +102,7 @@ export function createCacheWarmer(deps: CacheWarmerDeps): CacheWarmer { deps.timers.clearTimer(existing); timers.delete(conversationId); } + mergeState(conversationId, { nextWarmAt: null }); } function armTimer(conversationId: string): void { @@ -104,7 +111,9 @@ export function createCacheWarmer(deps: CacheWarmerDeps): CacheWarmer { if (!state.enabled || state.active) return; const token = nextToken++; - setState(conversationId, { ...state, token }); + const nowMs = deps.now(); + const nextWarmAt = nowMs + state.intervalMs; + setState(conversationId, { ...state, token, nextWarmAt }); const timerId = deps.timers.setTimer(() => { timers.delete(conversationId); @@ -126,39 +135,13 @@ export function createCacheWarmer(deps: CacheWarmerDeps): CacheWarmer { const ctx = getContext(conversationId); deps.logger.debug("cache-warming: firing warm", { conversationId }); - const result = await deps.warm(conversationId, { + await deps.warm(conversationId, { ...(ctx.cwd !== undefined ? { cwd: ctx.cwd } : {}), ...(ctx.modelName !== undefined ? { modelName: ctx.modelName } : {}), }); - // Re-check token after async warm — result may be stale - const currentState = getState(conversationId); - if (!isTokenCurrent(currentState.token, token)) { - deps.logger.debug("cache-warming: discarding stale warm result", { - conversationId, - }); - return; - } - - if ("error" in result) { - deps.logger.debug("cache-warming: warm returned error (normal)", { - conversationId, - error: result.error, - }); - } else { - const pct = computeCachePct(result.inputTokens, result.cacheReadTokens); - const expectedPct = computeExpectedCacheRate(result.cacheReadTokens, result.cacheWriteTokens); - setState(conversationId, { ...currentState, lastPct: pct, lastExpectedPct: expectedPct }); - deps.onSurfaceChange(); - deps.logger.debug("cache-warming: warm complete", { - conversationId, - pct, - expectedPct, - }); - } - - // Re-arm for next cycle - armTimer(conversationId); + // Result processing is handled by the warmCompleted event handler. + // Timer re-arm is also handled there on success. } async function loadSettings(conversationId: string): Promise<ConversationSettings> { @@ -201,6 +184,41 @@ export function createCacheWarmer(deps: CacheWarmerDeps): CacheWarmer { } }, + onWarmCompleted(payload) { + const { conversationId, usage } = payload; + const state = getState(conversationId); + + // Drop if the conversation became active while the warm was in flight + if (state.active) { + deps.logger.debug("cache-warming: dropping warm result (conversation active)", { + conversationId, + }); + return; + } + + const pct = computeCachePct(usage.inputTokens, usage.cacheReadTokens); + const expectedPct = computeExpectedCacheRate(usage.cacheReadTokens, usage.cacheWriteTokens); + const nowMs = deps.now(); + setState(conversationId, { + ...state, + lastPct: pct, + lastExpectedPct: expectedPct, + lastWarmAt: nowMs, + }); + deps.onSurfaceChange(); + deps.logger.debug("cache-warming: warm complete", { + conversationId, + pct, + expectedPct, + }); + + // Re-arm the automatic timer if enabled and not active + const updated = getState(conversationId); + if (updated.enabled && !updated.active) { + armTimer(conversationId); + } + }, + getState, getContext, diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts index 12d387c..4175b0c 100644 --- a/packages/session-orchestrator/src/extension.ts +++ b/packages/session-orchestrator/src/extension.ts @@ -21,7 +21,11 @@ export const manifest: Manifest = { activation: "eager", contributes: { services: ["session-orchestrator/orchestrator", "session-orchestrator/warm"], - hooks: ["session-orchestrator/turn-started", "session-orchestrator/turn-settled"], + hooks: [ + "session-orchestrator/turn-started", + "session-orchestrator/turn-settled", + "session-orchestrator/warm-completed", + ], }, }; @@ -64,6 +68,7 @@ export function activate(host: HostAPI): void { runTurn, logger: host.logger, now: () => Date.now(), + emit: (hook, payload) => host.emit(hook, payload), }, activeConversations, ); diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts index 37ae5ce..2daf278 100644 --- a/packages/session-orchestrator/src/index.ts +++ b/packages/session-orchestrator/src/index.ts @@ -10,8 +10,11 @@ export { type TurnLifecyclePayload, turnSettled, turnStarted, + type WarmCompletedPayload, type WarmResult, type WarmService, + type WarmServiceDeps, + warmCompleted, } from "./orchestrator.js"; export { buildUserMessage, diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts index 5d512ea..ba4912a 100644 --- a/packages/session-orchestrator/src/orchestrator.test.ts +++ b/packages/session-orchestrator/src/orchestrator.test.ts @@ -17,6 +17,7 @@ import { createSessionOrchestrator, createWarmService, type TurnLifecyclePayload, + type WarmCompletedPayload, } from "./orchestrator.js"; import type { ToolAssembly } from "./tools-filter.js"; @@ -1060,6 +1061,7 @@ describe("warm service", () => { resolveTools: () => [toolA], applyToolsFilter: identityApplyToolsFilter, runTurn, + emit: () => {}, }; const { activeConversations } = createSessionOrchestrator(deps); @@ -1115,6 +1117,7 @@ describe("warm service", () => { resolveTools: () => [], applyToolsFilter: identityApplyToolsFilter, runTurn: blockingRunTurn, + emit: () => {}, }; const { orchestrator, activeConversations } = createSessionOrchestrator(deps); @@ -1158,6 +1161,7 @@ describe("warm service", () => { resolveTools: () => [], applyToolsFilter: identityApplyToolsFilter, runTurn, + emit: () => {}, }; const { activeConversations } = createSessionOrchestrator(deps); @@ -1201,6 +1205,7 @@ describe("warm service", () => { resolveTools: () => [], applyToolsFilter: identityApplyToolsFilter, runTurn, + emit: () => {}, }; const { activeConversations } = createSessionOrchestrator(deps); @@ -1215,4 +1220,107 @@ describe("warm service", () => { cacheWriteTokens: 100, }); }); + + it("warm emits warmCompleted with the usage on success", async () => { + const store = createInMemoryStore(); + const existingMsg: ChatMessage = { + role: "user", + chunks: [{ type: "text", text: "existing" }], + }; + await store.append("conv-warm-emit", [existingMsg]); + + const provider: ProviderContract = { + id: "p", + stream: async function* () { + yield { + type: "usage", + usage: { inputTokens: 200, outputTokens: 10, cacheReadTokens: 150, cacheWriteTokens: 50 }, + } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + }, + }; + + const emitted: Array<{ hook: string; payload: WarmCompletedPayload }> = []; + const fakeEmit = <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload): void => { + emitted.push({ hook: hook.id, payload: payload as WarmCompletedPayload }); + }; + + const deps = { + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn, + emit: fakeEmit, + }; + + const { activeConversations } = createSessionOrchestrator(deps); + const warmService = createWarmService(deps, activeConversations); + + const result = await warmService.warm("conv-warm-emit"); + + if (!("inputTokens" in result)) throw new Error("expected success"); + + expect(emitted).toHaveLength(1); + expect(emitted[0]?.hook).toBe("session-orchestrator/warm-completed"); + expect(emitted[0]?.payload.conversationId).toBe("conv-warm-emit"); + expect(emitted[0]?.payload.usage).toEqual(result); + }); + + it("warm does NOT emit warmCompleted when it refuses (conversation generating / no history)", async () => { + const store = createInMemoryStore(); + + const provider: ProviderContract = { + id: "p", + stream: async function* () { + yield { type: "text-delta", delta: "slow" } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + }, + }; + + const emitted: Array<{ hook: string }> = []; + const fakeEmit = <TPayload>(hook: EventHookDescriptor<TPayload>, _payload: TPayload): void => { + emitted.push({ hook: hook.id }); + }; + + const blockingRunTurn = async (_input: RunTurnInput): Promise<RunTurnResult> => { + await new Promise<void>((resolve) => setTimeout(resolve, 50)); + return { + messages: [{ role: "assistant", chunks: [{ type: "text", text: "done" }] }], + usage: { inputTokens: 1, outputTokens: 1 }, + finishReason: "stop", + }; + }; + + const deps = { + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn: blockingRunTurn, + emit: fakeEmit, + }; + + const { orchestrator, activeConversations } = createSessionOrchestrator(deps); + const warmService = createWarmService(deps, activeConversations); + + // Refuse because conversation is generating + const turnPromise = orchestrator.handleMessage({ + conversationId: "conv-refuse-gen", + text: "test", + onEvent: () => {}, + }); + + const genResult = await warmService.warm("conv-refuse-gen"); + expect(genResult).toEqual({ error: "conversation is generating" }); + + await turnPromise; + + // Refuse because no history + const noHistResult = await warmService.warm("conv-refuse-empty"); + expect(noHistResult).toEqual({ error: "no history" }); + + const warmEmits = emitted.filter((e) => e.hook === "session-orchestrator/warm-completed"); + expect(warmEmits).toHaveLength(0); + }); }); diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index c39bc06..6df92c8 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -35,6 +35,16 @@ export const turnStarted: EventHookDescriptor<TurnLifecyclePayload> = export const turnSettled: EventHookDescriptor<TurnLifecyclePayload> = defineEventHook<TurnLifecyclePayload>("session-orchestrator/turn-settled"); +/** Payload for the warmCompleted bus event. */ +export interface WarmCompletedPayload { + readonly conversationId: string; + readonly usage: WarmResult; +} + +/** Fired when a warm probe succeeds (both automatic and manual paths). */ +export const warmCompleted: EventHookDescriptor<WarmCompletedPayload> = + defineEventHook<WarmCompletedPayload>("session-orchestrator/warm-completed"); + // --- Warm service --- export interface WarmResult { @@ -89,6 +99,11 @@ export interface SessionOrchestratorDeps { readonly emit?: <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload) => void; } +/** Deps for the warm service — emit is REQUIRED so warmCompleted is never silently dropped. */ +export type WarmServiceDeps = SessionOrchestratorDeps & { + readonly emit: <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload) => void; +}; + export interface SessionOrchestratorBundle { readonly orchestrator: SessionOrchestrator; /** The shared active-conversations set, for use by createWarmService. */ @@ -187,7 +202,7 @@ export function createSessionOrchestrator( } export function createWarmService( - deps: SessionOrchestratorDeps, + deps: WarmServiceDeps, activeConversations: ReadonlySet<string>, ): WarmService { return { @@ -247,7 +262,9 @@ export function createWarmService( } } - return { inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens }; + const result: WarmResult = { inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens }; + deps.emit(warmCompleted, { conversationId, usage: result }); + return result; }, }; } |
