summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--bun.lock13
-rw-r--r--packages/cache-warming/package.json14
-rw-r--r--packages/cache-warming/src/extension.ts130
-rw-r--r--packages/cache-warming/src/index.ts19
-rw-r--r--packages/cache-warming/src/pure.test.ts109
-rw-r--r--packages/cache-warming/src/pure.ts95
-rw-r--r--packages/cache-warming/src/warmer.test.ts207
-rw-r--r--packages/cache-warming/src/warmer.ts248
-rw-r--r--packages/cache-warming/tsconfig.json11
-rw-r--r--packages/host-bin/package.json1
-rw-r--r--packages/host-bin/src/main.ts2
-rw-r--r--packages/host-bin/tsconfig.json1
-rw-r--r--packages/kernel/src/contracts/extension.ts12
-rw-r--r--packages/kernel/src/host/host.test.ts43
-rw-r--r--packages/kernel/src/host/host.ts3
-rw-r--r--packages/session-orchestrator/src/extension.ts31
-rw-r--r--packages/session-orchestrator/src/index.ts8
-rw-r--r--packages/session-orchestrator/src/orchestrator.test.ts296
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts219
-rw-r--r--packages/transport-http/src/server.bun.test.ts1
-rw-r--r--tasks.md27
-rw-r--r--tsconfig.json1
22 files changed, 1422 insertions, 69 deletions
diff --git a/bun.lock b/bun.lock
index e1c62d6..9b63971 100644
--- a/bun.lock
+++ b/bun.lock
@@ -18,6 +18,16 @@
"@dispatch/kernel": "workspace:*",
},
},
+ "packages/cache-warming": {
+ "name": "@dispatch/cache-warming",
+ "version": "0.0.0",
+ "dependencies": {
+ "@dispatch/kernel": "workspace:*",
+ "@dispatch/session-orchestrator": "workspace:*",
+ "@dispatch/surface-registry": "workspace:*",
+ "@dispatch/ui-contract": "workspace:*",
+ },
+ },
"packages/cli": {
"name": "@dispatch/cli",
"version": "0.0.0",
@@ -45,6 +55,7 @@
"version": "0.0.0",
"dependencies": {
"@dispatch/auth-apikey": "workspace:*",
+ "@dispatch/cache-warming": "workspace:*",
"@dispatch/conversation-store": "workspace:*",
"@dispatch/credential-store": "workspace:*",
"@dispatch/journal-sink": "workspace:*",
@@ -243,6 +254,8 @@
"@dispatch/auth-apikey": ["@dispatch/auth-apikey@workspace:packages/auth-apikey"],
+ "@dispatch/cache-warming": ["@dispatch/cache-warming@workspace:packages/cache-warming"],
+
"@dispatch/cli": ["@dispatch/cli@workspace:packages/cli"],
"@dispatch/conversation-store": ["@dispatch/conversation-store@workspace:packages/conversation-store"],
diff --git a/packages/cache-warming/package.json b/packages/cache-warming/package.json
new file mode 100644
index 0000000..eaf3fda
--- /dev/null
+++ b/packages/cache-warming/package.json
@@ -0,0 +1,14 @@
+{
+ "name": "@dispatch/cache-warming",
+ "version": "0.0.0",
+ "type": "module",
+ "private": true,
+ "main": "dist/index.js",
+ "types": "dist/index.d.ts",
+ "dependencies": {
+ "@dispatch/kernel": "workspace:*",
+ "@dispatch/session-orchestrator": "workspace:*",
+ "@dispatch/surface-registry": "workspace:*",
+ "@dispatch/ui-contract": "workspace:*"
+ }
+}
diff --git a/packages/cache-warming/src/extension.ts b/packages/cache-warming/src/extension.ts
new file mode 100644
index 0000000..16515a8
--- /dev/null
+++ b/packages/cache-warming/src/extension.ts
@@ -0,0 +1,130 @@
+import type { Extension, HostAPI, Manifest } from "@dispatch/kernel";
+import { cacheWarmHandle, turnSettled, turnStarted } from "@dispatch/session-orchestrator";
+import type { SurfaceProvider } from "@dispatch/surface-registry";
+import { surfaceRegistryHandle } from "@dispatch/surface-registry";
+import type { SurfaceSpec } from "@dispatch/ui-contract";
+import { createCacheWarmer } from "./warmer.js";
+
+/**
+ * Manifest for the cache-warming extension.
+ *
+ * Declared as a bundled, eagerly activated extension that depends on
+ * `session-orchestrator` and `surface-registry`, and lists one contributed
+ * service id, `cache-warming/surface`.
+ *
+ * NOTE(review): `activate` registers a surface provider through the surface
+ * registry rather than calling a service-provision API directly — confirm that
+ * is what the `cache-warming/surface` entry is meant to describe.
+ */
+export const manifest: Manifest = {
+ id: "cache-warming",
+ name: "Cache Warming",
+ version: "0.0.0",
+ apiVersion: "^0.1.0",
+ trust: "bundled",
+ activation: "eager",
+ dependsOn: ["session-orchestrator", "surface-registry"],
+ capabilities: {},
+ contributes: {
+ services: ["cache-warming/surface"],
+ },
+};
+
+/**
+ * Build the "Cache Warming" side surface: an enable toggle plus a read-only
+ * stat showing the last cache-hit percentage.
+ *
+ * @param _conversationId - Currently unused; kept so callers can pass the conversation in view.
+ * @param enabled - Whether warming is enabled; drives the toggle value.
+ * @param lastPct - Last cache-hit percentage, or `null` when none has been recorded.
+ * @returns The surface spec to render.
+ */
+function buildSurfaceSpec(
+  _conversationId: string | undefined,
+  enabled: boolean,
+  lastPct: number | null,
+): SurfaceSpec {
+  // Without a recorded percentage the stat shows an em dash placeholder.
+  let pctDisplay = "—";
+  if (lastPct !== null) {
+    pctDisplay = `${lastPct}%`;
+  }
+
+  return {
+    id: "cache-warming",
+    region: "side",
+    title: "Cache Warming",
+    fields: [
+      {
+        kind: "toggle",
+        label: "Enabled",
+        value: enabled,
+        action: { actionId: "cache-warming/toggle" },
+      },
+      { kind: "stat", label: "Last Cache %", value: pctDisplay },
+    ],
+  };
+}
+
+/**
+ * Activate the cache-warming extension.
+ *
+ * Creates a cache warmer backed by the orchestrator's warm service, feeds it
+ * the `turnStarted` / `turnSettled` events, and registers a side surface
+ * (enable toggle + last cache percentage) with the surface registry.
+ *
+ * @param host - Host API used to resolve services, storage, logging and event subscriptions.
+ */
+export function activate(host: HostAPI): void {
+  const warmService = host.getService(cacheWarmHandle);
+  const registry = host.getService(surfaceRegistryHandle);
+  const storage = host.storage("cache-warming");
+
+  // Conversation most recently seen in a turn event; the surface renders this one.
+  let currentConversationId: string | undefined;
+
+  // Timer wrapper: setTimeout/clearTimeout return Timeout in Node types,
+  // but our TimerDeps uses number ids. Map between them.
+  const timeoutMap = new Map<number, ReturnType<typeof setTimeout>>();
+  let nextTimerId = 1;
+
+  const warmer = createCacheWarmer({
+    warm: warmService.warm,
+    storage,
+    logger: host.logger,
+    timers: {
+      setTimer(fn, ms) {
+        const id = nextTimerId++;
+        const handle = setTimeout(() => {
+          // A fired timer is finished. Drop its entry here, because the
+          // warmer never calls clearTimer for timers that already fired and
+          // the map would otherwise gain one stale entry per warm cycle.
+          timeoutMap.delete(id);
+          fn();
+        }, ms);
+        timeoutMap.set(id, handle);
+        return id;
+      },
+      clearTimer(id) {
+        const timeout = timeoutMap.get(id);
+        if (timeout !== undefined) {
+          clearTimeout(timeout);
+          timeoutMap.delete(id);
+        }
+      },
+    },
+    onSurfaceChange: () => {
+      // Surface subscribers will re-fetch on next getSpec()
+    },
+  });
+
+  host.on(turnStarted, (payload) => {
+    currentConversationId = payload.conversationId;
+    warmer.onTurnStarted(payload.conversationId);
+  });
+
+  host.on(turnSettled, (payload) => {
+    currentConversationId = payload.conversationId;
+    // Only forward fields that are present, so no key is set to `undefined`.
+    warmer.onTurnSettled(payload.conversationId, {
+      ...(payload.cwd !== undefined ? { cwd: payload.cwd } : {}),
+      ...(payload.modelName !== undefined ? { modelName: payload.modelName } : {}),
+    });
+  });
+
+  const provider: SurfaceProvider = {
+    catalogEntry: {
+      id: "cache-warming",
+      region: "side",
+      title: "Cache Warming",
+    },
+    getSpec() {
+      const convId = currentConversationId;
+      // Before any turn event there is no conversation: show the enabled/no-data default.
+      const state =
+        convId !== undefined ? warmer.getState(convId) : { enabled: true, lastPct: null };
+      return buildSurfaceSpec(convId, state.enabled, state.lastPct);
+    },
+    async invoke(actionId, payload) {
+      const pl = payload as Record<string, unknown> | undefined;
+      // An explicit conversationId in the payload wins over the last-seen conversation.
+      const convId =
+        (typeof pl?.conversationId === "string" ? pl.conversationId : undefined) ??
+        currentConversationId;
+      if (convId === undefined) return;
+
+      if (actionId === "cache-warming/toggle") {
+        const current = warmer.getState(convId);
+        await warmer.setEnabled(convId, !current.enabled);
+      }
+    },
+  };
+
+  registry.register(provider);
+
+  host.logger.info("cache-warming: registered");
+}
+
+/** Extension object pairing `manifest` with the `activate` entry point. */
+export const extension: Extension = {
+ manifest,
+ activate,
+};
diff --git a/packages/cache-warming/src/index.ts b/packages/cache-warming/src/index.ts
new file mode 100644
index 0000000..8670dc5
--- /dev/null
+++ b/packages/cache-warming/src/index.ts
@@ -0,0 +1,19 @@
+export { extension, manifest } from "./extension.js";
+export {
+ type ConversationSettings,
+ type ConversationState,
+ computeCachePct,
+ DEFAULT_INTERVAL_MS,
+ isTokenCurrent,
+ MIN_INTERVAL_MS,
+ parseSettings,
+ serializeSettings,
+ settingsKey,
+ shouldWarm,
+} from "./pure.js";
+export {
+ type CacheWarmer,
+ type CacheWarmerDeps,
+ createCacheWarmer,
+ type TimerDeps,
+} from "./warmer.js";
diff --git a/packages/cache-warming/src/pure.test.ts b/packages/cache-warming/src/pure.test.ts
new file mode 100644
index 0000000..820260b
--- /dev/null
+++ b/packages/cache-warming/src/pure.test.ts
@@ -0,0 +1,109 @@
+import { describe, expect, it } from "vitest";
+import type { ConversationState } from "./pure.js";
+import {
+ computeCachePct,
+ isTokenCurrent,
+ parseSettings,
+ serializeSettings,
+ shouldWarm,
+} from "./pure.js";
+
+describe("computeCachePct", () => {
+ it("cacheRead/input rounded and clamped to 0..100", () => {
+ expect(computeCachePct(1000, 800)).toBe(80);
+ expect(computeCachePct(1000, 1200)).toBe(100);
+ expect(computeCachePct(1000, -100)).toBe(0);
+ expect(computeCachePct(1000, 0)).toBe(0);
+ expect(computeCachePct(1000, 333)).toBe(33);
+ });
+
+ it("zero input tokens → 0", () => {
+ expect(computeCachePct(0, 500)).toBe(0);
+ expect(computeCachePct(-1, 500)).toBe(0);
+ });
+});
+
+describe("shouldWarm", () => {
+ it("returns true when enabled, idle, and token matches", () => {
+ const state: ConversationState = {
+ enabled: true,
+ intervalMs: 240_000,
+ active: false,
+ lastPct: null,
+ token: 5,
+ };
+ expect(shouldWarm(state, 5)).toBe(true);
+ });
+
+ it("returns false when disabled", () => {
+ const state: ConversationState = {
+ enabled: false,
+ intervalMs: 240_000,
+ active: false,
+ lastPct: null,
+ token: 5,
+ };
+ expect(shouldWarm(state, 5)).toBe(false);
+ });
+
+ it("returns false when active", () => {
+ const state: ConversationState = {
+ enabled: true,
+ intervalMs: 240_000,
+ active: true,
+ lastPct: null,
+ token: 5,
+ };
+ expect(shouldWarm(state, 5)).toBe(false);
+ });
+
+ it("returns false when token is superseded", () => {
+ const state: ConversationState = {
+ enabled: true,
+ intervalMs: 240_000,
+ active: false,
+ lastPct: null,
+ token: 5,
+ };
+ expect(shouldWarm(state, 6)).toBe(false);
+ });
+});
+
+describe("isTokenCurrent", () => {
+ it("returns true when tokens match", () => {
+ expect(isTokenCurrent(5, 5)).toBe(true);
+ });
+
+ it("returns false when tokens differ", () => {
+ expect(isTokenCurrent(5, 6)).toBe(false);
+ });
+});
+
+describe("parseSettings/serializeSettings round-trip", () => {
+ it("round-trips enabled + intervalMs", () => {
+ const original = { enabled: false, intervalMs: 120_000 };
+ const serialized = serializeSettings(original);
+ const parsed = parseSettings(serialized);
+ expect(parsed).toEqual(original);
+ });
+
+ it("returns defaults for null input", () => {
+ const parsed = parseSettings(null);
+ expect(parsed).toEqual({ enabled: true, intervalMs: 240_000 });
+ });
+
+ it("returns defaults for malformed JSON", () => {
+ const parsed = parseSettings("not-json{{{");
+ expect(parsed).toEqual({ enabled: true, intervalMs: 240_000 });
+ });
+
+ it("clamps non-positive interval to MIN_INTERVAL_MS", () => {
+ const parsed = parseSettings(JSON.stringify({ enabled: true, intervalMs: -500 }));
+ expect(parsed.intervalMs).toBe(1000);
+ });
+
+ it("uses default for NaN interval", () => {
+ const parsed = parseSettings(JSON.stringify({ enabled: true, intervalMs: Number.NaN }));
+ expect(parsed.intervalMs).toBe(240_000);
+ });
+});
diff --git a/packages/cache-warming/src/pure.ts b/packages/cache-warming/src/pure.ts
new file mode 100644
index 0000000..2b00dab
--- /dev/null
+++ b/packages/cache-warming/src/pure.ts
@@ -0,0 +1,95 @@
+/**
+ * Pure core for cache-warming — zero I/O, zero ambient state.
+ * Every function is input → output; testable without mocks.
+ */
+
+// --- Types ---
+
+/** Persisted per-conversation settings (storage-facing). */
+export interface ConversationSettings {
+ readonly enabled: boolean;
+ readonly intervalMs: number;
+}
+
+/** Full per-conversation runtime state (in-memory, not persisted). */
+export interface ConversationState extends ConversationSettings {
+ readonly active: boolean;
+ readonly lastPct: number | null;
+ readonly token: number;
+}
+
+/** Context stored per-conversation from the latest lifecycle event. */
+export interface ConversationContext {
+ readonly cwd?: string;
+ readonly modelName?: string;
+}
+
+/** Interval used when settings carry no usable value, in milliseconds (240 000 ms = 4 minutes). */
+export const DEFAULT_INTERVAL_MS = 240_000;
+/** Lower bound for an interval, in milliseconds; `parseSettings` raises smaller values to this floor. */
+export const MIN_INTERVAL_MS = 1000;
+
+// --- Pure functions ---
+
+/**
+ * Compute cache-hit percentage from token counts.
+ *
+ * @param inputTokens - Total input tokens; anything that is not greater than 0 (including NaN) yields 0.
+ * @param cacheReadTokens - Input tokens served from cache; the ratio is clamped, so values
+ *   below 0 or above `inputTokens` give 0 and 100 respectively.
+ * @returns An integer in [0, 100]. Never NaN.
+ */
+export function computeCachePct(inputTokens: number, cacheReadTokens: number): number {
+  // `!(x > 0)` also rejects NaN, which a plain `x <= 0` test lets through.
+  if (!(inputTokens > 0)) return 0;
+  const ratio = cacheReadTokens / inputTokens;
+  // Math.min/Math.max return NaN for a NaN argument, so the clamp below cannot
+  // repair a NaN ratio (NaN cacheReadTokens, or Infinity / Infinity).
+  if (Number.isNaN(ratio)) return 0;
+  const clamped = Math.max(0, Math.min(1, ratio));
+  return Math.round(clamped * 100);
+}
+
+/**
+ * Decide whether a conversation should be warmed right now.
+ *
+ * @param state - Current runtime state of the conversation.
+ * @param currentToken - Token carried by the timer that is asking.
+ * @returns `true` only when warming is enabled, no turn is active, and the
+ *   state's token equals `currentToken` (the timer was not superseded).
+ */
+export function shouldWarm(state: ConversationState, currentToken: number): boolean {
+  if (!state.enabled) return false;
+  if (state.active) return false;
+  return state.token === currentToken;
+}
+
+/**
+ * Report whether two tokens are the same, i.e. the expected token has not been
+ * replaced by a newer one.
+ *
+ * @param current - Token stored in the conversation state now.
+ * @param expected - Token the caller was issued.
+ * @returns `true` when the two are strictly equal.
+ */
+export function isTokenCurrent(current: number, expected: number): boolean {
+  const superseded = current !== expected;
+  return !superseded;
+}
+
+const SETTINGS_KEY = "settings";
+
+/**
+ * Decode persisted settings from their raw storage string.
+ *
+ * Falls back to `{ enabled: true, intervalMs: DEFAULT_INTERVAL_MS }` when the
+ * input is `null`, is not valid JSON, or does not decode to an object.
+ * Individual fields fall back separately: a non-boolean `enabled` becomes
+ * `true`; a non-finite or non-numeric `intervalMs` becomes the default; a
+ * finite one is rounded and raised to at least `MIN_INTERVAL_MS`.
+ *
+ * @param raw - Stored string, or `null` when nothing is stored.
+ * @returns The decoded settings.
+ */
+export function parseSettings(raw: string | null): ConversationSettings {
+  const defaults = (): ConversationSettings => ({
+    enabled: true,
+    intervalMs: DEFAULT_INTERVAL_MS,
+  });
+  if (raw === null) return defaults();
+
+  let decoded: unknown;
+  try {
+    decoded = JSON.parse(raw);
+  } catch {
+    return defaults();
+  }
+  if (decoded === null || typeof decoded !== "object") return defaults();
+
+  const fields = decoded as Record<string, unknown>;
+  const enabled = typeof fields.enabled === "boolean" ? fields.enabled : true;
+
+  const candidate = fields.intervalMs;
+  // Rounding first and then taking the max also maps zero and negatives to the floor.
+  const intervalMs =
+    typeof candidate === "number" && Number.isFinite(candidate)
+      ? Math.max(MIN_INTERVAL_MS, Math.round(candidate))
+      : DEFAULT_INTERVAL_MS;
+
+  return { enabled, intervalMs };
+}
+
+/**
+ * Serialize settings for storage.
+ *
+ * @param settings - The settings object to encode.
+ * @returns The `JSON.stringify` encoding of `settings`, the form `parseSettings` reads back.
+ */
+export function serializeSettings(settings: ConversationSettings): string {
+ return JSON.stringify(settings);
+}
+
+/**
+ * Build the storage key under which one conversation's settings are kept.
+ *
+ * @param conversationId - Conversation the settings belong to.
+ * @returns `SETTINGS_KEY`, a colon, then the conversation id.
+ */
+export function settingsKey(conversationId: string): string {
+  return [SETTINGS_KEY, conversationId].join(":");
+}
diff --git a/packages/cache-warming/src/warmer.test.ts b/packages/cache-warming/src/warmer.test.ts
new file mode 100644
index 0000000..9b9ba93
--- /dev/null
+++ b/packages/cache-warming/src/warmer.test.ts
@@ -0,0 +1,207 @@
+import type { Logger, Span } from "@dispatch/kernel";
+import type { WarmResult } from "@dispatch/session-orchestrator";
+import { describe, expect, it } from "vitest";
+import { createCacheWarmer, type TimerDeps } from "./warmer.js";
+
+/** In-memory `StorageNamespace` backed by a `Map`, for tests. */
+function memStorage(): StorageNamespace {
+  const entries = new Map<string, string>();
+  return {
+    async get(k) {
+      return entries.get(k) ?? null;
+    },
+    async set(k, v) {
+      entries.set(k, v);
+    },
+    async delete(k) {
+      entries.delete(k);
+    },
+    async has(k) {
+      return entries.has(k);
+    },
+    async keys(prefix) {
+      const all = [...entries.keys()];
+      // No prefix means "every key".
+      return prefix === undefined ? all : all.filter((k) => k.startsWith(prefix));
+    },
+  };
+}
+
+/** No-op `Span` stub; child spans are fresh stubs of the same kind. */
+function makeSpan(): Span {
+  return {
+    id: "span",
+    log: makeLogger(),
+    setAttributes: () => {},
+    addLink: () => {},
+    child: () => makeSpan(),
+    end: () => {},
+  };
+}
+
+/** No-op `Logger` stub: every level discards its input; `child` and `span` return fresh stubs. */
+function makeLogger(): Logger {
+  const ignore = (): void => {};
+  return {
+    debug: ignore,
+    info: ignore,
+    warn: ignore,
+    error: ignore,
+    child: () => makeLogger(),
+    span: () => makeSpan(),
+  };
+}
+
+/**
+ * Manual timers for tests: callbacks are queued and never run on their own.
+ * `flush()` runs everything queued at the moment it is called; callbacks
+ * scheduled while flushing wait for the next `flush()`.
+ */
+function fakeTimers(): TimerDeps & { flush: () => void } {
+  const queued = new Map<number, () => void>();
+  let counter = 1;
+
+  return {
+    setTimer(fn, _ms) {
+      const id = counter;
+      counter += 1;
+      queued.set(id, fn);
+      return id;
+    },
+    clearTimer(id) {
+      queued.delete(id);
+    },
+    flush() {
+      // Snapshot and clear first so re-arming inside a callback is not run in this pass.
+      const due = Array.from(queued.values());
+      queued.clear();
+      due.forEach((run) => run());
+    },
+  };
+}
+
+const WARM_RESULT: WarmResult = {
+ inputTokens: 1000,
+ outputTokens: 10,
+ cacheReadTokens: 800,
+ cacheWriteTokens: 0,
+};
+
+import type { StorageNamespace } from "@dispatch/kernel";
+
+describe("CacheWarmer", () => {
+ it("arms a timer on turnSettled and warms when it fires (enabled)", async () => {
+ const timers = fakeTimers();
+ const warmCalls: string[] = [];
+ const warmer = createCacheWarmer({
+ warm: async (convId) => {
+ warmCalls.push(convId);
+ return WARM_RESULT;
+ },
+ storage: memStorage(),
+ logger: makeLogger(),
+ timers,
+ onSurfaceChange: () => {},
+ });
+
+ warmer.onTurnSettled("conv-1", {});
+ timers.flush();
+
+ await new Promise((r) => setTimeout(r, 10));
+ expect(warmCalls).toContain("conv-1");
+ });
+
+ it("cancels the timer on turnStarted (no warm while generating)", () => {
+ const timers = fakeTimers();
+ const warmCalls: string[] = [];
+ const warmer = createCacheWarmer({
+ warm: async (convId) => {
+ warmCalls.push(convId);
+ return WARM_RESULT;
+ },
+ storage: memStorage(),
+ logger: makeLogger(),
+ timers,
+ onSurfaceChange: () => {},
+ });
+
+ warmer.onTurnSettled("conv-1", {});
+ warmer.onTurnStarted("conv-1");
+ timers.flush();
+
+ expect(warmCalls).toHaveLength(0);
+ });
+
+ it("in-flight warm result is dropped when superseded (token mismatch)", async () => {
+ const timers = fakeTimers();
+ let resolveWarm: (v: WarmResult) => void = () => {};
+ const warmPromise = new Promise<WarmResult>((r) => {
+ resolveWarm = r;
+ });
+ const warmer = createCacheWarmer({
+ warm: () => warmPromise,
+ storage: memStorage(),
+ logger: makeLogger(),
+ timers,
+ onSurfaceChange: () => {},
+ });
+
+ warmer.onTurnSettled("conv-1", {});
+ timers.flush();
+
+ warmer.onTurnStarted("conv-1");
+ warmer.onTurnSettled("conv-1", {});
+
+ resolveWarm?.(WARM_RESULT);
+ await new Promise((r) => setTimeout(r, 10));
+
+ const state = warmer.getState("conv-1");
+ expect(state.lastPct).toBeNull();
+ });
+
+ it("disabled conversation does not warm", async () => {
+ const timers = fakeTimers();
+ const warmCalls: string[] = [];
+ const warmer = createCacheWarmer({
+ warm: async (convId) => {
+ warmCalls.push(convId);
+ return WARM_RESULT;
+ },
+ storage: memStorage(),
+ logger: makeLogger(),
+ timers,
+ onSurfaceChange: () => {},
+ });
+
+ await warmer.setEnabled("conv-1", false);
+ warmer.onTurnSettled("conv-1", {});
+ timers.flush();
+
+ await new Promise((r) => setTimeout(r, 10));
+ expect(warmCalls).toHaveLength(0);
+ });
+
+ it("stores lastPct from the warm result", async () => {
+ const timers = fakeTimers();
+ const warmer = createCacheWarmer({
+ warm: async () => WARM_RESULT,
+ storage: memStorage(),
+ logger: makeLogger(),
+ timers,
+ onSurfaceChange: () => {},
+ });
+
+ warmer.onTurnSettled("conv-1", {});
+ timers.flush();
+
+ await new Promise((r) => setTimeout(r, 10));
+ const state = warmer.getState("conv-1");
+ expect(state.lastPct).toBe(80);
+ });
+
+ it("re-arms timer after warm completes", async () => {
+ const timers = fakeTimers();
+ let warmCount = 0;
+ const warmer = createCacheWarmer({
+ warm: async () => {
+ warmCount++;
+ return WARM_RESULT;
+ },
+ storage: memStorage(),
+ logger: makeLogger(),
+ timers,
+ onSurfaceChange: () => {},
+ });
+
+ warmer.onTurnSettled("conv-1", {});
+ timers.flush();
+ await new Promise((r) => setTimeout(r, 10));
+
+ timers.flush();
+ await new Promise((r) => setTimeout(r, 10));
+
+ expect(warmCount).toBe(2);
+ });
+});
diff --git a/packages/cache-warming/src/warmer.ts b/packages/cache-warming/src/warmer.ts
new file mode 100644
index 0000000..31dd41e
--- /dev/null
+++ b/packages/cache-warming/src/warmer.ts
@@ -0,0 +1,248 @@
+import type { Logger, StorageNamespace } from "@dispatch/kernel";
+import type { WarmService } from "@dispatch/session-orchestrator";
+import {
+ type ConversationContext,
+ type ConversationSettings,
+ type ConversationState,
+ computeCachePct,
+ DEFAULT_INTERVAL_MS,
+ isTokenCurrent,
+ MIN_INTERVAL_MS,
+ parseSettings,
+ serializeSettings,
+ settingsKey,
+ shouldWarm,
+} from "./pure.js";
+
+// --- Timer abstraction (injectable for tests) ---
+
+/** Timer primitives the warmer schedules through, identified by numeric ids. */
+export interface TimerDeps {
+ /** Schedule `fn` after `ms` milliseconds; returns the id to pass to `clearTimer`. */
+ readonly setTimer: (fn: () => void, ms: number) => number;
+ /** Cancel the timer with the given id. */
+ readonly clearTimer: (id: number) => void;
+}
+
+// --- Warmer interface ---
+
+export interface CacheWarmer {
+ /** Handle a turnStarted event — mark conversation active, cancel pending warm. */
+ readonly onTurnStarted: (conversationId: string) => void;
+
+ /** Handle a turnSettled event — mark idle, store context, arm timer if enabled. */
+ readonly onTurnSettled: (conversationId: string, ctx: ConversationContext) => void;
+
+ /** Get the current state for a conversation (for surface rendering). */
+ readonly getState: (conversationId: string) => ConversationState;
+
+ /** Get the stored context for a conversation. */
+ readonly getContext: (conversationId: string) => ConversationContext;
+
+ /** Toggle enabled for a conversation. Returns updated settings. */
+ readonly setEnabled: (conversationId: string, enabled: boolean) => Promise<ConversationSettings>;
+
+ /** Set the refresh interval for a conversation. Returns updated settings. */
+ readonly setIntervalMs: (
+ conversationId: string,
+ intervalMs: number,
+ ) => Promise<ConversationSettings>;
+
+ /** Dispose all timers (for cleanup). */
+ readonly dispose: () => void;
+}
+
+export interface CacheWarmerDeps {
+ readonly warm: WarmService["warm"];
+ readonly storage: StorageNamespace;
+ readonly logger: Logger;
+ readonly timers: TimerDeps;
+ /** Called when surface subscribers should re-fetch the spec. */
+ readonly onSurfaceChange: () => void;
+}
+
+/**
+ * State returned for a conversation that has no stored entry yet: warming
+ * enabled, default interval, idle, no percentage recorded, token 0.
+ */
+const DEFAULT_STATE: ConversationState = {
+ enabled: true,
+ intervalMs: DEFAULT_INTERVAL_MS,
+ active: false,
+ lastPct: null,
+ token: 0,
+};
+
+/**
+ * Create a cache warmer that keeps one pending warm timer per conversation.
+ *
+ * When a turn settles, a timer is armed for the conversation's interval. When
+ * it fires, `deps.warm` is called, the cache-hit percentage from the result is
+ * recorded, and the timer is armed again. Every armed timer carries a token;
+ * a warm whose token is no longer the conversation's current one is ignored.
+ *
+ * @param deps - Warm function, settings storage, logger, timer primitives and
+ *   the callback invoked when the surface should be re-read.
+ * @returns The `CacheWarmer` handle.
+ */
+export function createCacheWarmer(deps: CacheWarmerDeps): CacheWarmer {
+  // Per-conversation runtime state (not persisted — reconstructed from storage + events)
+  const states = new Map<string, ConversationState>();
+  // Per-conversation context from latest lifecycle event
+  const contexts = new Map<string, ConversationContext>();
+  // Per-conversation pending timer id
+  const timers = new Map<string, number>();
+  // Monotonic token per conversation for in-flight invalidation
+  let nextToken = 1;
+
+  function getState(conversationId: string): ConversationState {
+    return states.get(conversationId) ?? DEFAULT_STATE;
+  }
+
+  function getContext(conversationId: string): ConversationContext {
+    return contexts.get(conversationId) ?? {};
+  }
+
+  function setState(conversationId: string, state: ConversationState): void {
+    states.set(conversationId, state);
+  }
+
+  /** Clear the conversation's pending timer, if any. */
+  function cancelTimer(conversationId: string): void {
+    const existing = timers.get(conversationId);
+    if (existing !== undefined) {
+      deps.timers.clearTimer(existing);
+      timers.delete(conversationId);
+    }
+  }
+
+  /** Replace any pending timer with a fresh one; does nothing further when disabled or active. */
+  function armTimer(conversationId: string): void {
+    cancelTimer(conversationId);
+    const state = getState(conversationId);
+    if (!state.enabled || state.active) return;
+
+    // A new token supersedes whatever warm was scheduled or in flight before.
+    const token = nextToken++;
+    setState(conversationId, { ...state, token });
+
+    const timerId = deps.timers.setTimer(() => {
+      timers.delete(conversationId);
+      void fireWarm(conversationId, token);
+    }, state.intervalMs);
+
+    timers.set(conversationId, timerId);
+  }
+
+  /**
+   * Run one warm for the conversation if `token` is still current.
+   * Never rejects: a failing `deps.warm` is logged and the cycle continues.
+   */
+  async function fireWarm(conversationId: string, token: number): Promise<void> {
+    const state = getState(conversationId);
+    if (!shouldWarm(state, token)) {
+      deps.logger.debug("cache-warming: skip warm (superseded or disabled)", {
+        conversationId,
+      });
+      return;
+    }
+
+    const ctx = getContext(conversationId);
+    deps.logger.debug("cache-warming: firing warm", { conversationId });
+
+    let result: Awaited<ReturnType<WarmService["warm"]>>;
+    try {
+      result = await deps.warm(conversationId, {
+        ...(ctx.cwd !== undefined ? { cwd: ctx.cwd } : {}),
+        ...(ctx.modelName !== undefined ? { modelName: ctx.modelName } : {}),
+      });
+    } catch (err: unknown) {
+      // fireWarm is started with `void`, so a rejection here would be an
+      // unhandled promise rejection and would also end the warm cycle.
+      deps.logger.warn("cache-warming: warm threw", {
+        conversationId,
+        error: err instanceof Error ? err.message : String(err),
+      });
+      // Same rule as a normal completion: only the current token may re-arm.
+      if (isTokenCurrent(getState(conversationId).token, token)) {
+        armTimer(conversationId);
+      }
+      return;
+    }
+
+    // Re-check token after async warm — result may be stale
+    const currentState = getState(conversationId);
+    if (!isTokenCurrent(currentState.token, token)) {
+      deps.logger.debug("cache-warming: discarding stale warm result", {
+        conversationId,
+      });
+      return;
+    }
+
+    if ("error" in result) {
+      deps.logger.debug("cache-warming: warm returned error (normal)", {
+        conversationId,
+        error: result.error,
+      });
+    } else {
+      const pct = computeCachePct(result.inputTokens, result.cacheReadTokens);
+      setState(conversationId, { ...currentState, lastPct: pct });
+      deps.onSurfaceChange();
+      deps.logger.debug("cache-warming: warm complete", {
+        conversationId,
+        pct,
+      });
+    }
+
+    // Re-arm for next cycle
+    armTimer(conversationId);
+  }
+
+  async function loadSettings(conversationId: string): Promise<ConversationSettings> {
+    const raw = await deps.storage.get(settingsKey(conversationId));
+    return parseSettings(raw);
+  }
+
+  async function persistSettings(
+    conversationId: string,
+    settings: ConversationSettings,
+  ): Promise<void> {
+    await deps.storage.set(settingsKey(conversationId), serializeSettings(settings));
+  }
+
+  /** Overlay `partial` on the conversation's current state and store the result. */
+  function mergeState(
+    conversationId: string,
+    partial: Partial<ConversationState>,
+  ): ConversationState {
+    const current = getState(conversationId);
+    const updated = { ...current, ...partial };
+    setState(conversationId, updated);
+    return updated;
+  }
+
+  return {
+    onTurnStarted(conversationId) {
+      deps.logger.debug("cache-warming: turn started", { conversationId });
+      mergeState(conversationId, { active: true });
+      cancelTimer(conversationId);
+    },
+
+    onTurnSettled(conversationId, ctx) {
+      deps.logger.debug("cache-warming: turn settled", { conversationId });
+      contexts.set(conversationId, ctx);
+      mergeState(conversationId, { active: false });
+
+      const state = getState(conversationId);
+      if (state.enabled) {
+        armTimer(conversationId);
+      }
+    },
+
+    getState,
+    getContext,
+
+    async setEnabled(conversationId, enabled) {
+      const settings = await loadSettings(conversationId);
+      const updated = { ...settings, enabled };
+      await persistSettings(conversationId, updated);
+      mergeState(conversationId, { enabled });
+
+      if (enabled) {
+        const state = getState(conversationId);
+        if (!state.active) {
+          armTimer(conversationId);
+        }
+      } else {
+        cancelTimer(conversationId);
+      }
+
+      deps.onSurfaceChange();
+      return updated;
+    },
+
+    async setIntervalMs(conversationId, intervalMs) {
+      const clamped =
+        !Number.isFinite(intervalMs) || intervalMs <= 0
+          ? MIN_INTERVAL_MS
+          : Math.max(MIN_INTERVAL_MS, Math.round(intervalMs));
+      const settings = await loadSettings(conversationId);
+      const updated = { ...settings, intervalMs: clamped };
+      await persistSettings(conversationId, updated);
+      mergeState(conversationId, { intervalMs: clamped });
+
+      // Re-arm with new interval if currently armed
+      const state = getState(conversationId);
+      if (state.enabled && !state.active && timers.has(conversationId)) {
+        armTimer(conversationId);
+      }
+
+      deps.onSurfaceChange();
+      return updated;
+    },
+
+    dispose() {
+      for (const [conversationId] of timers) {
+        cancelTimer(conversationId);
+      }
+    },
+  };
+}
diff --git a/packages/cache-warming/tsconfig.json b/packages/cache-warming/tsconfig.json
new file mode 100644
index 0000000..6557731
--- /dev/null
+++ b/packages/cache-warming/tsconfig.json
@@ -0,0 +1,11 @@
+{
+ "extends": "../../tsconfig.base.json",
+ "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true },
+ "include": ["src/**/*.ts"],
+ "references": [
+ { "path": "../kernel" },
+ { "path": "../session-orchestrator" },
+ { "path": "../surface-registry" },
+ { "path": "../ui-contract" }
+ ]
+}
diff --git a/packages/host-bin/package.json b/packages/host-bin/package.json
index 6a1d24a..0d2b817 100644
--- a/packages/host-bin/package.json
+++ b/packages/host-bin/package.json
@@ -8,6 +8,7 @@
"@dispatch/storage-sqlite": "workspace:*",
"@dispatch/conversation-store": "workspace:*",
"@dispatch/auth-apikey": "workspace:*",
+ "@dispatch/cache-warming": "workspace:*",
"@dispatch/credential-store": "workspace:*",
"@dispatch/provider-openai-compat": "workspace:*",
"@dispatch/session-orchestrator": "workspace:*",
diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts
index ef75f55..1594dcc 100644
--- a/packages/host-bin/src/main.ts
+++ b/packages/host-bin/src/main.ts
@@ -1,6 +1,7 @@
import { mkdirSync } from "node:fs";
import { dirname } from "node:path";
import { extension as authApikeyExt } from "@dispatch/auth-apikey";
+import { extension as cacheWarmingExt } from "@dispatch/cache-warming";
import { extension as conversationStoreExt } from "@dispatch/conversation-store";
import { createCredentialStoreExtension } from "@dispatch/credential-store";
import { createJournalSink } from "@dispatch/journal-sink";
@@ -73,6 +74,7 @@ const CORE_EXTENSIONS: readonly Extension[] = [
throughputStoreExt,
sessionOrchestratorExt,
skillsExt,
+ cacheWarmingExt,
createTransportHttpExtension(),
// Surface extensions — dependency order: surface-registry first, then consumers.
createSurfaceRegistryExtension(),
diff --git a/packages/host-bin/tsconfig.json b/packages/host-bin/tsconfig.json
index b357c51..77de667 100644
--- a/packages/host-bin/tsconfig.json
+++ b/packages/host-bin/tsconfig.json
@@ -3,6 +3,7 @@
"compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true },
"include": ["src/**/*.ts"],
"references": [
+ { "path": "../cache-warming" },
{ "path": "../kernel" },
{ "path": "../storage-sqlite" },
{ "path": "../surface-loaded-extensions" },
diff --git a/packages/kernel/src/contracts/extension.ts b/packages/kernel/src/contracts/extension.ts
index 5a7821b..4d6cf07 100644
--- a/packages/kernel/src/contracts/extension.ts
+++ b/packages/kernel/src/contracts/extension.ts
@@ -190,6 +190,18 @@ export interface HostAPI {
handler: EventHandler<TPayload>,
) => () => void;
+ /**
+ * Emit an event hook: fire-and-forget dispatch to every `on` subscriber,
+ * error-isolated per handler (a thrown handler is caught + logged, never
+ * breaks the caller). The counterpart of `on`.
+ *
+ * This lets a core extension that OWNS a lifecycle publish typed events that
+ * standard extensions react to — e.g. the session-orchestrator emitting
+ * per-turn start/settle events a cache-warming extension subscribes to. The
+ * kernel owns the mechanism; the owner declares the typed `EventHookDescriptor`.
+ */
+ readonly emit: <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload) => void;
+
/** Add a filter to a filter hook chain. Filters are awaited in-band. */
readonly addFilter: <TValue>(
hook: FilterDescriptor<TValue>,
diff --git a/packages/kernel/src/host/host.test.ts b/packages/kernel/src/host/host.test.ts
index 669d093..0067091 100644
--- a/packages/kernel/src/host/host.test.ts
+++ b/packages/kernel/src/host/host.test.ts
@@ -617,6 +617,49 @@ describe("createHost", () => {
expect(received).toEqual(["hello"]);
});
+ it("emit dispatches to handlers registered via on", async () => {
+ const hook = defineEventHook<string>("test/emit-dispatch");
+ const received: string[] = [];
+
+ const ext = createExtension("emit-ext", {
+ activate: (host) => {
+ host.on(hook, (payload) => {
+ received.push(payload);
+ });
+ },
+ });
+
+ const host = createHost([ext], deps);
+ await host.activate();
+
+ const api = host.getHostAPI();
+ api.emit(hook, "world");
+ expect(received).toEqual(["world"]);
+ });
+
+ it("emit isolates a throwing handler (does not propagate)", async () => {
+ const hook = defineEventHook<string>("test/emit-isolation");
+ const received: string[] = [];
+
+ const ext = createExtension("emit-isolation-ext", {
+ activate: (host) => {
+ host.on(hook, () => {
+ throw new Error("handler boom");
+ });
+ host.on(hook, (payload) => {
+ received.push(payload);
+ });
+ },
+ });
+
+ const host = createHost([ext], deps);
+ await host.activate();
+
+ const api = host.getHostAPI();
+ expect(() => api.emit(hook, "safe")).not.toThrow();
+ expect(received).toEqual(["safe"]);
+ });
+
it("applyFilters threads a value through registered filters in order", async () => {
const hook = defineFilter<string>("test/text-transform");
diff --git a/packages/kernel/src/host/host.ts b/packages/kernel/src/host/host.ts
index a6396a9..2a262be 100644
--- a/packages/kernel/src/host/host.ts
+++ b/packages/kernel/src/host/host.ts
@@ -122,6 +122,9 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho
on<TPayload>(hook: EventHookDescriptor<TPayload>, handler: EventHandler<TPayload>) {
return deps.bus.on(hook, handler);
},
+ emit<TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload) {
+ deps.bus.emit(hook, payload);
+ },
addFilter<TValue>(hook: FilterDescriptor<TValue>, fn: FilterHandler<TValue>) {
return deps.bus.addFilter(hook, fn);
},
diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts
index 697eb4a..12d387c 100644
--- a/packages/session-orchestrator/src/extension.ts
+++ b/packages/session-orchestrator/src/extension.ts
@@ -3,8 +3,9 @@ import { credentialStoreHandle } from "@dispatch/credential-store";
import type { Extension, HostAPI, Manifest } from "@dispatch/kernel";
import { runTurn } from "@dispatch/kernel";
import {
+ cacheWarmHandle,
createSessionOrchestrator,
- type SessionOrchestrator,
+ createWarmService,
sessionOrchestratorHandle,
} from "./orchestrator.js";
import { selectFirstProvider } from "./pure.js";
@@ -19,14 +20,15 @@ export const manifest: Manifest = {
dependsOn: ["conversation-store", "credential-store"],
activation: "eager",
contributes: {
- services: ["session-orchestrator/orchestrator"],
+ services: ["session-orchestrator/orchestrator", "session-orchestrator/warm"],
+ hooks: ["session-orchestrator/turn-started", "session-orchestrator/turn-settled"],
},
};
export function activate(host: HostAPI): void {
const conversationStore = host.getService(conversationStoreHandle);
- const orchestrator: SessionOrchestrator = createSessionOrchestrator({
+ const { orchestrator, activeConversations } = createSessionOrchestrator({
conversationStore,
resolveProvider: () => selectFirstProvider(host.getProviders()),
resolveTools: () => [...host.getTools().values()],
@@ -41,9 +43,32 @@ export function activate(host: HostAPI): void {
runTurn,
logger: host.logger,
now: () => Date.now(),
+ emit: (hook, payload) => host.emit(hook, payload),
});
host.provideService(sessionOrchestratorHandle, orchestrator);
+
+ const warmService = createWarmService(
+ {
+ conversationStore,
+ resolveProvider: () => selectFirstProvider(host.getProviders()),
+ resolveTools: () => [...host.getTools().values()],
+ resolveModel: (modelName: string) => {
+ const store = host.getService(credentialStoreHandle);
+ const r = store.resolve(modelName);
+ if (r === undefined) return undefined;
+ const provider = host.getProviders().get(r.providerId);
+ return provider ? { provider, model: r.model } : undefined;
+ },
+ applyToolsFilter: (assembly) => host.applyFilters(toolsFilter, assembly),
+ runTurn,
+ logger: host.logger,
+ now: () => Date.now(),
+ },
+ activeConversations,
+ );
+
+ host.provideService(cacheWarmHandle, warmService);
}
export const extension: Extension = {
diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts
index 071b616..37ae5ce 100644
--- a/packages/session-orchestrator/src/index.ts
+++ b/packages/session-orchestrator/src/index.ts
@@ -1,9 +1,17 @@
export { extension, manifest } from "./extension.js";
export {
+ cacheWarmHandle,
createSessionOrchestrator,
+ createWarmService,
type SessionOrchestrator,
+ type SessionOrchestratorBundle,
type SessionOrchestratorDeps,
sessionOrchestratorHandle,
+ type TurnLifecyclePayload,
+ turnSettled,
+ turnStarted,
+ type WarmResult,
+ type WarmService,
} from "./orchestrator.js";
export {
buildUserMessage,
diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts
index dcaad7d..5d512ea 100644
--- a/packages/session-orchestrator/src/orchestrator.test.ts
+++ b/packages/session-orchestrator/src/orchestrator.test.ts
@@ -2,6 +2,7 @@ import type { ConversationStore } from "@dispatch/conversation-store";
import type {
AgentEvent,
ChatMessage,
+ EventHookDescriptor,
ProviderContract,
ProviderEvent,
RunTurnInput,
@@ -12,7 +13,11 @@ import type {
} from "@dispatch/kernel";
import { runTurn } from "@dispatch/kernel";
import { describe, expect, it } from "vitest";
-import { createSessionOrchestrator } from "./orchestrator.js";
+import {
+ createSessionOrchestrator,
+ createWarmService,
+ type TurnLifecyclePayload,
+} from "./orchestrator.js";
import type { ToolAssembly } from "./tools-filter.js";
function createInMemoryStore(): ConversationStore & {
@@ -104,7 +109,7 @@ describe("handleMessage integration", () => {
],
]);
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => provider,
resolveTools: () => [],
@@ -156,7 +161,7 @@ describe("handleMessage integration", () => {
},
};
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => provider,
resolveTools: () => [],
@@ -203,7 +208,7 @@ describe("handleMessage integration", () => {
],
]);
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => provider,
resolveTools: () => [],
@@ -233,7 +238,7 @@ describe("handleMessage integration", () => {
],
]);
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => provider,
resolveTools: () => [],
@@ -282,7 +287,7 @@ describe("handleMessage model resolution", () => {
const fallbackProvider: ProviderContract = { id: "fallback", stream: async function* () {} };
const { captured, captureRunTurn } = createCapturingRunTurn();
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => fallbackProvider,
resolveTools: () => [],
@@ -314,7 +319,7 @@ describe("handleMessage model resolution", () => {
const { captured, captureRunTurn } = createCapturingRunTurn();
const events: AgentEvent[] = [];
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => fallbackProvider,
resolveTools: () => [],
@@ -345,7 +350,7 @@ describe("handleMessage model resolution", () => {
const fallbackProvider: ProviderContract = { id: "fallback", stream: async function* () {} };
const { captured, captureRunTurn } = createCapturingRunTurn();
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => fallbackProvider,
resolveTools: () => [],
@@ -373,7 +378,7 @@ describe("handleMessage model resolution", () => {
const provider: ProviderContract = { id: "p", stream: async function* () {} };
const { captured, captureRunTurn } = createCapturingRunTurn();
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => provider,
resolveTools: () => [],
@@ -407,7 +412,7 @@ describe("handleMessage model resolution", () => {
const { captured, captureRunTurn } = createCapturingRunTurn();
const fakeNow = () => 42;
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => provider,
resolveTools: () => [],
@@ -432,7 +437,7 @@ describe("handleMessage model resolution", () => {
const provider: ProviderContract = { id: "p", stream: async function* () {} };
const { captured, captureRunTurn } = createCapturingRunTurn();
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => provider,
resolveTools: () => [],
@@ -461,7 +466,7 @@ describe("turn-sealed event", () => {
],
]);
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => provider,
resolveTools: () => [],
@@ -514,7 +519,7 @@ describe("turn-sealed event", () => {
},
};
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: wrappedStore,
resolveProvider: () => provider,
resolveTools: () => [],
@@ -561,7 +566,7 @@ describe("turn-sealed event", () => {
},
};
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: failingStore,
resolveProvider: () => provider,
resolveTools: () => [],
@@ -595,7 +600,7 @@ describe("turn metrics persistence", () => {
],
]);
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => provider,
resolveTools: () => [],
@@ -655,7 +660,7 @@ describe("turn metrics persistence", () => {
},
};
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => provider,
resolveTools: () => [tool],
@@ -714,7 +719,7 @@ describe("turn metrics persistence", () => {
},
};
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => provider,
resolveTools: () => [],
@@ -781,7 +786,7 @@ describe("turn metrics persistence", () => {
},
};
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => provider,
resolveTools: () => [tool],
@@ -835,7 +840,7 @@ describe("turn metrics persistence", () => {
},
};
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: failingMetricsStore,
resolveProvider: () => provider,
resolveTools: () => [],
@@ -874,7 +879,7 @@ describe("tools filter", () => {
return Promise.resolve({ ...assembly, tools: [toolB] });
};
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => provider,
resolveTools: () => [toolA],
@@ -902,7 +907,7 @@ describe("tools filter", () => {
const toolA = createFakeTool("tool-a", async () => ({ content: "a" }));
const toolB = createFakeTool("tool-b", async () => ({ content: "b" }));
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => provider,
resolveTools: () => [toolA, toolB],
@@ -933,7 +938,7 @@ describe("tools filter", () => {
return Promise.resolve(assembly);
};
- const orchestrator = createSessionOrchestrator({
+ const { orchestrator } = createSessionOrchestrator({
conversationStore: store,
resolveProvider: () => provider,
resolveTools: () => [],
@@ -964,3 +969,250 @@ function createCounterNow(): { now: () => number; tick: (ms: number) => void } {
},
};
}
+
+describe("lifecycle event hooks", () => {
+  it("emits turnStarted before and turnSettled after a turn", async () => {
+    interface RecordedEmit {
+      hook: string;
+      payload: TurnLifecyclePayload;
+      order: number;
+    }
+    const recorded: RecordedEmit[] = [];
+
+    // Stand-in for the host's emit: remembers hook id, payload, and call position.
+    const spyEmit = <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload): void => {
+      recorded.push({
+        hook: hook.id,
+        payload: payload as TurnLifecyclePayload,
+        order: recorded.length,
+      });
+    };
+
+    const store = createInMemoryStore();
+    const provider = createFakeProvider([
+      [
+        { type: "text-delta", delta: "ok" },
+        { type: "finish", reason: "stop" },
+      ],
+    ]);
+
+    const { orchestrator } = createSessionOrchestrator({
+      conversationStore: store,
+      resolveProvider: () => provider,
+      resolveTools: () => [],
+      applyToolsFilter: identityApplyToolsFilter,
+      runTurn,
+      emit: spyEmit,
+    });
+
+    await orchestrator.handleMessage({
+      conversationId: "conv-lifecycle",
+      text: "test",
+      onEvent: () => {},
+      cwd: "/work",
+      modelName: "mymodel",
+    });
+
+    // Exactly one started/settled pair, in that order, each carrying the turn's context.
+    const expectedHooks = [
+      "session-orchestrator/turn-started",
+      "session-orchestrator/turn-settled",
+    ];
+    expect(recorded).toHaveLength(2);
+    expectedHooks.forEach((hookId, index) => {
+      const entry = recorded[index];
+      expect(entry?.hook).toBe(hookId);
+      expect(entry?.payload.conversationId).toBe("conv-lifecycle");
+      expect(entry?.payload.cwd).toBe("/work");
+      expect(entry?.payload.modelName).toBe("mymodel");
+      expect(entry?.order).toBe(index);
+    });
+  });
+});
+
+describe("warm service", () => {
+  it("warm reuses the assembled tools + full history and appends the probe turn", async () => {
+    // Seed a conversation with one user/assistant exchange for warm() to replay.
+    const store = createInMemoryStore();
+    const existingMsg: ChatMessage = {
+      role: "user",
+      chunks: [{ type: "text", text: "existing" }],
+    };
+    const assistantMsg: ChatMessage = {
+      role: "assistant",
+      chunks: [{ type: "text", text: "reply" }],
+    };
+    await store.append("conv-warm-reuse", [existingMsg, assistantMsg]);
+
+    // Everything the provider is called with is captured so the request can be inspected.
+    let capturedMessages: readonly ChatMessage[] | undefined;
+    let capturedTools: readonly ToolContract[] | undefined;
+    let capturedOpts: unknown;
+
+    const toolA = createFakeTool("tool-a", async () => ({ content: "a" }));
+
+    const provider: ProviderContract = {
+      id: "warm-provider",
+      stream(messages, tools, opts) {
+        capturedMessages = messages;
+        capturedTools = tools;
+        capturedOpts = opts;
+        return (async function* () {
+          yield {
+            type: "usage",
+            usage: { inputTokens: 100, outputTokens: 5, cacheReadTokens: 80, cacheWriteTokens: 20 },
+          } as ProviderEvent;
+          yield { type: "finish", reason: "stop" } as ProviderEvent;
+        })();
+      },
+    };
+
+    const deps = {
+      conversationStore: store,
+      resolveProvider: () => provider,
+      resolveTools: () => [toolA],
+      applyToolsFilter: identityApplyToolsFilter,
+      runTurn,
+    };
+
+    const { activeConversations } = createSessionOrchestrator(deps);
+    const warmService = createWarmService(deps, activeConversations);
+
+    const result = await warmService.warm("conv-warm-reuse", { cwd: "/test" });
+
+    // Full stored history first, then the synthetic probe message.
+    expect(capturedMessages).toBeDefined();
+    expect(capturedMessages).toHaveLength(3);
+    expect(capturedMessages?.[0]?.chunks[0]).toEqual({ type: "text", text: "existing" });
+    expect(capturedMessages?.[1]?.chunks[0]).toEqual({ type: "text", text: "reply" });
+    expect(capturedMessages?.[2]?.role).toBe("user");
+    expect(capturedMessages?.[2]?.chunks[0]).toEqual({
+      type: "text",
+      text: "reply with just a .",
+    });
+
+    expect(capturedTools).toHaveLength(1);
+    expect(capturedTools?.[0]?.name).toBe("tool-a");
+
+    // No model name was passed, so the probe carries only the one-token output cap.
+    expect(capturedOpts).toEqual({ maxTokens: 1 });
+
+    // Asserted unconditionally: an `{ error }` result must fail the test rather than
+    // skip the usage checks.
+    expect(result).toEqual({
+      inputTokens: 100,
+      outputTokens: 5,
+      cacheReadTokens: 80,
+      cacheWriteTokens: 20,
+    });
+  });
+
+  it("warm refuses while the conversation is generating", async () => {
+    const store = createInMemoryStore();
+
+    // Gate that keeps the fake turn in flight until the test releases it.
+    let releaseTurn: (() => void) | undefined;
+    const turnGate = new Promise<void>((resolve) => {
+      releaseTurn = resolve;
+    });
+
+    const provider: ProviderContract = {
+      id: "p",
+      stream: async function* () {
+        yield { type: "text-delta", delta: "slow" } as ProviderEvent;
+        yield { type: "finish", reason: "stop" } as ProviderEvent;
+      },
+    };
+
+    // runTurn stand-in: parks on the gate, then reports a finished assistant reply.
+    const gatedRunTurn = async (_input: RunTurnInput): Promise<RunTurnResult> => {
+      await turnGate;
+      return {
+        messages: [{ role: "assistant", chunks: [{ type: "text", text: "done" }] }],
+        usage: { inputTokens: 1, outputTokens: 1 },
+        finishReason: "stop",
+      };
+    };
+
+    const deps = {
+      conversationStore: store,
+      resolveProvider: () => provider,
+      resolveTools: () => [],
+      applyToolsFilter: identityApplyToolsFilter,
+      runTurn: gatedRunTurn,
+    };
+
+    const { orchestrator, activeConversations } = createSessionOrchestrator(deps);
+    const warmService = createWarmService(deps, activeConversations);
+
+    // Start the turn but do not await it: it stays parked on the gate.
+    const pendingTurn = orchestrator.handleMessage({
+      conversationId: "conv-blocking",
+      text: "test",
+      onEvent: () => {},
+    });
+
+    const refusal = await warmService.warm("conv-blocking");
+    expect(refusal).toEqual({ error: "conversation is generating" });
+
+    // Open the gate so the parked turn can finish.
+    releaseTurn?.();
+    await pendingTurn;
+  });
+
+  it("warm never persists (no append) and emits no AgentEvents", async () => {
+    const store = createInMemoryStore();
+    const existingMsg: ChatMessage = {
+      role: "user",
+      chunks: [{ type: "text", text: "existing" }],
+    };
+    await store.append("conv-no-persist", [existingMsg]);
+
+    const provider: ProviderContract = {
+      id: "p",
+      stream: async function* () {
+        yield {
+          type: "usage",
+          usage: { inputTokens: 10, outputTokens: 2 },
+        } as ProviderEvent;
+        yield { type: "finish", reason: "stop" } as ProviderEvent;
+      },
+    };
+
+    const deps = {
+      conversationStore: store,
+      resolveProvider: () => provider,
+      resolveTools: () => [],
+      applyToolsFilter: identityApplyToolsFilter,
+      runTurn,
+    };
+
+    const { activeConversations } = createSessionOrchestrator(deps);
+    const warmService = createWarmService(deps, activeConversations);
+
+    // Snapshot through the store's own load(). The copy guards against load() handing
+    // back a live array that a later append would mutate in place.
+    const historyBefore = [...(await store.load("conv-no-persist"))];
+    expect(historyBefore).toHaveLength(1);
+
+    const result = await warmService.warm("conv-no-persist");
+
+    // The probe must have reached the provider; a refused warm would make the
+    // no-persist check below pass trivially.
+    expect(result).toEqual({
+      inputTokens: 10,
+      outputTokens: 2,
+      cacheReadTokens: 0,
+      cacheWriteTokens: 0,
+    });
+
+    // Neither the probe message nor any reply may end up in the stored history.
+    const historyAfter = await store.load("conv-no-persist");
+    expect(historyAfter).toEqual(historyBefore);
+  });
+
+  it("warm returns provider usage (input + cacheReadTokens)", async () => {
+    // warm() refuses an empty conversation, so seed one message first.
+    const store = createInMemoryStore();
+    const seedMessage: ChatMessage = {
+      role: "user",
+      chunks: [{ type: "text", text: "existing" }],
+    };
+    await store.append("conv-usage", [seedMessage]);
+
+    // Provider that reports a single usage event and then finishes.
+    const usageEvent = {
+      type: "usage",
+      usage: {
+        inputTokens: 500,
+        outputTokens: 3,
+        cacheReadTokens: 400,
+        cacheWriteTokens: 100,
+      },
+    } as ProviderEvent;
+    const finishEvent = { type: "finish", reason: "stop" } as ProviderEvent;
+
+    const provider: ProviderContract = {
+      id: "p",
+      stream: async function* () {
+        yield usageEvent;
+        yield finishEvent;
+      },
+    };
+
+    const deps = {
+      conversationStore: store,
+      resolveProvider: () => provider,
+      resolveTools: () => [],
+      applyToolsFilter: identityApplyToolsFilter,
+      runTurn,
+    };
+
+    const warmService = createWarmService(deps, createSessionOrchestrator(deps).activeConversations);
+
+    const warmed = await warmService.warm("conv-usage");
+
+    // All four counters come back exactly as the provider reported them.
+    expect(warmed).toEqual({
+      inputTokens: 500,
+      outputTokens: 3,
+      cacheReadTokens: 400,
+      cacheWriteTokens: 100,
+    });
+  });
+});
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index 2d1bbf5..c39bc06 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -2,19 +2,59 @@ import type { ConversationStore } from "@dispatch/conversation-store";
import type {
AgentEvent,
ChatMessage,
+ EventHookDescriptor,
Logger,
ProviderContract,
+ ProviderEvent,
ProviderStreamOptions,
RunTurnInput,
RunTurnResult,
ToolContract,
ToolDispatchPolicy,
+ UsageEvent,
} from "@dispatch/kernel";
-import { defineService } from "@dispatch/kernel";
+import { defineEventHook, defineService, type ServiceHandle } from "@dispatch/kernel";
import { createMetricsAccumulator } from "./metrics.js";
import { buildUserMessage, defaultDispatchPolicy, generateTurnId } from "./pure.js";
import type { ToolAssembly } from "./tools-filter.js";
+// --- Lifecycle event hooks ---
+
+/** Context carried on turn-lifecycle events, enough to replicate the turn's request prefix. */
+export interface TurnLifecyclePayload {
+ /** Conversation the turn belongs to. */
+ readonly conversationId: string;
+ /** Working directory given to the turn; the key is absent when none was supplied. */
+ readonly cwd?: string;
+ /** Model name requested for the turn; the key is absent when none was supplied. */
+ readonly modelName?: string;
+}
+
+/**
+ * Fired when a turn STARTS driving a conversation (consumers cancel warming timers).
+ * handleMessage emits it first thing, before the conversation is marked active and before
+ * its history is loaded.
+ */
+export const turnStarted: EventHookDescriptor<TurnLifecyclePayload> =
+ defineEventHook<TurnLifecyclePayload>("session-orchestrator/turn-started");
+
+/**
+ * Fired when handleMessage finishes for a conversation (consumers arm warming timers).
+ * It is emitted from a `finally` block after the conversation is marked idle, so it fires
+ * whether the turn was sealed, returned early (e.g. unknown model), or threw.
+ */
+export const turnSettled: EventHookDescriptor<TurnLifecyclePayload> =
+ defineEventHook<TurnLifecyclePayload>("session-orchestrator/turn-settled");
+
+// --- Warm service ---
+
+/**
+ * Token usage reported by the provider for a warm probe.
+ * Values are taken from the provider's "usage" stream events (the last one wins); every
+ * field stays 0 if no usage event arrives, and the cache counters fall back to 0 when the
+ * provider omits them.
+ */
+export interface WarmResult {
+ readonly inputTokens: number;
+ readonly outputTokens: number;
+ readonly cacheReadTokens: number;
+ readonly cacheWriteTokens: number;
+}
+
+/** Service that sends a probe request for a conversation without running a real turn. */
+export interface WarmService {
+ /**
+ * Warm `conversationId`, optionally passing the working directory and model name to
+ * assemble the request with. Resolves to the probe's usage, or to `{ error }` when the
+ * warm is refused: the conversation is generating, it has no history, or the model name
+ * is unknown.
+ */
+ readonly warm: (
+ conversationId: string,
+ opts?: { readonly cwd?: string; readonly modelName?: string },
+ ) => Promise<WarmResult | { readonly error: string }>;
+}
+
+/** Service handle for the WarmService, defined under the id "session-orchestrator/warm". */
+export const cacheWarmHandle: ServiceHandle<WarmService> = defineService<WarmService>(
+ "session-orchestrator/warm",
+);
+
export interface SessionOrchestrator {
handleMessage(input: {
conversationId: string;
@@ -45,28 +85,129 @@ export interface SessionOrchestratorDeps {
readonly logger?: Logger;
/** Injected monotonic-ish clock (ms) forwarded to RunTurnInput for timing events. */
readonly now?: () => number;
+ /** Emit a lifecycle event hook to subscribers. Injected from host. */
+ readonly emit?: <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload) => void;
}
-export function createSessionOrchestrator(deps: SessionOrchestratorDeps): SessionOrchestrator {
- return {
+/**
+ * Return value of createSessionOrchestrator: the orchestrator itself plus a read-only view
+ * of the conversations that currently have a turn in flight.
+ */
+export interface SessionOrchestratorBundle {
+ readonly orchestrator: SessionOrchestrator;
+ /** The shared active-conversations set, for use by createWarmService. */
+ readonly activeConversations: ReadonlySet<string>;
+}
+
+/**
+ * Build the session orchestrator together with the set of conversations that currently
+ * have a turn in flight.
+ *
+ * Each handleMessage call emits `turnStarted`, marks the conversation active, runs the
+ * turn, persists the new messages and metrics, and reports `turn-sealed`. On every exit
+ * path (success, early return, throw) it marks the turn finished and emits `turnSettled`.
+ *
+ * @param deps Collaborators used to load/persist history, resolve the provider and tools,
+ *   run the turn, and (optionally) emit lifecycle hooks.
+ * @returns The orchestrator and a read-only view of the active-conversations set.
+ */
+export function createSessionOrchestrator(
+  deps: SessionOrchestratorDeps,
+): SessionOrchestratorBundle {
+  const activeConversations = new Set<string>();
+  // In-flight turn count per conversation. A bare Set would drop the id as soon as the
+  // FIRST of several overlapping turns finished, reporting the conversation idle while
+  // another turn is still running.
+  const inFlightTurns = new Map<string, number>();
+
+  /** Record one more running turn for the conversation and mark it active. */
+  const markTurnStarted = (conversationId: string): void => {
+    inFlightTurns.set(conversationId, (inFlightTurns.get(conversationId) ?? 0) + 1);
+    activeConversations.add(conversationId);
+  };
+
+  /** Record one finished turn; the conversation goes idle only when none remain. */
+  const markTurnFinished = (conversationId: string): void => {
+    const remaining = (inFlightTurns.get(conversationId) ?? 1) - 1;
+    if (remaining > 0) {
+      inFlightTurns.set(conversationId, remaining);
+      return;
+    }
+    inFlightTurns.delete(conversationId);
+    activeConversations.delete(conversationId);
+  };
+
+  const orchestrator: SessionOrchestrator = {
+    async handleMessage({ conversationId, text, onEvent, signal, modelName, cwd }) {
+      // Optional keys are left out entirely (not set to undefined) when not supplied.
+      const payload: TurnLifecyclePayload = {
+        conversationId,
+        ...(cwd !== undefined ? { cwd } : {}),
+        ...(modelName !== undefined ? { modelName } : {}),
+      };
+      deps.emit?.(turnStarted, payload);
+      markTurnStarted(conversationId);
+
+      try {
+        const history = await deps.conversationStore.load(conversationId);
+        const userMsg = buildUserMessage(text);
+        const turnId = generateTurnId();
+
+        let provider: ProviderContract;
+        let modelOverride: string | undefined;
+
+        if (modelName !== undefined && deps.resolveModel !== undefined) {
+          const resolved = deps.resolveModel(modelName);
+          if (resolved === undefined) {
+            // Unknown model: report it to the caller and end the turn without running it.
+            onEvent({
+              type: "error",
+              conversationId,
+              turnId,
+              message: `unknown model: ${modelName}`,
+            });
+            return;
+          }
+          provider = resolved.provider;
+          modelOverride = resolved.model;
+        } else {
+          provider = deps.resolveProvider();
+        }
+
+        const baseTools = deps.resolveTools();
+        const assembled = await deps.applyToolsFilter({
+          tools: baseTools,
+          conversationId,
+          ...(cwd !== undefined ? { cwd } : {}),
+        });
+        const dispatch = deps.resolveDispatch?.() ?? defaultDispatchPolicy();
+        const turnLogger = deps.logger?.child({ conversationId, turnId });
+        const metrics = createMetricsAccumulator();
+
+        // Every agent event feeds the metrics accumulator before reaching the caller.
+        const emitAndAccumulate = (event: AgentEvent): void => {
+          metrics.ingest(event);
+          onEvent(event);
+        };
+
+        const opts: RunTurnInput = {
+          provider,
+          messages: [...history, userMsg],
+          tools: assembled.tools,
+          dispatch,
+          emit: emitAndAccumulate,
+          conversationId,
+          turnId,
+          ...(modelOverride !== undefined
+            ? { providerOpts: { model: modelOverride } satisfies ProviderStreamOptions }
+            : {}),
+          ...(turnLogger !== undefined ? { logger: turnLogger } : {}),
+          ...(signal !== undefined ? { signal } : {}),
+          ...(cwd !== undefined ? { cwd } : {}),
+          ...(deps.now !== undefined ? { now: deps.now } : {}),
+        };
+
+        const result = await deps.runTurn(opts);
+
+        const toPersist: ChatMessage[] = [userMsg, ...result.messages];
+        await deps.conversationStore.append(conversationId, toPersist);
+
+        const turnMetrics = metrics.build(turnId);
+        await deps.conversationStore.appendMetrics(conversationId, turnMetrics);
+
+        onEvent({ type: "turn-sealed", conversationId, turnId });
+      } finally {
+        // Runs on success, early return, and throw alike.
+        markTurnFinished(conversationId);
+        deps.emit?.(turnSettled, payload);
+      }
+    },
+  };
+
+  return { orchestrator, activeConversations };
+}
+
+export function createWarmService(
+ deps: SessionOrchestratorDeps,
+ activeConversations: ReadonlySet<string>,
+): WarmService {
+ return {
+ async warm(conversationId, opts) {
+ if (activeConversations.has(conversationId)) {
+ return { error: "conversation is generating" };
+ }
+
const history = await deps.conversationStore.load(conversationId);
- const userMsg = buildUserMessage(text);
- const turnId = generateTurnId();
+ if (history.length === 0) {
+ return { error: "no history" };
+ }
let provider: ProviderContract;
let modelOverride: string | undefined;
- if (modelName !== undefined && deps.resolveModel !== undefined) {
- const resolved = deps.resolveModel(modelName);
+ if (opts?.modelName !== undefined && deps.resolveModel !== undefined) {
+ const resolved = deps.resolveModel(opts.modelName);
if (resolved === undefined) {
- onEvent({
- type: "error",
- conversationId,
- turnId,
- message: `unknown model: ${modelName}`,
- });
- return;
+ return { error: `unknown model: ${opts.modelName}` };
}
provider = resolved.provider;
modelOverride = resolved.model;
@@ -75,46 +216,38 @@ export function createSessionOrchestrator(deps: SessionOrchestratorDeps): Sessio
}
const baseTools = deps.resolveTools();
+ const cwd = opts?.cwd;
const assembled = await deps.applyToolsFilter({
tools: baseTools,
conversationId,
...(cwd !== undefined ? { cwd } : {}),
});
- const dispatch = deps.resolveDispatch?.() ?? defaultDispatchPolicy();
- const turnLogger = deps.logger?.child({ conversationId, turnId });
- const metrics = createMetricsAccumulator();
- const emitAndAccumulate = (event: AgentEvent): void => {
- metrics.ingest(event);
- onEvent(event);
+ const probeMsg: ChatMessage = {
+ role: "user",
+ chunks: [{ type: "text", text: "reply with just a ." }],
};
+ const messages = [...history, probeMsg];
- const opts: RunTurnInput = {
- provider,
- messages: [...history, userMsg],
- tools: assembled.tools,
- dispatch,
- emit: emitAndAccumulate,
- conversationId,
- turnId,
- ...(modelOverride !== undefined
- ? { providerOpts: { model: modelOverride } satisfies ProviderStreamOptions }
- : {}),
- ...(turnLogger !== undefined ? { logger: turnLogger } : {}),
- ...(signal !== undefined ? { signal } : {}),
- ...(cwd !== undefined ? { cwd } : {}),
- ...(deps.now !== undefined ? { now: deps.now } : {}),
- };
-
- const result = await deps.runTurn(opts);
+ const providerOpts: ProviderStreamOptions | undefined =
+ modelOverride !== undefined ? { model: modelOverride, maxTokens: 1 } : { maxTokens: 1 };
- const toPersist: ChatMessage[] = [userMsg, ...result.messages];
- await deps.conversationStore.append(conversationId, toPersist);
+ let inputTokens = 0;
+ let outputTokens = 0;
+ let cacheReadTokens = 0;
+ let cacheWriteTokens = 0;
- const turnMetrics = metrics.build(turnId);
- await deps.conversationStore.appendMetrics(conversationId, turnMetrics);
+ for await (const event of provider.stream(messages, assembled.tools, providerOpts)) {
+ if ((event as ProviderEvent).type === "usage") {
+ const usageEvent = event as UsageEvent;
+ inputTokens = usageEvent.usage.inputTokens;
+ outputTokens = usageEvent.usage.outputTokens;
+ cacheReadTokens = usageEvent.usage.cacheReadTokens ?? 0;
+ cacheWriteTokens = usageEvent.usage.cacheWriteTokens ?? 0;
+ }
+ }
- onEvent({ type: "turn-sealed", conversationId, turnId });
+ return { inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens };
},
};
}
diff --git a/packages/transport-http/src/server.bun.test.ts b/packages/transport-http/src/server.bun.test.ts
index 025042a..5b4c6aa 100644
--- a/packages/transport-http/src/server.bun.test.ts
+++ b/packages/transport-http/src/server.bun.test.ts
@@ -86,6 +86,7 @@ function createFakeHostAPI(configOverrides: Record<string, unknown> = {}): HostA
on() {
return () => {};
},
+ emit() {},
addFilter() {
return () => {};
},
diff --git a/tasks.md b/tasks.md
index 7ccfbcd..0bde08f 100644
--- a/tasks.md
+++ b/tasks.md
@@ -5,7 +5,7 @@
> Keep this lean and current; do not let it re-accrete a step-by-step changelog.
## Status (current)
-`tsc -b` EXIT 0 · biome clean · **734 vitest + 109 bun = 843 tests**.
+`tsc -b` EXIT 0 · biome clean · **760 vitest + 109 bun = 869 tests**.
Built and verified live (full-fidelity: every feature is a manifest-loaded
extension through the host):
@@ -140,6 +140,31 @@ no summary but still loadable. Glossary: added `skill`, `skill summary`, `tools
`tsc -b` EXIT 0, biome clean. (End-to-end load_skill via a real LLM turn not yet exercised —
unit/integration tests cover the filter rewrite + live read.)
+## Cache warming (core DONE; control surface PARTIAL)
+User-gated calls: target the external **Claude** provider (`../claude` provider-anthropic, loaded via
+`DISPATCH_EXTERNAL_EXTENSIONS`); warm-assembly lives in **session-orchestrator** (`warm()` reuses the
+real turn's assembly → byte-identical prefix, provider-agnostic); **surface system** for controls;
+**per-conversation** controls; interval default 4 min, free value. Old-code invariants honored
+(primary-model/full-prefix via reuse; refuse mid-turn; never persist/emit; in-flight invalidation;
+arm-on-settle/cancel-on-start; `pct = round(clamp(cacheRead/input,0,1)*100)`).
+- **Mechanism (2nd use of bus hooks; first event-hook emit):**
+ - [x] **kernel** — exposed `HostAPI.emit` (delegates to bus.emit), counterpart of `on`.
+ - [x] **session-orchestrator** — `turnStarted`/`turnSettled` event hooks (carry conversationId/cwd/
+ modelName) emitted per turn; `warm()` service (`cacheWarmHandle`) reusing assembly, refusing
+ mid-turn, never persisting/emitting; returns Usage.
+ - [x] **cache-warming** (new ext) — per-conversation timers (arm/cancel/in-flight token),
+ calls `warm()`, computes `lastPct`, persists `{enabled,intervalMs}` (default on/240s) in
+ host.storage; registers a controls Surface. 19 tests.
+ - [x] **host-bin** — registered cache-warming; **transport-http** HostAPI stub fixed for `emit`.
+- **Verified so far (static only):** full-graph `tsc -b` EXIT 0, biome clean. Boot smoke + live
+ Claude warm are still pending a restart with the cache-warming ext loaded.
+- **OPEN — surface-system limits (CR from cache-warming):** the surface system has (a) NO
+ per-conversation context (surface reflects most-recently-active conversation; invoke carries
+ conversationId), and (b) NO numeric-input field kind, so the **interval ("set time to refresh")
+ control is not yet a view input** — only the on/off toggle + last-cache-% stat render. Honoring
+ per-conversation controls + free-value interval needs a `NumberField` in `ui-contract` +
+ per-conversation surface scoping (+ FE courier). Decision pending.
+
## Open items
- **`prefix.fingerprint` / `warm|real` cache-bust attributes (deferred):** decoupled
from dedup by the content-addressed decision; also gated on cache-warming being
diff --git a/tsconfig.json b/tsconfig.json
index 8fd2d24..c5e797d 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -21,6 +21,7 @@
{ "path": "./packages/tool-edit-file" },
{ "path": "./packages/tool-write-file" },
{ "path": "./packages/skills" },
+ { "path": "./packages/cache-warming" },
{ "path": "./packages/cli" },
{ "path": "./packages/journal-sink" },
{ "path": "./packages/trace-store" },