summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-11 14:48:30 +0900
committerAdam Malczewski <[email protected]>2026-06-11 14:48:30 +0900
commitbfbad3af79cab23f52be0f6388311a5798b7fd04 (patch)
tree7bda9843aca8d7fad589ba0b7b5579e2627a3585
parent58e2ad559cccc8b35c513818e253b04e60af69b8 (diff)
downloaddispatch-bfbad3af79cab23f52be0f6388311a5798b7fd04.tar.gz
dispatch-bfbad3af79cab23f52be0f6388311a5798b7fd04.zip
feat(cache-warming): CR-3 — manual warm resets timer + nextWarmAt/lastWarmAt surface
FE CR-3 (backend-handoff-cache-warming-timer.md). The inversion: session-orchestrator's warm() (the single chokepoint for manual /chat/warm AND the automatic timer) emits a warmCompleted bus event; cache-warming subscribes and does ALL post-warm handling. So a manual warm now re-arms the timer + refreshes the surface with NO transport-http change (core can't depend on the standard cache-warming ext). - session-orchestrator: warmCompleted event hook + emit from warm() on success - cache-warming: warmCompleted subscriber unifies result handling (manual + automatic); adds nextWarmAt/lastWarmAt state + a custom 'cache-warming-timer' surface field - fix: createWarmService was missing the emit dep (deps.emit?. silently no-oped) → wired it + made emit REQUIRED so it can't regress Live-verified vs claude haiku: manual POST /chat/warm now logs cache-warming 'warm complete' ~2s after the turn (not the 4-min timer) → manual warm reaches the warmer. 800 vitest + 109 bun green; tsc -b 0; biome clean.
-rw-r--r--packages/cache-warming/src/extension.ts14
-rw-r--r--packages/cache-warming/src/pure.test.ts31
-rw-r--r--packages/cache-warming/src/pure.ts19
-rw-r--r--packages/cache-warming/src/warmer.test.ts413
-rw-r--r--packages/cache-warming/src/warmer.ts82
-rw-r--r--packages/session-orchestrator/src/extension.ts7
-rw-r--r--packages/session-orchestrator/src/index.ts3
-rw-r--r--packages/session-orchestrator/src/orchestrator.test.ts108
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts21
9 files changed, 467 insertions, 231 deletions
diff --git a/packages/cache-warming/src/extension.ts b/packages/cache-warming/src/extension.ts
index 802618a..19f5130 100644
--- a/packages/cache-warming/src/extension.ts
+++ b/packages/cache-warming/src/extension.ts
@@ -1,5 +1,10 @@
import type { Extension, HostAPI, Manifest } from "@dispatch/kernel";
-import { cacheWarmHandle, turnSettled, turnStarted } from "@dispatch/session-orchestrator";
+import {
+ cacheWarmHandle,
+ turnSettled,
+ turnStarted,
+ warmCompleted,
+} from "@dispatch/session-orchestrator";
import type { SurfaceContext, SurfaceProvider } from "@dispatch/surface-registry";
import { surfaceRegistryHandle } from "@dispatch/surface-registry";
import type { SurfaceSpec } from "@dispatch/ui-contract";
@@ -53,6 +58,7 @@ export function activate(host: HostAPI): void {
}
},
},
+ now: () => Date.now(),
onSurfaceChange: () => {
for (const notify of subscribers) {
notify();
@@ -71,6 +77,10 @@ export function activate(host: HostAPI): void {
});
});
+ host.on(warmCompleted, (payload) => {
+ warmer.onWarmCompleted(payload);
+ });
+
function getSpec(context?: SurfaceContext): SurfaceSpec {
const convId = context?.conversationId;
if (convId === undefined) {
@@ -82,6 +92,8 @@ export function activate(host: HostAPI): void {
state.intervalMs,
state.lastPct,
state.lastExpectedPct,
+ state.nextWarmAt,
+ state.lastWarmAt,
);
}
diff --git a/packages/cache-warming/src/pure.test.ts b/packages/cache-warming/src/pure.test.ts
index f5e2f1d..a503798 100644
--- a/packages/cache-warming/src/pure.test.ts
+++ b/packages/cache-warming/src/pure.test.ts
@@ -52,6 +52,8 @@ describe("shouldWarm", () => {
active: false,
lastPct: null,
lastExpectedPct: null,
+ lastWarmAt: null,
+ nextWarmAt: null,
token: 5,
};
expect(shouldWarm(state, 5)).toBe(true);
@@ -64,6 +66,8 @@ describe("shouldWarm", () => {
active: false,
lastPct: null,
lastExpectedPct: null,
+ lastWarmAt: null,
+ nextWarmAt: null,
token: 5,
};
expect(shouldWarm(state, 5)).toBe(false);
@@ -76,6 +80,8 @@ describe("shouldWarm", () => {
active: true,
lastPct: null,
lastExpectedPct: null,
+ lastWarmAt: null,
+ nextWarmAt: null,
token: 5,
};
expect(shouldWarm(state, 5)).toBe(false);
@@ -88,6 +94,8 @@ describe("shouldWarm", () => {
active: false,
lastPct: null,
lastExpectedPct: null,
+ lastWarmAt: null,
+ nextWarmAt: null,
token: 5,
};
expect(shouldWarm(state, 6)).toBe(false);
@@ -181,12 +189,12 @@ describe("parseIntervalPayload", () => {
});
describe("buildConversationSpec", () => {
- it("builds a per-conversation spec with toggle + number(interval) + last-% + retention fields", () => {
- const spec = buildConversationSpec(true, 240_000, 80, 95);
+ it("builds a per-conversation spec with toggle + number(interval) + last-% + retention + timer fields", () => {
+ const spec = buildConversationSpec(true, 240_000, 80, 95, 1000, 500);
expect(spec.id).toBe("cache-warming");
expect(spec.region).toBe("side");
expect(spec.title).toBe("Cache Warming");
- expect(spec.fields).toHaveLength(4);
+ expect(spec.fields).toHaveLength(5);
const toggle = spec.fields[0];
expect(toggle).toEqual({
@@ -220,10 +228,17 @@ describe("buildConversationSpec", () => {
label: "Cache retention",
value: "95%",
});
+
+ const timer = spec.fields[4];
+ expect(timer).toEqual({
+ kind: "custom",
+ rendererId: "cache-warming-timer",
+ payload: { nextWarmAt: 1000, lastWarmAt: 500 },
+ });
});
it("shows — when lastPct and lastExpectedPct are null", () => {
- const spec = buildConversationSpec(true, 240_000, null, null);
+ const spec = buildConversationSpec(true, 240_000, null, null, null, null);
const stat = spec.fields[2];
expect(stat).toEqual({
kind: "stat",
@@ -236,10 +251,16 @@ describe("buildConversationSpec", () => {
label: "Cache retention",
value: "—",
});
+ const timer = spec.fields[4];
+ expect(timer).toEqual({
+ kind: "custom",
+ rendererId: "cache-warming-timer",
+ payload: { nextWarmAt: null, lastWarmAt: null },
+ });
});
it("reflects disabled state", () => {
- const spec = buildConversationSpec(false, 120_000, 50, 75);
+ const spec = buildConversationSpec(false, 120_000, 50, 75, null, null);
const toggle = spec.fields[0];
expect(toggle).toEqual({
kind: "toggle",
diff --git a/packages/cache-warming/src/pure.ts b/packages/cache-warming/src/pure.ts
index ab6fc79..c4cbe8a 100644
--- a/packages/cache-warming/src/pure.ts
+++ b/packages/cache-warming/src/pure.ts
@@ -3,7 +3,13 @@
* Every function is input → output; testable without mocks.
*/
-import type { NumberField, StatField, SurfaceSpec, ToggleField } from "@dispatch/ui-contract";
+import type {
+ CustomField,
+ NumberField,
+ StatField,
+ SurfaceSpec,
+ ToggleField,
+} from "@dispatch/ui-contract";
// --- Types ---
@@ -18,6 +24,8 @@ export interface ConversationState extends ConversationSettings {
readonly active: boolean;
readonly lastPct: number | null;
readonly lastExpectedPct: number | null;
+ readonly lastWarmAt: number | null;
+ readonly nextWarmAt: number | null;
readonly token: number;
}
@@ -137,6 +145,8 @@ export function buildConversationSpec(
intervalMs: number,
lastPct: number | null,
lastExpectedPct: number | null,
+ nextWarmAt: number | null,
+ lastWarmAt: number | null,
): SurfaceSpec {
const pctDisplay = lastPct === null ? "—" : `${lastPct}%`;
const retentionDisplay = lastExpectedPct === null ? "—" : `${lastExpectedPct}%`;
@@ -165,11 +175,16 @@ export function buildConversationSpec(
label: "Cache retention",
value: retentionDisplay,
};
+ const timer: CustomField = {
+ kind: "custom",
+ rendererId: "cache-warming-timer",
+ payload: { nextWarmAt, lastWarmAt },
+ };
return {
id: "cache-warming",
region: "side",
title: "Cache Warming",
- fields: [toggle, interval, stat, retentionStat],
+ fields: [toggle, interval, stat, retentionStat, timer],
};
}
diff --git a/packages/cache-warming/src/warmer.test.ts b/packages/cache-warming/src/warmer.test.ts
index 86908a2..a389ccb 100644
--- a/packages/cache-warming/src/warmer.test.ts
+++ b/packages/cache-warming/src/warmer.test.ts
@@ -1,4 +1,4 @@
-import type { Logger, Span } from "@dispatch/kernel";
+import type { Logger, Span, StorageNamespace } from "@dispatch/kernel";
import type { WarmResult } from "@dispatch/session-orchestrator";
import { describe, expect, it } from "vitest";
import { MIN_INTERVAL_MS } from "./pure.js";
@@ -70,179 +70,86 @@ const WARM_RESULT: WarmResult = {
cacheWriteTokens: 0,
};
-import type { StorageNamespace } from "@dispatch/kernel";
+function makeDeps(
+ overrides: Partial<{
+ warm: (conversationId: string) => Promise<WarmResult>;
+ onSurfaceChange: () => void;
+ now: () => number;
+ }> = {},
+) {
+ const timers = fakeTimers();
+ let nowMs = 1000;
+ return {
+ timers,
+ warm: overrides.warm ?? (async () => WARM_RESULT),
+ storage: memStorage(),
+ logger: makeLogger(),
+ now: overrides.now ?? (() => nowMs),
+ onSurfaceChange: overrides.onSurfaceChange ?? (() => {}),
+ setNow(ms: number) {
+ nowMs = ms;
+ },
+ };
+}
describe("CacheWarmer", () => {
it("arms a timer on turnSettled and warms when it fires (enabled)", async () => {
- const timers = fakeTimers();
+ const deps = makeDeps();
const warmCalls: string[] = [];
- const warmer = createCacheWarmer({
- warm: async (convId) => {
- warmCalls.push(convId);
- return WARM_RESULT;
- },
- storage: memStorage(),
- logger: makeLogger(),
- timers,
- onSurfaceChange: () => {},
- });
+ deps.warm = async (convId) => {
+ warmCalls.push(convId);
+ return WARM_RESULT;
+ };
+ const warmer = createCacheWarmer(deps);
warmer.onTurnSettled("conv-1", {});
- timers.flush();
+ deps.timers.flush();
await new Promise((r) => setTimeout(r, 10));
expect(warmCalls).toContain("conv-1");
});
it("cancels the timer on turnStarted (no warm while generating)", () => {
- const timers = fakeTimers();
+ const deps = makeDeps();
const warmCalls: string[] = [];
- const warmer = createCacheWarmer({
- warm: async (convId) => {
- warmCalls.push(convId);
- return WARM_RESULT;
- },
- storage: memStorage(),
- logger: makeLogger(),
- timers,
- onSurfaceChange: () => {},
- });
+ deps.warm = async (convId) => {
+ warmCalls.push(convId);
+ return WARM_RESULT;
+ };
+ const warmer = createCacheWarmer(deps);
warmer.onTurnSettled("conv-1", {});
warmer.onTurnStarted("conv-1");
- timers.flush();
+ deps.timers.flush();
expect(warmCalls).toHaveLength(0);
});
- it("in-flight warm result is dropped when superseded (token mismatch)", async () => {
- const timers = fakeTimers();
- let resolveWarm: (v: WarmResult) => void = () => {};
- const warmPromise = new Promise<WarmResult>((r) => {
- resolveWarm = r;
- });
- const warmer = createCacheWarmer({
- warm: () => warmPromise,
- storage: memStorage(),
- logger: makeLogger(),
- timers,
- onSurfaceChange: () => {},
- });
-
- warmer.onTurnSettled("conv-1", {});
- timers.flush();
-
- warmer.onTurnStarted("conv-1");
- warmer.onTurnSettled("conv-1", {});
-
- resolveWarm?.(WARM_RESULT);
- await new Promise((r) => setTimeout(r, 10));
-
- const state = warmer.getState("conv-1");
- expect(state.lastPct).toBeNull();
- });
-
it("disabled conversation does not warm", async () => {
- const timers = fakeTimers();
+ const deps = makeDeps();
const warmCalls: string[] = [];
- const warmer = createCacheWarmer({
- warm: async (convId) => {
- warmCalls.push(convId);
- return WARM_RESULT;
- },
- storage: memStorage(),
- logger: makeLogger(),
- timers,
- onSurfaceChange: () => {},
- });
+ deps.warm = async (convId) => {
+ warmCalls.push(convId);
+ return WARM_RESULT;
+ };
+ const warmer = createCacheWarmer(deps);
await warmer.setEnabled("conv-1", false);
warmer.onTurnSettled("conv-1", {});
- timers.flush();
+ deps.timers.flush();
await new Promise((r) => setTimeout(r, 10));
expect(warmCalls).toHaveLength(0);
});
- it("stores lastPct from the warm result", async () => {
- const timers = fakeTimers();
- const warmer = createCacheWarmer({
- warm: async () => WARM_RESULT,
- storage: memStorage(),
- logger: makeLogger(),
- timers,
- onSurfaceChange: () => {},
- });
-
- warmer.onTurnSettled("conv-1", {});
- timers.flush();
-
- await new Promise((r) => setTimeout(r, 10));
- const state = warmer.getState("conv-1");
- expect(state.lastPct).toBe(80);
- });
-
- it("a completed warm stores both lastPct (rate) and lastExpectedPct (retention)", async () => {
- const timers = fakeTimers();
- const warmer = createCacheWarmer({
- warm: async () => ({
- inputTokens: 1000,
- outputTokens: 10,
- cacheReadTokens: 700,
- cacheWriteTokens: 300,
- }),
- storage: memStorage(),
- logger: makeLogger(),
- timers,
- onSurfaceChange: () => {},
- });
-
- warmer.onTurnSettled("conv-1", {});
- timers.flush();
-
- await new Promise((r) => setTimeout(r, 10));
- const state = warmer.getState("conv-1");
- expect(state.lastPct).toBe(70);
- expect(state.lastExpectedPct).toBe(70);
- });
-
- it("re-arms timer after warm completes", async () => {
- const timers = fakeTimers();
- let warmCount = 0;
- const warmer = createCacheWarmer({
- warm: async () => {
- warmCount++;
- return WARM_RESULT;
- },
- storage: memStorage(),
- logger: makeLogger(),
- timers,
- onSurfaceChange: () => {},
- });
-
- warmer.onTurnSettled("conv-1", {});
- timers.flush();
- await new Promise((r) => setTimeout(r, 10));
-
- timers.flush();
- await new Promise((r) => setTimeout(r, 10));
-
- expect(warmCount).toBe(2);
- });
-
it("setIntervalMs converts seconds→ms, floors at MIN_INTERVAL_MS, and re-arms", async () => {
- const timers = fakeTimers();
+ const deps = makeDeps();
const warmCalls: string[] = [];
- const warmer = createCacheWarmer({
- warm: async (convId) => {
- warmCalls.push(convId);
- return WARM_RESULT;
- },
- storage: memStorage(),
- logger: makeLogger(),
- timers,
- onSurfaceChange: () => {},
- });
+ deps.warm = async (convId) => {
+ warmCalls.push(convId);
+ return WARM_RESULT;
+ };
+ const warmer = createCacheWarmer(deps);
// Enable and settle to arm the timer
warmer.onTurnSettled("conv-1", {});
@@ -255,20 +162,14 @@ describe("CacheWarmer", () => {
expect(state.intervalMs).toBe(30_000);
// Timer should still be armed — flush fires it
- timers.flush();
+ deps.timers.flush();
await new Promise((r) => setTimeout(r, 10));
expect(warmCalls).toContain("conv-1");
});
it("setIntervalMs clamps values below MIN_INTERVAL_MS", async () => {
- const timers = fakeTimers();
- const warmer = createCacheWarmer({
- warm: async () => WARM_RESULT,
- storage: memStorage(),
- logger: makeLogger(),
- timers,
- onSurfaceChange: () => {},
- });
+ const deps = makeDeps();
+ const warmer = createCacheWarmer(deps);
warmer.onTurnSettled("conv-1", {});
@@ -278,14 +179,8 @@ describe("CacheWarmer", () => {
});
it("setIntervalMs ignores NaN / non-positive (clamps to MIN_INTERVAL_MS)", async () => {
- const timers = fakeTimers();
- const warmer = createCacheWarmer({
- warm: async () => WARM_RESULT,
- storage: memStorage(),
- logger: makeLogger(),
- timers,
- onSurfaceChange: () => {},
- });
+ const deps = makeDeps();
+ const warmer = createCacheWarmer(deps);
warmer.onTurnSettled("conv-1", {});
@@ -300,14 +195,8 @@ describe("CacheWarmer", () => {
});
it("setEnabled flips enabled for a conversation", async () => {
- const timers = fakeTimers();
- const warmer = createCacheWarmer({
- warm: async () => WARM_RESULT,
- storage: memStorage(),
- logger: makeLogger(),
- timers,
- onSurfaceChange: () => {},
- });
+ const deps = makeDeps();
+ const warmer = createCacheWarmer(deps);
// Default is enabled
expect(warmer.getState("conv-1").enabled).toBe(true);
@@ -322,17 +211,13 @@ describe("CacheWarmer", () => {
});
it("onSurfaceChange is called when settings change", async () => {
- const timers = fakeTimers();
let changeCount = 0;
- const warmer = createCacheWarmer({
- warm: async () => WARM_RESULT,
- storage: memStorage(),
- logger: makeLogger(),
- timers,
+ const deps = makeDeps({
onSurfaceChange: () => {
changeCount++;
},
});
+ const warmer = createCacheWarmer(deps);
await warmer.setEnabled("conv-1", false);
expect(changeCount).toBe(1);
@@ -341,24 +226,176 @@ describe("CacheWarmer", () => {
expect(changeCount).toBe(2);
});
- it("the per-conversation spec includes a cache-retention stat", async () => {
- const timers = fakeTimers();
- const warmer = createCacheWarmer({
- warm: async () => ({
- inputTokens: 1000,
- outputTokens: 10,
- cacheReadTokens: 900,
- cacheWriteTokens: 100,
- }),
- storage: memStorage(),
- logger: makeLogger(),
- timers,
- onSurfaceChange: () => {},
+ it("warmCompleted updates lastPct/lastExpectedPct/lastWarmAt and re-arms (nextWarmAt set), pushes onSurfaceChange", () => {
+ let changeCount = 0;
+ let nowMs = 5000;
+ const deps = makeDeps({
+ onSurfaceChange: () => {
+ changeCount++;
+ },
+ now: () => nowMs,
});
+ const warmer = createCacheWarmer(deps);
warmer.onTurnSettled("conv-1", {});
- timers.flush();
- await new Promise((r) => setTimeout(r, 10));
+ const stateBefore = warmer.getState("conv-1");
+ expect(stateBefore.lastPct).toBeNull();
+ expect(stateBefore.lastExpectedPct).toBeNull();
+ expect(stateBefore.lastWarmAt).toBeNull();
+
+ nowMs = 6000;
+ warmer.onWarmCompleted({
+ conversationId: "conv-1",
+ usage: { inputTokens: 1000, outputTokens: 10, cacheReadTokens: 700, cacheWriteTokens: 300 },
+ });
+
+ const state = warmer.getState("conv-1");
+ expect(state.lastPct).toBe(70);
+ expect(state.lastExpectedPct).toBe(70);
+ expect(state.lastWarmAt).toBe(6000);
+ expect(state.nextWarmAt).not.toBeNull();
+ expect(changeCount).toBe(1);
+ });
+
+ it("a warm that completes while the conversation is active is dropped (no update, no re-arm)", () => {
+ let changeCount = 0;
+ const deps = makeDeps({
+ onSurfaceChange: () => {
+ changeCount++;
+ },
+ now: () => 5000,
+ });
+ const warmer = createCacheWarmer(deps);
+
+ warmer.onTurnSettled("conv-1", {});
+ warmer.onTurnStarted("conv-1");
+
+ warmer.onWarmCompleted({
+ conversationId: "conv-1",
+ usage: { inputTokens: 1000, outputTokens: 10, cacheReadTokens: 800, cacheWriteTokens: 0 },
+ });
+
+ const state = warmer.getState("conv-1");
+ expect(state.lastPct).toBeNull();
+ expect(state.lastWarmAt).toBeNull();
+ expect(state.nextWarmAt).toBeNull();
+ expect(changeCount).toBe(0);
+ });
+
+ it("nextWarmAt is set when armed and null when disabled or active", async () => {
+ let nowMs = 1000;
+ const deps = makeDeps({ now: () => nowMs });
+ const warmer = createCacheWarmer(deps);
+
+ // Before any event — not armed
+ expect(warmer.getState("conv-1").nextWarmAt).toBeNull();
+
+ // After turnSettled — armed with nextWarmAt
+ nowMs = 2000;
+ warmer.onTurnSettled("conv-1", {});
+ const stateArmed = warmer.getState("conv-1");
+ expect(stateArmed.nextWarmAt).toBe(2000 + 240_000);
+
+ // After turnStarted — cancelled (null)
+ warmer.onTurnStarted("conv-1");
+ expect(warmer.getState("conv-1").nextWarmAt).toBeNull();
+
+ // After disabling — null
+ warmer.onTurnSettled("conv-2", {});
+ await warmer.setEnabled("conv-2", false);
+ expect(warmer.getState("conv-2").nextWarmAt).toBeNull();
+ });
+
+ it("a manual warm (warmCompleted for a conversation) resets the timer + refreshes the surface", () => {
+ let changeCount = 0;
+ let nowMs = 5000;
+ const deps = makeDeps({
+ onSurfaceChange: () => {
+ changeCount++;
+ },
+ now: () => nowMs,
+ });
+ const warmer = createCacheWarmer(deps);
+
+ // Settle to arm the timer
+ warmer.onTurnSettled("conv-1", {});
+ const armed = warmer.getState("conv-1");
+ expect(armed.nextWarmAt).toBe(5000 + 240_000);
+
+ // Simulate a manual warm completing at t=8000
+ nowMs = 8000;
+ warmer.onWarmCompleted({
+ conversationId: "conv-1",
+ usage: { inputTokens: 1000, outputTokens: 10, cacheReadTokens: 900, cacheWriteTokens: 100 },
+ });
+
+ const after = warmer.getState("conv-1");
+ expect(after.lastPct).toBe(90);
+ expect(after.lastExpectedPct).toBe(90);
+ expect(after.lastWarmAt).toBe(8000);
+ // Timer should be re-armed with new nextWarmAt
+ expect(after.nextWarmAt).toBe(8000 + 240_000);
+ expect(changeCount).toBe(1);
+ });
+
+ it("stores lastPct from the warmCompleted event", () => {
+ const deps = makeDeps({ now: () => 5000 });
+ const warmer = createCacheWarmer(deps);
+
+ warmer.onTurnSettled("conv-1", {});
+ warmer.onWarmCompleted({
+ conversationId: "conv-1",
+ usage: WARM_RESULT,
+ });
+
+ const state = warmer.getState("conv-1");
+ expect(state.lastPct).toBe(80);
+ });
+
+ it("a completed warm stores both lastPct (rate) and lastExpectedPct (retention)", () => {
+ const deps = makeDeps({ now: () => 5000 });
+ const warmer = createCacheWarmer(deps);
+
+ warmer.onTurnSettled("conv-1", {});
+ warmer.onWarmCompleted({
+ conversationId: "conv-1",
+ usage: { inputTokens: 1000, outputTokens: 10, cacheReadTokens: 700, cacheWriteTokens: 300 },
+ });
+
+ const state = warmer.getState("conv-1");
+ expect(state.lastPct).toBe(70);
+ expect(state.lastExpectedPct).toBe(70);
+ });
+
+ it("re-arms timer after warmCompleted", () => {
+ let nowMs = 1000;
+ const deps = makeDeps({ now: () => nowMs });
+ const warmer = createCacheWarmer(deps);
+
+ warmer.onTurnSettled("conv-1", {});
+ const firstNextWarmAt = warmer.getState("conv-1").nextWarmAt;
+
+ nowMs = 5000;
+ warmer.onWarmCompleted({
+ conversationId: "conv-1",
+ usage: WARM_RESULT,
+ });
+
+ const after = warmer.getState("conv-1");
+ expect(after.nextWarmAt).not.toBeNull();
+ expect(after.nextWarmAt).not.toBe(firstNextWarmAt);
+ expect(after.nextWarmAt).toBe(5000 + 240_000);
+ });
+
+ it("the per-conversation spec includes a cache-retention stat", () => {
+ const deps = makeDeps({ now: () => 5000 });
+ const warmer = createCacheWarmer(deps);
+
+ warmer.onTurnSettled("conv-1", {});
+ warmer.onWarmCompleted({
+ conversationId: "conv-1",
+ usage: { inputTokens: 1000, outputTokens: 10, cacheReadTokens: 900, cacheWriteTokens: 100 },
+ });
const state = warmer.getState("conv-1");
expect(state.lastExpectedPct).toBe(90);
diff --git a/packages/cache-warming/src/warmer.ts b/packages/cache-warming/src/warmer.ts
index f50f346..d77bfe0 100644
--- a/packages/cache-warming/src/warmer.ts
+++ b/packages/cache-warming/src/warmer.ts
@@ -1,5 +1,5 @@
import type { Logger, StorageNamespace } from "@dispatch/kernel";
-import type { WarmService } from "@dispatch/session-orchestrator";
+import type { WarmCompletedPayload, WarmService } from "@dispatch/session-orchestrator";
import {
type ConversationContext,
type ConversationSettings,
@@ -7,7 +7,6 @@ import {
computeCachePct,
computeExpectedCacheRate,
DEFAULT_INTERVAL_MS,
- isTokenCurrent,
MIN_INTERVAL_MS,
parseSettings,
serializeSettings,
@@ -31,6 +30,9 @@ export interface CacheWarmer {
/** Handle a turnSettled event — mark idle, store context, arm timer if enabled. */
readonly onTurnSettled: (conversationId: string, ctx: ConversationContext) => void;
+ /** Handle a warmCompleted event — process warm result, update surface, re-arm timer. */
+ readonly onWarmCompleted: (payload: WarmCompletedPayload) => void;
+
/** Get the current state for a conversation (for surface rendering). */
readonly getState: (conversationId: string) => ConversationState;
@@ -55,6 +57,8 @@ export interface CacheWarmerDeps {
readonly storage: StorageNamespace;
readonly logger: Logger;
readonly timers: TimerDeps;
+ /** Injected clock — returns epoch-ms. */
+ readonly now: () => number;
/** Called when surface subscribers should re-fetch the spec. */
readonly onSurfaceChange: () => void;
}
@@ -65,6 +69,8 @@ const DEFAULT_STATE: ConversationState = {
active: false,
lastPct: null,
lastExpectedPct: null,
+ lastWarmAt: null,
+ nextWarmAt: null,
token: 0,
};
@@ -96,6 +102,7 @@ export function createCacheWarmer(deps: CacheWarmerDeps): CacheWarmer {
deps.timers.clearTimer(existing);
timers.delete(conversationId);
}
+ mergeState(conversationId, { nextWarmAt: null });
}
function armTimer(conversationId: string): void {
@@ -104,7 +111,9 @@ export function createCacheWarmer(deps: CacheWarmerDeps): CacheWarmer {
if (!state.enabled || state.active) return;
const token = nextToken++;
- setState(conversationId, { ...state, token });
+ const nowMs = deps.now();
+ const nextWarmAt = nowMs + state.intervalMs;
+ setState(conversationId, { ...state, token, nextWarmAt });
const timerId = deps.timers.setTimer(() => {
timers.delete(conversationId);
@@ -126,39 +135,13 @@ export function createCacheWarmer(deps: CacheWarmerDeps): CacheWarmer {
const ctx = getContext(conversationId);
deps.logger.debug("cache-warming: firing warm", { conversationId });
- const result = await deps.warm(conversationId, {
+ await deps.warm(conversationId, {
...(ctx.cwd !== undefined ? { cwd: ctx.cwd } : {}),
...(ctx.modelName !== undefined ? { modelName: ctx.modelName } : {}),
});
- // Re-check token after async warm — result may be stale
- const currentState = getState(conversationId);
- if (!isTokenCurrent(currentState.token, token)) {
- deps.logger.debug("cache-warming: discarding stale warm result", {
- conversationId,
- });
- return;
- }
-
- if ("error" in result) {
- deps.logger.debug("cache-warming: warm returned error (normal)", {
- conversationId,
- error: result.error,
- });
- } else {
- const pct = computeCachePct(result.inputTokens, result.cacheReadTokens);
- const expectedPct = computeExpectedCacheRate(result.cacheReadTokens, result.cacheWriteTokens);
- setState(conversationId, { ...currentState, lastPct: pct, lastExpectedPct: expectedPct });
- deps.onSurfaceChange();
- deps.logger.debug("cache-warming: warm complete", {
- conversationId,
- pct,
- expectedPct,
- });
- }
-
- // Re-arm for next cycle
- armTimer(conversationId);
+ // Result processing is handled by the warmCompleted event handler.
+ // Timer re-arm is also handled there on success.
}
async function loadSettings(conversationId: string): Promise<ConversationSettings> {
@@ -201,6 +184,41 @@ export function createCacheWarmer(deps: CacheWarmerDeps): CacheWarmer {
}
},
+ onWarmCompleted(payload) {
+ const { conversationId, usage } = payload;
+ const state = getState(conversationId);
+
+ // Drop if the conversation became active while the warm was in flight
+ if (state.active) {
+ deps.logger.debug("cache-warming: dropping warm result (conversation active)", {
+ conversationId,
+ });
+ return;
+ }
+
+ const pct = computeCachePct(usage.inputTokens, usage.cacheReadTokens);
+ const expectedPct = computeExpectedCacheRate(usage.cacheReadTokens, usage.cacheWriteTokens);
+ const nowMs = deps.now();
+ setState(conversationId, {
+ ...state,
+ lastPct: pct,
+ lastExpectedPct: expectedPct,
+ lastWarmAt: nowMs,
+ });
+ deps.onSurfaceChange();
+ deps.logger.debug("cache-warming: warm complete", {
+ conversationId,
+ pct,
+ expectedPct,
+ });
+
+ // Re-arm the automatic timer if enabled and not active
+ const updated = getState(conversationId);
+ if (updated.enabled && !updated.active) {
+ armTimer(conversationId);
+ }
+ },
+
getState,
getContext,
diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts
index 12d387c..4175b0c 100644
--- a/packages/session-orchestrator/src/extension.ts
+++ b/packages/session-orchestrator/src/extension.ts
@@ -21,7 +21,11 @@ export const manifest: Manifest = {
activation: "eager",
contributes: {
services: ["session-orchestrator/orchestrator", "session-orchestrator/warm"],
- hooks: ["session-orchestrator/turn-started", "session-orchestrator/turn-settled"],
+ hooks: [
+ "session-orchestrator/turn-started",
+ "session-orchestrator/turn-settled",
+ "session-orchestrator/warm-completed",
+ ],
},
};
@@ -64,6 +68,7 @@ export function activate(host: HostAPI): void {
runTurn,
logger: host.logger,
now: () => Date.now(),
+ emit: (hook, payload) => host.emit(hook, payload),
},
activeConversations,
);
diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts
index 37ae5ce..2daf278 100644
--- a/packages/session-orchestrator/src/index.ts
+++ b/packages/session-orchestrator/src/index.ts
@@ -10,8 +10,11 @@ export {
type TurnLifecyclePayload,
turnSettled,
turnStarted,
+ type WarmCompletedPayload,
type WarmResult,
type WarmService,
+ type WarmServiceDeps,
+ warmCompleted,
} from "./orchestrator.js";
export {
buildUserMessage,
diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts
index 5d512ea..ba4912a 100644
--- a/packages/session-orchestrator/src/orchestrator.test.ts
+++ b/packages/session-orchestrator/src/orchestrator.test.ts
@@ -17,6 +17,7 @@ import {
createSessionOrchestrator,
createWarmService,
type TurnLifecyclePayload,
+ type WarmCompletedPayload,
} from "./orchestrator.js";
import type { ToolAssembly } from "./tools-filter.js";
@@ -1060,6 +1061,7 @@ describe("warm service", () => {
resolveTools: () => [toolA],
applyToolsFilter: identityApplyToolsFilter,
runTurn,
+ emit: () => {},
};
const { activeConversations } = createSessionOrchestrator(deps);
@@ -1115,6 +1117,7 @@ describe("warm service", () => {
resolveTools: () => [],
applyToolsFilter: identityApplyToolsFilter,
runTurn: blockingRunTurn,
+ emit: () => {},
};
const { orchestrator, activeConversations } = createSessionOrchestrator(deps);
@@ -1158,6 +1161,7 @@ describe("warm service", () => {
resolveTools: () => [],
applyToolsFilter: identityApplyToolsFilter,
runTurn,
+ emit: () => {},
};
const { activeConversations } = createSessionOrchestrator(deps);
@@ -1201,6 +1205,7 @@ describe("warm service", () => {
resolveTools: () => [],
applyToolsFilter: identityApplyToolsFilter,
runTurn,
+ emit: () => {},
};
const { activeConversations } = createSessionOrchestrator(deps);
@@ -1215,4 +1220,107 @@ describe("warm service", () => {
cacheWriteTokens: 100,
});
});
+
+ it("warm emits warmCompleted with the usage on success", async () => {
+ const store = createInMemoryStore();
+ const existingMsg: ChatMessage = {
+ role: "user",
+ chunks: [{ type: "text", text: "existing" }],
+ };
+ await store.append("conv-warm-emit", [existingMsg]);
+
+ const provider: ProviderContract = {
+ id: "p",
+ stream: async function* () {
+ yield {
+ type: "usage",
+ usage: { inputTokens: 200, outputTokens: 10, cacheReadTokens: 150, cacheWriteTokens: 50 },
+ } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ },
+ };
+
+ const emitted: Array<{ hook: string; payload: WarmCompletedPayload }> = [];
+ const fakeEmit = <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload): void => {
+ emitted.push({ hook: hook.id, payload: payload as WarmCompletedPayload });
+ };
+
+ const deps = {
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn,
+ emit: fakeEmit,
+ };
+
+ const { activeConversations } = createSessionOrchestrator(deps);
+ const warmService = createWarmService(deps, activeConversations);
+
+ const result = await warmService.warm("conv-warm-emit");
+
+ if (!("inputTokens" in result)) throw new Error("expected success");
+
+ expect(emitted).toHaveLength(1);
+ expect(emitted[0]?.hook).toBe("session-orchestrator/warm-completed");
+ expect(emitted[0]?.payload.conversationId).toBe("conv-warm-emit");
+ expect(emitted[0]?.payload.usage).toEqual(result);
+ });
+
+ it("warm does NOT emit warmCompleted when it refuses (conversation generating / no history)", async () => {
+ const store = createInMemoryStore();
+
+ const provider: ProviderContract = {
+ id: "p",
+ stream: async function* () {
+ yield { type: "text-delta", delta: "slow" } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ },
+ };
+
+ const emitted: Array<{ hook: string }> = [];
+ const fakeEmit = <TPayload>(hook: EventHookDescriptor<TPayload>, _payload: TPayload): void => {
+ emitted.push({ hook: hook.id });
+ };
+
+ const blockingRunTurn = async (_input: RunTurnInput): Promise<RunTurnResult> => {
+ await new Promise<void>((resolve) => setTimeout(resolve, 50));
+ return {
+ messages: [{ role: "assistant", chunks: [{ type: "text", text: "done" }] }],
+ usage: { inputTokens: 1, outputTokens: 1 },
+ finishReason: "stop",
+ };
+ };
+
+ const deps = {
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn: blockingRunTurn,
+ emit: fakeEmit,
+ };
+
+ const { orchestrator, activeConversations } = createSessionOrchestrator(deps);
+ const warmService = createWarmService(deps, activeConversations);
+
+ // Refuse because conversation is generating
+ const turnPromise = orchestrator.handleMessage({
+ conversationId: "conv-refuse-gen",
+ text: "test",
+ onEvent: () => {},
+ });
+
+ const genResult = await warmService.warm("conv-refuse-gen");
+ expect(genResult).toEqual({ error: "conversation is generating" });
+
+ await turnPromise;
+
+ // Refuse because no history
+ const noHistResult = await warmService.warm("conv-refuse-empty");
+ expect(noHistResult).toEqual({ error: "no history" });
+
+ const warmEmits = emitted.filter((e) => e.hook === "session-orchestrator/warm-completed");
+ expect(warmEmits).toHaveLength(0);
+ });
});
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index c39bc06..6df92c8 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -35,6 +35,16 @@ export const turnStarted: EventHookDescriptor<TurnLifecyclePayload> =
export const turnSettled: EventHookDescriptor<TurnLifecyclePayload> =
defineEventHook<TurnLifecyclePayload>("session-orchestrator/turn-settled");
+/** Payload for the warmCompleted bus event. */
+export interface WarmCompletedPayload {
+ readonly conversationId: string;
+ readonly usage: WarmResult;
+}
+
+/** Fired when a warm probe succeeds (both automatic and manual paths). */
+export const warmCompleted: EventHookDescriptor<WarmCompletedPayload> =
+ defineEventHook<WarmCompletedPayload>("session-orchestrator/warm-completed");
+
// --- Warm service ---
export interface WarmResult {
@@ -89,6 +99,11 @@ export interface SessionOrchestratorDeps {
readonly emit?: <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload) => void;
}
+/** Deps for the warm service — emit is REQUIRED so warmCompleted is never silently dropped. */
+export type WarmServiceDeps = SessionOrchestratorDeps & {
+ readonly emit: <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload) => void;
+};
+
export interface SessionOrchestratorBundle {
readonly orchestrator: SessionOrchestrator;
/** The shared active-conversations set, for use by createWarmService. */
@@ -187,7 +202,7 @@ export function createSessionOrchestrator(
}
export function createWarmService(
- deps: SessionOrchestratorDeps,
+ deps: WarmServiceDeps,
activeConversations: ReadonlySet<string>,
): WarmService {
return {
@@ -247,7 +262,9 @@ export function createWarmService(
}
}
- return { inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens };
+ const result: WarmResult = { inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens };
+ deps.emit(warmCompleted, { conversationId, usage: result });
+ return result;
},
};
}