diff options
22 files changed, 1422 insertions, 69 deletions
@@ -18,6 +18,16 @@ "@dispatch/kernel": "workspace:*", }, }, + "packages/cache-warming": { + "name": "@dispatch/cache-warming", + "version": "0.0.0", + "dependencies": { + "@dispatch/kernel": "workspace:*", + "@dispatch/session-orchestrator": "workspace:*", + "@dispatch/surface-registry": "workspace:*", + "@dispatch/ui-contract": "workspace:*", + }, + }, "packages/cli": { "name": "@dispatch/cli", "version": "0.0.0", @@ -45,6 +55,7 @@ "version": "0.0.0", "dependencies": { "@dispatch/auth-apikey": "workspace:*", + "@dispatch/cache-warming": "workspace:*", "@dispatch/conversation-store": "workspace:*", "@dispatch/credential-store": "workspace:*", "@dispatch/journal-sink": "workspace:*", @@ -243,6 +254,8 @@ "@dispatch/auth-apikey": ["@dispatch/auth-apikey@workspace:packages/auth-apikey"], + "@dispatch/cache-warming": ["@dispatch/cache-warming@workspace:packages/cache-warming"], + "@dispatch/cli": ["@dispatch/cli@workspace:packages/cli"], "@dispatch/conversation-store": ["@dispatch/conversation-store@workspace:packages/conversation-store"], diff --git a/packages/cache-warming/package.json b/packages/cache-warming/package.json new file mode 100644 index 0000000..eaf3fda --- /dev/null +++ b/packages/cache-warming/package.json @@ -0,0 +1,14 @@ +{ + "name": "@dispatch/cache-warming", + "version": "0.0.0", + "type": "module", + "private": true, + "main": "dist/index.js", + "types": "dist/index.d.ts", + "dependencies": { + "@dispatch/kernel": "workspace:*", + "@dispatch/session-orchestrator": "workspace:*", + "@dispatch/surface-registry": "workspace:*", + "@dispatch/ui-contract": "workspace:*" + } +} diff --git a/packages/cache-warming/src/extension.ts b/packages/cache-warming/src/extension.ts new file mode 100644 index 0000000..16515a8 --- /dev/null +++ b/packages/cache-warming/src/extension.ts @@ -0,0 +1,130 @@ +import type { Extension, HostAPI, Manifest } from "@dispatch/kernel"; +import { cacheWarmHandle, turnSettled, turnStarted } from "@dispatch/session-orchestrator"; 
import type { SurfaceProvider } from "@dispatch/surface-registry";
import { surfaceRegistryHandle } from "@dispatch/surface-registry";
import type { SurfaceSpec } from "@dispatch/ui-contract";
import { createCacheWarmer } from "./warmer.js";

/**
 * Extension manifest: eagerly activated, bundled extension that depends on
 * the session-orchestrator (warm service + turn lifecycle events) and the
 * surface-registry (to expose its side-panel surface).
 */
export const manifest: Manifest = {
  id: "cache-warming",
  name: "Cache Warming",
  version: "0.0.0",
  apiVersion: "^0.1.0",
  trust: "bundled",
  activation: "eager",
  dependsOn: ["session-orchestrator", "surface-registry"],
  capabilities: {},
  contributes: {
    services: ["cache-warming/surface"],
  },
};

/**
 * Build the side-panel surface description for the cache-warming extension.
 *
 * @param _conversationId - Currently unused; kept so callers can pass the
 *   conversation the spec was built for.
 * @param enabled - Current value of the "Enabled" toggle.
 * @param lastPct - Last observed cache-hit percentage, or `null` when no warm
 *   has completed yet (rendered as an em dash).
 * @returns A surface with one toggle field and one stat field.
 */
function buildSurfaceSpec(
  _conversationId: string | undefined,
  enabled: boolean,
  lastPct: number | null,
): SurfaceSpec {
  const pctDisplay = lastPct === null ? "—" : `${lastPct}%`;
  return {
    id: "cache-warming",
    region: "side",
    title: "Cache Warming",
    fields: [
      {
        kind: "toggle",
        label: "Enabled",
        value: enabled,
        action: { actionId: "cache-warming/toggle" },
      },
      {
        kind: "stat",
        label: "Last Cache %",
        value: pctDisplay,
      },
    ],
  };
}

/**
 * Activate the extension: create a cache warmer backed by the orchestrator's
 * warm service, drive it from the turn-started / turn-settled events, and
 * register a surface provider that shows and toggles the warmer's state for
 * the most recently seen conversation.
 */
export function activate(host: HostAPI): void {
  const warmService = host.getService(cacheWarmHandle);
  const registry = host.getService(surfaceRegistryHandle);
  const storage = host.storage("cache-warming");

  // Conversation id taken from the most recent lifecycle event; used as the
  // fallback target for surface reads and actions.
  let currentConversationId: string | undefined;

  // Timer wrapper: setTimeout/clearTimeout return Timeout in Node types,
  // but our TimerDeps uses number ids. Map between them.
  const timeoutMap = new Map<number, ReturnType<typeof setTimeout>>();
  let nextTimerId = 1;

  const warmer = createCacheWarmer({
    warm: warmService.warm,
    storage,
    logger: host.logger,
    timers: {
      setTimer(fn, ms) {
        const id = nextTimerId++;
        const handle = setTimeout(() => {
          // A timer that fires is never passed to clearTimer, so drop its
          // bookkeeping entry here; otherwise the map grows by one entry per
          // warm cycle for the lifetime of the process.
          timeoutMap.delete(id);
          fn();
        }, ms);
        timeoutMap.set(id, handle);
        return id;
      },
      clearTimer(id) {
        const timeout = timeoutMap.get(id);
        if (timeout !== undefined) {
          clearTimeout(timeout);
          timeoutMap.delete(id);
        }
      },
    },
    onSurfaceChange: () => {
      // Surface subscribers will re-fetch on next getSpec()
    },
  });

  host.on(turnStarted, (payload) => {
    currentConversationId = payload.conversationId;
    warmer.onTurnStarted(payload.conversationId);
  });

  host.on(turnSettled, (payload) => {
    currentConversationId = payload.conversationId;
    // Only forward context keys that are actually present, so the stored
    // context never carries explicit `undefined` values.
    warmer.onTurnSettled(payload.conversationId, {
      ...(payload.cwd !== undefined ? { cwd: payload.cwd } : {}),
      ...(payload.modelName !== undefined ? { modelName: payload.modelName } : {}),
    });
  });

  const provider: SurfaceProvider = {
    catalogEntry: {
      id: "cache-warming",
      region: "side",
      title: "Cache Warming",
    },
    getSpec() {
      const convId = currentConversationId;
      // Before any turn event has been seen there is no conversation to
      // look up; show the defaults (enabled, no percentage yet).
      const state =
        convId !== undefined ? warmer.getState(convId) : { enabled: true, lastPct: null };
      return buildSurfaceSpec(convId, state.enabled, state.lastPct);
    },
    async invoke(actionId, payload) {
      const pl = payload as Record<string, unknown> | undefined;
      // An explicit string conversationId in the payload wins; otherwise fall
      // back to the conversation from the latest lifecycle event.
      const convId =
        (typeof pl?.conversationId === "string" ? pl.conversationId : undefined) ??
        currentConversationId;
      if (convId === undefined) return;

      if (actionId === "cache-warming/toggle") {
        const current = warmer.getState(convId);
        await warmer.setEnabled(convId, !current.enabled);
      }
    },
  };

  registry.register(provider);

  host.logger.info("cache-warming: registered");
}

export const extension: Extension = {
  manifest,
  activate,
};
+ const state: ConversationState = { + enabled: true, + intervalMs: 240_000, + active: false, + lastPct: null, + token: 5, + }; + expect(shouldWarm(state, 5)).toBe(true); + }); + + it("returns false when disabled", () => { + const state: ConversationState = { + enabled: false, + intervalMs: 240_000, + active: false, + lastPct: null, + token: 5, + }; + expect(shouldWarm(state, 5)).toBe(false); + }); + + it("returns false when active", () => { + const state: ConversationState = { + enabled: true, + intervalMs: 240_000, + active: true, + lastPct: null, + token: 5, + }; + expect(shouldWarm(state, 5)).toBe(false); + }); + + it("returns false when token is superseded", () => { + const state: ConversationState = { + enabled: true, + intervalMs: 240_000, + active: false, + lastPct: null, + token: 5, + }; + expect(shouldWarm(state, 6)).toBe(false); + }); +}); + +describe("isTokenCurrent", () => { + it("returns true when tokens match", () => { + expect(isTokenCurrent(5, 5)).toBe(true); + }); + + it("returns false when tokens differ", () => { + expect(isTokenCurrent(5, 6)).toBe(false); + }); +}); + +describe("parseSettings/serializeSettings round-trip", () => { + it("round-trips enabled + intervalMs", () => { + const original = { enabled: false, intervalMs: 120_000 }; + const serialized = serializeSettings(original); + const parsed = parseSettings(serialized); + expect(parsed).toEqual(original); + }); + + it("returns defaults for null input", () => { + const parsed = parseSettings(null); + expect(parsed).toEqual({ enabled: true, intervalMs: 240_000 }); + }); + + it("returns defaults for malformed JSON", () => { + const parsed = parseSettings("not-json{{{"); + expect(parsed).toEqual({ enabled: true, intervalMs: 240_000 }); + }); + + it("clamps non-positive interval to MIN_INTERVAL_MS", () => { + const parsed = parseSettings(JSON.stringify({ enabled: true, intervalMs: -500 })); + expect(parsed.intervalMs).toBe(1000); + }); + + it("uses default for NaN interval", () => { + const 
/**
 * Pure core for cache-warming — zero I/O, zero ambient state.
 * Every function is input → output; testable without mocks.
 */

// --- Types ---

/** Persisted per-conversation settings (storage-facing). */
export interface ConversationSettings {
  readonly enabled: boolean;
  readonly intervalMs: number;
}

/** Full per-conversation runtime state (in-memory, not persisted). */
export interface ConversationState extends ConversationSettings {
  readonly active: boolean;
  readonly lastPct: number | null;
  readonly token: number;
}

/** Context stored per-conversation from the latest lifecycle event. */
export interface ConversationContext {
  readonly cwd?: string;
  readonly modelName?: string;
}

/** Interval used when no valid interval has been stored. */
export const DEFAULT_INTERVAL_MS = 240_000;
/** Lower bound applied to every stored or requested interval. */
export const MIN_INTERVAL_MS = 1000;

/** Fresh copy of the default settings (a new object per call, never shared). */
function defaultSettings(): ConversationSettings {
  return { enabled: true, intervalMs: DEFAULT_INTERVAL_MS };
}

// --- Pure functions ---

/**
 * Compute cache-hit percentage from token counts.
 *
 * @param inputTokens - Total input tokens; anything that is not strictly
 *   positive (including NaN) yields 0.
 * @param cacheReadTokens - Tokens served from cache.
 * @returns An integer in [0, 100]. Never NaN: an undefined ratio (NaN
 *   operand, or Infinity / Infinity) is reported as 0.
 */
export function computeCachePct(inputTokens: number, cacheReadTokens: number): number {
  // `!(x > 0)` rather than `x <= 0` so that NaN is rejected as well.
  if (!(inputTokens > 0)) return 0;
  const ratio = cacheReadTokens / inputTokens;
  // Math.min/Math.max propagate NaN, so it must be handled before clamping.
  if (Number.isNaN(ratio)) return 0;
  const clamped = Math.max(0, Math.min(1, ratio));
  return Math.round(clamped * 100);
}

/**
 * Decide whether a conversation should be warmed right now.
 * Requires: enabled, idle (not active), and the token is current (not superseded).
 */
export function shouldWarm(state: ConversationState, currentToken: number): boolean {
  return state.enabled && !state.active && isTokenCurrent(state.token, currentToken);
}

/**
 * Check whether a token is still current (not superseded by a newer cancel/fire).
 */
export function isTokenCurrent(current: number, expected: number): boolean {
  return current === expected;
}

const SETTINGS_KEY = "settings";

/**
 * Parse settings from a raw storage string.
 *
 * @param raw - The stored JSON string, or `null` when nothing is stored.
 * @returns The parsed settings. Defaults are returned when `raw` is `null`,
 *   is not valid JSON, or does not decode to a non-null object. A missing or
 *   non-boolean `enabled` becomes `true`. A missing or non-finite `intervalMs`
 *   becomes `DEFAULT_INTERVAL_MS`; a finite one is rounded and raised to at
 *   least `MIN_INTERVAL_MS`.
 */
export function parseSettings(raw: string | null): ConversationSettings {
  if (raw === null) return defaultSettings();

  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return defaultSettings();
  }
  if (typeof parsed !== "object" || parsed === null) return defaultSettings();

  const obj = parsed as Record<string, unknown>;
  const enabled = typeof obj.enabled === "boolean" ? obj.enabled : true;
  const rawInterval = obj.intervalMs;
  // Math.max already maps zero and negative values to MIN_INTERVAL_MS, so no
  // separate non-positive branch is needed.
  const intervalMs =
    typeof rawInterval === "number" && Number.isFinite(rawInterval)
      ? Math.max(MIN_INTERVAL_MS, Math.round(rawInterval))
      : DEFAULT_INTERVAL_MS;
  return { enabled, intervalMs };
}

/**
 * Serialize settings for storage.
 */
export function serializeSettings(settings: ConversationSettings): string {
  return JSON.stringify(settings);
}

/** The storage key for a conversation's settings. */
export function settingsKey(conversationId: string): string {
  return `${SETTINGS_KEY}:${conversationId}`;
}
null, + set: async (k, v) => { + map.set(k, v); + }, + delete: async (k) => { + map.delete(k); + }, + has: async (k) => map.has(k), + keys: async (prefix) => + [...map.keys()].filter((k) => (prefix === undefined ? true : k.startsWith(prefix))), + }; +} + +function makeSpan(): Span { + const span: Span = { + id: "span", + log: makeLogger(), + setAttributes: () => {}, + addLink: () => {}, + child: () => makeSpan(), + end: () => {}, + }; + return span; +} + +function makeLogger(): Logger { + return { + debug: () => {}, + info: () => {}, + warn: () => {}, + error: () => {}, + child: () => makeLogger(), + span: () => makeSpan(), + }; +} + +function fakeTimers(): TimerDeps & { flush: () => void } { + let nextId = 1; + const pending = new Map<number, () => void>(); + return { + setTimer(fn, _ms) { + const id = nextId++; + pending.set(id, fn); + return id; + }, + clearTimer(id) { + pending.delete(id); + }, + flush() { + const fns = [...pending.values()]; + pending.clear(); + for (const fn of fns) fn(); + }, + }; +} + +const WARM_RESULT: WarmResult = { + inputTokens: 1000, + outputTokens: 10, + cacheReadTokens: 800, + cacheWriteTokens: 0, +}; + +import type { StorageNamespace } from "@dispatch/kernel"; + +describe("CacheWarmer", () => { + it("arms a timer on turnSettled and warms when it fires (enabled)", async () => { + const timers = fakeTimers(); + const warmCalls: string[] = []; + const warmer = createCacheWarmer({ + warm: async (convId) => { + warmCalls.push(convId); + return WARM_RESULT; + }, + storage: memStorage(), + logger: makeLogger(), + timers, + onSurfaceChange: () => {}, + }); + + warmer.onTurnSettled("conv-1", {}); + timers.flush(); + + await new Promise((r) => setTimeout(r, 10)); + expect(warmCalls).toContain("conv-1"); + }); + + it("cancels the timer on turnStarted (no warm while generating)", () => { + const timers = fakeTimers(); + const warmCalls: string[] = []; + const warmer = createCacheWarmer({ + warm: async (convId) => { + warmCalls.push(convId); + 
return WARM_RESULT; + }, + storage: memStorage(), + logger: makeLogger(), + timers, + onSurfaceChange: () => {}, + }); + + warmer.onTurnSettled("conv-1", {}); + warmer.onTurnStarted("conv-1"); + timers.flush(); + + expect(warmCalls).toHaveLength(0); + }); + + it("in-flight warm result is dropped when superseded (token mismatch)", async () => { + const timers = fakeTimers(); + let resolveWarm: (v: WarmResult) => void = () => {}; + const warmPromise = new Promise<WarmResult>((r) => { + resolveWarm = r; + }); + const warmer = createCacheWarmer({ + warm: () => warmPromise, + storage: memStorage(), + logger: makeLogger(), + timers, + onSurfaceChange: () => {}, + }); + + warmer.onTurnSettled("conv-1", {}); + timers.flush(); + + warmer.onTurnStarted("conv-1"); + warmer.onTurnSettled("conv-1", {}); + + resolveWarm?.(WARM_RESULT); + await new Promise((r) => setTimeout(r, 10)); + + const state = warmer.getState("conv-1"); + expect(state.lastPct).toBeNull(); + }); + + it("disabled conversation does not warm", async () => { + const timers = fakeTimers(); + const warmCalls: string[] = []; + const warmer = createCacheWarmer({ + warm: async (convId) => { + warmCalls.push(convId); + return WARM_RESULT; + }, + storage: memStorage(), + logger: makeLogger(), + timers, + onSurfaceChange: () => {}, + }); + + await warmer.setEnabled("conv-1", false); + warmer.onTurnSettled("conv-1", {}); + timers.flush(); + + await new Promise((r) => setTimeout(r, 10)); + expect(warmCalls).toHaveLength(0); + }); + + it("stores lastPct from the warm result", async () => { + const timers = fakeTimers(); + const warmer = createCacheWarmer({ + warm: async () => WARM_RESULT, + storage: memStorage(), + logger: makeLogger(), + timers, + onSurfaceChange: () => {}, + }); + + warmer.onTurnSettled("conv-1", {}); + timers.flush(); + + await new Promise((r) => setTimeout(r, 10)); + const state = warmer.getState("conv-1"); + expect(state.lastPct).toBe(80); + }); + + it("re-arms timer after warm completes", async () 
import type { Logger, StorageNamespace } from "@dispatch/kernel";
import type { WarmService } from "@dispatch/session-orchestrator";
import {
  type ConversationContext,
  type ConversationSettings,
  type ConversationState,
  computeCachePct,
  DEFAULT_INTERVAL_MS,
  isTokenCurrent,
  MIN_INTERVAL_MS,
  parseSettings,
  serializeSettings,
  settingsKey,
  shouldWarm,
} from "./pure.js";

// --- Timer abstraction (injectable for tests) ---

export interface TimerDeps {
  readonly setTimer: (fn: () => void, ms: number) => number;
  readonly clearTimer: (id: number) => void;
}

// --- Warmer interface ---

export interface CacheWarmer {
  /** Handle a turnStarted event — mark conversation active, cancel pending warm. */
  readonly onTurnStarted: (conversationId: string) => void;

  /** Handle a turnSettled event — mark idle, store context, arm timer if enabled. */
  readonly onTurnSettled: (conversationId: string, ctx: ConversationContext) => void;

  /** Get the current state for a conversation (for surface rendering). */
  readonly getState: (conversationId: string) => ConversationState;

  /** Get the stored context for a conversation. */
  readonly getContext: (conversationId: string) => ConversationContext;

  /** Toggle enabled for a conversation. Returns updated settings. */
  readonly setEnabled: (conversationId: string, enabled: boolean) => Promise<ConversationSettings>;

  /** Set the refresh interval for a conversation. Returns updated settings. */
  readonly setIntervalMs: (
    conversationId: string,
    intervalMs: number,
  ) => Promise<ConversationSettings>;

  /** Dispose all timers (for cleanup). No timer is armed after this call. */
  readonly dispose: () => void;
}

export interface CacheWarmerDeps {
  readonly warm: WarmService["warm"];
  readonly storage: StorageNamespace;
  readonly logger: Logger;
  readonly timers: TimerDeps;
  /** Called when surface subscribers should re-fetch the spec. */
  readonly onSurfaceChange: () => void;
}

/** State reported for a conversation that has no recorded state yet. */
const DEFAULT_STATE: ConversationState = {
  enabled: true,
  intervalMs: DEFAULT_INTERVAL_MS,
  active: false,
  lastPct: null,
  token: 0,
};

/** Render an unknown thrown value as a log-friendly string. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Create a cache warmer: for each idle, enabled conversation it arms a timer
 * after a turn settles, calls `deps.warm` when the timer fires, records the
 * resulting cache-hit percentage, and re-arms for the next cycle. A new turn
 * cancels the pending timer; a per-arm token lets a warm that was superseded
 * while in flight be discarded.
 *
 * NOTE(review): persisted settings are only read inside `setEnabled` /
 * `setIntervalMs`; turn events use the in-memory state, which starts from
 * `DEFAULT_STATE`. Confirm whether stored settings should be hydrated on the
 * first event for a conversation.
 */
export function createCacheWarmer(deps: CacheWarmerDeps): CacheWarmer {
  // Per-conversation runtime state (not persisted — reconstructed from storage + events)
  const states = new Map<string, ConversationState>();
  // Per-conversation context from latest lifecycle event
  const contexts = new Map<string, ConversationContext>();
  // Per-conversation pending timer id
  const timers = new Map<string, number>();
  // Monotonic token source for in-flight invalidation
  let nextToken = 1;
  // Set by dispose(); prevents an in-flight warm from re-arming afterwards.
  let disposed = false;

  function getState(conversationId: string): ConversationState {
    return states.get(conversationId) ?? DEFAULT_STATE;
  }

  function getContext(conversationId: string): ConversationContext {
    return contexts.get(conversationId) ?? {};
  }

  function setState(conversationId: string, state: ConversationState): void {
    states.set(conversationId, state);
  }

  /** Merge a partial update into a conversation's state and return the result. */
  function mergeState(
    conversationId: string,
    partial: Partial<ConversationState>,
  ): ConversationState {
    const updated = { ...getState(conversationId), ...partial };
    setState(conversationId, updated);
    return updated;
  }

  function cancelTimer(conversationId: string): void {
    const existing = timers.get(conversationId);
    if (existing !== undefined) {
      deps.timers.clearTimer(existing);
      timers.delete(conversationId);
    }
  }

  /**
   * (Re)arm the warm timer for a conversation. Any pending timer is cancelled
   * first. Nothing is armed when the warmer is disposed or the conversation
   * is disabled or active. Arming issues a fresh token, which invalidates any
   * warm still in flight for the previous token.
   */
  function armTimer(conversationId: string): void {
    cancelTimer(conversationId);
    const state = getState(conversationId);
    if (disposed || !state.enabled || state.active) return;

    const token = nextToken++;
    setState(conversationId, { ...state, token });

    const timerId = deps.timers.setTimer(() => {
      timers.delete(conversationId);
      // Last-resort handler: the warm runs detached from any caller, so a
      // failure must be logged here rather than surface as an unhandled
      // promise rejection.
      fireWarm(conversationId, token).catch((err: unknown) => {
        deps.logger.error("cache-warming: warm cycle failed", {
          conversationId,
          error: describeError(err),
        });
      });
    }, state.intervalMs);

    timers.set(conversationId, timerId);
  }

  /**
   * Run one warm for the given token. Skips when the conversation is no
   * longer eligible, discards the result when the token was superseded while
   * the warm was in flight, and otherwise records the cache percentage and
   * re-arms the timer.
   */
  async function fireWarm(conversationId: string, token: number): Promise<void> {
    const state = getState(conversationId);
    if (!shouldWarm(state, token)) {
      deps.logger.debug("cache-warming: skip warm (superseded or disabled)", {
        conversationId,
      });
      return;
    }

    const ctx = getContext(conversationId);
    deps.logger.debug("cache-warming: firing warm", { conversationId });

    let result: Awaited<ReturnType<WarmService["warm"]>> | undefined;
    try {
      result = await deps.warm(conversationId, {
        ...(ctx.cwd !== undefined ? { cwd: ctx.cwd } : {}),
        ...(ctx.modelName !== undefined ? { modelName: ctx.modelName } : {}),
      });
    } catch (err: unknown) {
      // A thrown warm is handled like a returned error: log it and keep the
      // cycle alive instead of silently ending it.
      deps.logger.warn("cache-warming: warm threw", {
        conversationId,
        error: describeError(err),
      });
    }

    if (disposed) return;

    // Re-check token after async warm — result may be stale
    const currentState = getState(conversationId);
    if (!isTokenCurrent(currentState.token, token)) {
      deps.logger.debug("cache-warming: discarding stale warm result", {
        conversationId,
      });
      return;
    }

    if (result !== undefined) {
      if ("error" in result) {
        deps.logger.debug("cache-warming: warm returned error (normal)", {
          conversationId,
          error: result.error,
        });
      } else {
        const pct = computeCachePct(result.inputTokens, result.cacheReadTokens);
        setState(conversationId, { ...currentState, lastPct: pct });
        deps.onSurfaceChange();
        deps.logger.debug("cache-warming: warm complete", {
          conversationId,
          pct,
        });
      }
    }

    // Re-arm for next cycle
    armTimer(conversationId);
  }

  async function loadSettings(conversationId: string): Promise<ConversationSettings> {
    const raw = await deps.storage.get(settingsKey(conversationId));
    return parseSettings(raw);
  }

  async function persistSettings(
    conversationId: string,
    settings: ConversationSettings,
  ): Promise<void> {
    await deps.storage.set(settingsKey(conversationId), serializeSettings(settings));
  }

  return {
    onTurnStarted(conversationId) {
      deps.logger.debug("cache-warming: turn started", { conversationId });
      mergeState(conversationId, { active: true });
      cancelTimer(conversationId);
    },

    onTurnSettled(conversationId, ctx) {
      deps.logger.debug("cache-warming: turn settled", { conversationId });
      contexts.set(conversationId, ctx);
      const state = mergeState(conversationId, { active: false });
      if (state.enabled) {
        armTimer(conversationId);
      }
    },

    getState,
    getContext,

    async setEnabled(conversationId, enabled) {
      const settings = await loadSettings(conversationId);
      const updated = { ...settings, enabled };
      await persistSettings(conversationId, updated);
      const state = mergeState(conversationId, { enabled });

      if (!enabled) {
        cancelTimer(conversationId);
      } else if (!state.active) {
        armTimer(conversationId);
      }

      deps.onSurfaceChange();
      return updated;
    },

    async setIntervalMs(conversationId, intervalMs) {
      // Non-finite input falls back to the minimum; Math.max already maps
      // zero and negative values to the minimum as well.
      const clamped = Number.isFinite(intervalMs)
        ? Math.max(MIN_INTERVAL_MS, Math.round(intervalMs))
        : MIN_INTERVAL_MS;
      const settings = await loadSettings(conversationId);
      const updated = { ...settings, intervalMs: clamped };
      await persistSettings(conversationId, updated);
      const state = mergeState(conversationId, { intervalMs: clamped });

      // Re-arm with new interval if currently armed
      if (state.enabled && !state.active && timers.has(conversationId)) {
        armTimer(conversationId);
      }

      deps.onSurfaceChange();
      return updated;
    },

    dispose() {
      disposed = true;
      // Snapshot the keys: cancelTimer removes entries from the map.
      for (const conversationId of [...timers.keys()]) {
        cancelTimer(conversationId);
      }
    },
  };
}
"@dispatch/conversation-store": "workspace:*", "@dispatch/auth-apikey": "workspace:*", + "@dispatch/cache-warming": "workspace:*", "@dispatch/credential-store": "workspace:*", "@dispatch/provider-openai-compat": "workspace:*", "@dispatch/session-orchestrator": "workspace:*", diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts index ef75f55..1594dcc 100644 --- a/packages/host-bin/src/main.ts +++ b/packages/host-bin/src/main.ts @@ -1,6 +1,7 @@ import { mkdirSync } from "node:fs"; import { dirname } from "node:path"; import { extension as authApikeyExt } from "@dispatch/auth-apikey"; +import { extension as cacheWarmingExt } from "@dispatch/cache-warming"; import { extension as conversationStoreExt } from "@dispatch/conversation-store"; import { createCredentialStoreExtension } from "@dispatch/credential-store"; import { createJournalSink } from "@dispatch/journal-sink"; @@ -73,6 +74,7 @@ const CORE_EXTENSIONS: readonly Extension[] = [ throughputStoreExt, sessionOrchestratorExt, skillsExt, + cacheWarmingExt, createTransportHttpExtension(), // Surface extensions — dependency order: surface-registry first, then consumers. 
createSurfaceRegistryExtension(), diff --git a/packages/host-bin/tsconfig.json b/packages/host-bin/tsconfig.json index b357c51..77de667 100644 --- a/packages/host-bin/tsconfig.json +++ b/packages/host-bin/tsconfig.json @@ -3,6 +3,7 @@ "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true }, "include": ["src/**/*.ts"], "references": [ + { "path": "../cache-warming" }, { "path": "../kernel" }, { "path": "../storage-sqlite" }, { "path": "../surface-loaded-extensions" }, diff --git a/packages/kernel/src/contracts/extension.ts b/packages/kernel/src/contracts/extension.ts index 5a7821b..4d6cf07 100644 --- a/packages/kernel/src/contracts/extension.ts +++ b/packages/kernel/src/contracts/extension.ts @@ -190,6 +190,18 @@ export interface HostAPI { handler: EventHandler<TPayload>, ) => () => void; + /** + * Emit an event hook: fire-and-forget dispatch to every `on` subscriber, + * error-isolated per handler (a thrown handler is caught + logged, never + * breaks the caller). The counterpart of `on`. + * + * This lets a core extension that OWNS a lifecycle publish typed events that + * standard extensions react to — e.g. the session-orchestrator emitting + * per-turn start/settle events a cache-warming extension subscribes to. The + * kernel owns the mechanism; the owner declares the typed `EventHookDescriptor`. + */ + readonly emit: <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload) => void; + /** Add a filter to a filter hook chain. Filters are awaited in-band. 
*/ readonly addFilter: <TValue>( hook: FilterDescriptor<TValue>, diff --git a/packages/kernel/src/host/host.test.ts b/packages/kernel/src/host/host.test.ts index 669d093..0067091 100644 --- a/packages/kernel/src/host/host.test.ts +++ b/packages/kernel/src/host/host.test.ts @@ -617,6 +617,49 @@ describe("createHost", () => { expect(received).toEqual(["hello"]); }); + it("emit dispatches to handlers registered via on", async () => { + const hook = defineEventHook<string>("test/emit-dispatch"); + const received: string[] = []; + + const ext = createExtension("emit-ext", { + activate: (host) => { + host.on(hook, (payload) => { + received.push(payload); + }); + }, + }); + + const host = createHost([ext], deps); + await host.activate(); + + const api = host.getHostAPI(); + api.emit(hook, "world"); + expect(received).toEqual(["world"]); + }); + + it("emit isolates a throwing handler (does not propagate)", async () => { + const hook = defineEventHook<string>("test/emit-isolation"); + const received: string[] = []; + + const ext = createExtension("emit-isolation-ext", { + activate: (host) => { + host.on(hook, () => { + throw new Error("handler boom"); + }); + host.on(hook, (payload) => { + received.push(payload); + }); + }, + }); + + const host = createHost([ext], deps); + await host.activate(); + + const api = host.getHostAPI(); + expect(() => api.emit(hook, "safe")).not.toThrow(); + expect(received).toEqual(["safe"]); + }); + it("applyFilters threads a value through registered filters in order", async () => { const hook = defineFilter<string>("test/text-transform"); diff --git a/packages/kernel/src/host/host.ts b/packages/kernel/src/host/host.ts index a6396a9..2a262be 100644 --- a/packages/kernel/src/host/host.ts +++ b/packages/kernel/src/host/host.ts @@ -122,6 +122,9 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho on<TPayload>(hook: EventHookDescriptor<TPayload>, handler: EventHandler<TPayload>) { return deps.bus.on(hook, handler); }, 
+ emit<TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload) { + deps.bus.emit(hook, payload); + }, addFilter<TValue>(hook: FilterDescriptor<TValue>, fn: FilterHandler<TValue>) { return deps.bus.addFilter(hook, fn); }, diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts index 697eb4a..12d387c 100644 --- a/packages/session-orchestrator/src/extension.ts +++ b/packages/session-orchestrator/src/extension.ts @@ -3,8 +3,9 @@ import { credentialStoreHandle } from "@dispatch/credential-store"; import type { Extension, HostAPI, Manifest } from "@dispatch/kernel"; import { runTurn } from "@dispatch/kernel"; import { + cacheWarmHandle, createSessionOrchestrator, - type SessionOrchestrator, + createWarmService, sessionOrchestratorHandle, } from "./orchestrator.js"; import { selectFirstProvider } from "./pure.js"; @@ -19,14 +20,15 @@ export const manifest: Manifest = { dependsOn: ["conversation-store", "credential-store"], activation: "eager", contributes: { - services: ["session-orchestrator/orchestrator"], + services: ["session-orchestrator/orchestrator", "session-orchestrator/warm"], + hooks: ["session-orchestrator/turn-started", "session-orchestrator/turn-settled"], }, }; export function activate(host: HostAPI): void { const conversationStore = host.getService(conversationStoreHandle); - const orchestrator: SessionOrchestrator = createSessionOrchestrator({ + const { orchestrator, activeConversations } = createSessionOrchestrator({ conversationStore, resolveProvider: () => selectFirstProvider(host.getProviders()), resolveTools: () => [...host.getTools().values()], @@ -41,9 +43,32 @@ export function activate(host: HostAPI): void { runTurn, logger: host.logger, now: () => Date.now(), + emit: (hook, payload) => host.emit(hook, payload), }); host.provideService(sessionOrchestratorHandle, orchestrator); + + const warmService = createWarmService( + { + conversationStore, + resolveProvider: () => 
selectFirstProvider(host.getProviders()), + resolveTools: () => [...host.getTools().values()], + resolveModel: (modelName: string) => { + const store = host.getService(credentialStoreHandle); + const r = store.resolve(modelName); + if (r === undefined) return undefined; + const provider = host.getProviders().get(r.providerId); + return provider ? { provider, model: r.model } : undefined; + }, + applyToolsFilter: (assembly) => host.applyFilters(toolsFilter, assembly), + runTurn, + logger: host.logger, + now: () => Date.now(), + }, + activeConversations, + ); + + host.provideService(cacheWarmHandle, warmService); } export const extension: Extension = { diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts index 071b616..37ae5ce 100644 --- a/packages/session-orchestrator/src/index.ts +++ b/packages/session-orchestrator/src/index.ts @@ -1,9 +1,17 @@ export { extension, manifest } from "./extension.js"; export { + cacheWarmHandle, createSessionOrchestrator, + createWarmService, type SessionOrchestrator, + type SessionOrchestratorBundle, type SessionOrchestratorDeps, sessionOrchestratorHandle, + type TurnLifecyclePayload, + turnSettled, + turnStarted, + type WarmResult, + type WarmService, } from "./orchestrator.js"; export { buildUserMessage, diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts index dcaad7d..5d512ea 100644 --- a/packages/session-orchestrator/src/orchestrator.test.ts +++ b/packages/session-orchestrator/src/orchestrator.test.ts @@ -2,6 +2,7 @@ import type { ConversationStore } from "@dispatch/conversation-store"; import type { AgentEvent, ChatMessage, + EventHookDescriptor, ProviderContract, ProviderEvent, RunTurnInput, @@ -12,7 +13,11 @@ import type { } from "@dispatch/kernel"; import { runTurn } from "@dispatch/kernel"; import { describe, expect, it } from "vitest"; -import { createSessionOrchestrator } from "./orchestrator.js"; 
+import { + createSessionOrchestrator, + createWarmService, + type TurnLifecyclePayload, +} from "./orchestrator.js"; import type { ToolAssembly } from "./tools-filter.js"; function createInMemoryStore(): ConversationStore & { @@ -104,7 +109,7 @@ describe("handleMessage integration", () => { ], ]); - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => provider, resolveTools: () => [], @@ -156,7 +161,7 @@ describe("handleMessage integration", () => { }, }; - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => provider, resolveTools: () => [], @@ -203,7 +208,7 @@ describe("handleMessage integration", () => { ], ]); - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => provider, resolveTools: () => [], @@ -233,7 +238,7 @@ describe("handleMessage integration", () => { ], ]); - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => provider, resolveTools: () => [], @@ -282,7 +287,7 @@ describe("handleMessage model resolution", () => { const fallbackProvider: ProviderContract = { id: "fallback", stream: async function* () {} }; const { captured, captureRunTurn } = createCapturingRunTurn(); - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => fallbackProvider, resolveTools: () => [], @@ -314,7 +319,7 @@ describe("handleMessage model resolution", () => { const { captured, captureRunTurn } = createCapturingRunTurn(); const events: AgentEvent[] = []; - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, 
resolveProvider: () => fallbackProvider, resolveTools: () => [], @@ -345,7 +350,7 @@ describe("handleMessage model resolution", () => { const fallbackProvider: ProviderContract = { id: "fallback", stream: async function* () {} }; const { captured, captureRunTurn } = createCapturingRunTurn(); - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => fallbackProvider, resolveTools: () => [], @@ -373,7 +378,7 @@ describe("handleMessage model resolution", () => { const provider: ProviderContract = { id: "p", stream: async function* () {} }; const { captured, captureRunTurn } = createCapturingRunTurn(); - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => provider, resolveTools: () => [], @@ -407,7 +412,7 @@ describe("handleMessage model resolution", () => { const { captured, captureRunTurn } = createCapturingRunTurn(); const fakeNow = () => 42; - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => provider, resolveTools: () => [], @@ -432,7 +437,7 @@ describe("handleMessage model resolution", () => { const provider: ProviderContract = { id: "p", stream: async function* () {} }; const { captured, captureRunTurn } = createCapturingRunTurn(); - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => provider, resolveTools: () => [], @@ -461,7 +466,7 @@ describe("turn-sealed event", () => { ], ]); - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => provider, resolveTools: () => [], @@ -514,7 +519,7 @@ describe("turn-sealed event", () => { }, }; - const orchestrator = 
createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: wrappedStore, resolveProvider: () => provider, resolveTools: () => [], @@ -561,7 +566,7 @@ describe("turn-sealed event", () => { }, }; - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: failingStore, resolveProvider: () => provider, resolveTools: () => [], @@ -595,7 +600,7 @@ describe("turn metrics persistence", () => { ], ]); - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => provider, resolveTools: () => [], @@ -655,7 +660,7 @@ describe("turn metrics persistence", () => { }, }; - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => provider, resolveTools: () => [tool], @@ -714,7 +719,7 @@ describe("turn metrics persistence", () => { }, }; - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => provider, resolveTools: () => [], @@ -781,7 +786,7 @@ describe("turn metrics persistence", () => { }, }; - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => provider, resolveTools: () => [tool], @@ -835,7 +840,7 @@ describe("turn metrics persistence", () => { }, }; - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: failingMetricsStore, resolveProvider: () => provider, resolveTools: () => [], @@ -874,7 +879,7 @@ describe("tools filter", () => { return Promise.resolve({ ...assembly, tools: [toolB] }); }; - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: 
store, resolveProvider: () => provider, resolveTools: () => [toolA], @@ -902,7 +907,7 @@ describe("tools filter", () => { const toolA = createFakeTool("tool-a", async () => ({ content: "a" })); const toolB = createFakeTool("tool-b", async () => ({ content: "b" })); - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => provider, resolveTools: () => [toolA, toolB], @@ -933,7 +938,7 @@ describe("tools filter", () => { return Promise.resolve(assembly); }; - const orchestrator = createSessionOrchestrator({ + const { orchestrator } = createSessionOrchestrator({ conversationStore: store, resolveProvider: () => provider, resolveTools: () => [], @@ -964,3 +969,250 @@ function createCounterNow(): { now: () => number; tick: (ms: number) => void } { }, }; } + +describe("lifecycle event hooks", () => { + it("emits turnStarted before and turnSettled after a turn", async () => { + const store = createInMemoryStore(); + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "ok" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const emitted: Array<{ hook: string; payload: TurnLifecyclePayload; order: number }> = []; + let order = 0; + + const fakeEmit = <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload): void => { + emitted.push({ hook: hook.id, payload: payload as TurnLifecyclePayload, order: order++ }); + }; + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn, + emit: fakeEmit, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-lifecycle", + text: "test", + onEvent: () => {}, + cwd: "/work", + modelName: "mymodel", + }); + + expect(emitted).toHaveLength(2); + expect(emitted[0]?.hook).toBe("session-orchestrator/turn-started"); + 
expect(emitted[0]?.payload.conversationId).toBe("conv-lifecycle"); + expect(emitted[0]?.payload.cwd).toBe("/work"); + expect(emitted[0]?.payload.modelName).toBe("mymodel"); + expect(emitted[0]?.order).toBe(0); + + expect(emitted[1]?.hook).toBe("session-orchestrator/turn-settled"); + expect(emitted[1]?.payload.conversationId).toBe("conv-lifecycle"); + expect(emitted[1]?.payload.cwd).toBe("/work"); + expect(emitted[1]?.payload.modelName).toBe("mymodel"); + expect(emitted[1]?.order).toBe(1); + }); +}); + +describe("warm service", () => { + it("warm reuses the assembled tools + full history and appends the probe turn", async () => { + const store = createInMemoryStore(); + const existingMsg: ChatMessage = { + role: "user", + chunks: [{ type: "text", text: "existing" }], + }; + const assistantMsg: ChatMessage = { + role: "assistant", + chunks: [{ type: "text", text: "reply" }], + }; + await store.append("conv-warm-reuse", [existingMsg, assistantMsg]); + + let capturedMessages: readonly ChatMessage[] | undefined; + let capturedTools: readonly ToolContract[] | undefined; + let _capturedOpts: unknown; + + const toolA = createFakeTool("tool-a", async () => ({ content: "a" })); + + const provider: ProviderContract = { + id: "warm-provider", + stream(messages, tools, opts) { + capturedMessages = messages; + capturedTools = tools; + _capturedOpts = opts; + return (async function* () { + yield { + type: "usage", + usage: { inputTokens: 100, outputTokens: 5, cacheReadTokens: 80, cacheWriteTokens: 20 }, + } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + + const deps = { + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [toolA], + applyToolsFilter: identityApplyToolsFilter, + runTurn, + }; + + const { activeConversations } = createSessionOrchestrator(deps); + const warmService = createWarmService(deps, activeConversations); + + const result = await warmService.warm("conv-warm-reuse", { cwd: 
"/test" }); + + expect(capturedMessages).toBeDefined(); + expect(capturedMessages).toHaveLength(3); + expect(capturedMessages?.[0]?.chunks[0]).toEqual({ type: "text", text: "existing" }); + expect(capturedMessages?.[1]?.chunks[0]).toEqual({ type: "text", text: "reply" }); + expect(capturedMessages?.[2]?.role).toBe("user"); + expect((capturedMessages?.[2]?.chunks[0] as { type: "text"; text: string }).text).toBe( + "reply with just a .", + ); + + expect(capturedTools).toHaveLength(1); + expect(capturedTools?.[0]?.name).toBe("tool-a"); + + if ("inputTokens" in result) { + expect(result.inputTokens).toBe(100); + expect(result.cacheReadTokens).toBe(80); + } + }); + + it("warm refuses while the conversation is generating", async () => { + const store = createInMemoryStore(); + let resolveRunTurn: (() => void) | undefined; + const runTurnBlocker = new Promise<void>((resolve) => { + resolveRunTurn = resolve; + }); + + const provider: ProviderContract = { + id: "p", + stream: async function* () { + yield { type: "text-delta", delta: "slow" } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + }, + }; + + const blockingRunTurn = async (_input: RunTurnInput): Promise<RunTurnResult> => { + await runTurnBlocker; + return { + messages: [{ role: "assistant", chunks: [{ type: "text", text: "done" }] }], + usage: { inputTokens: 1, outputTokens: 1 }, + finishReason: "stop", + }; + }; + + const deps = { + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn: blockingRunTurn, + }; + + const { orchestrator, activeConversations } = createSessionOrchestrator(deps); + const warmService = createWarmService(deps, activeConversations); + + const turnPromise = orchestrator.handleMessage({ + conversationId: "conv-blocking", + text: "test", + onEvent: () => {}, + }); + + const warmResult = await warmService.warm("conv-blocking"); + expect(warmResult).toEqual({ error: 
"conversation is generating" }); + + resolveRunTurn?.(); + await turnPromise; + }); + + it("warm never persists (no append) and emits no AgentEvents", async () => { + const store = createInMemoryStore(); + const existingMsg: ChatMessage = { + role: "user", + chunks: [{ type: "text", text: "existing" }], + }; + await store.append("conv-no-persist", [existingMsg]); + + const provider: ProviderContract = { + id: "p", + stream: async function* () { + yield { + type: "usage", + usage: { inputTokens: 10, outputTokens: 2 }, + } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + }, + }; + + const deps = { + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn, + }; + + const { activeConversations } = createSessionOrchestrator(deps); + const warmService = createWarmService(deps, activeConversations); + + const sizeBefore = store.data.get("conv-no-persist")?.length; + + await warmService.warm("conv-no-persist"); + + const sizeAfter = store.data.get("conv-no-persist")?.length; + expect(sizeAfter).toBe(sizeBefore); + }); + + it("warm returns provider usage (input + cacheReadTokens)", async () => { + const store = createInMemoryStore(); + const existingMsg: ChatMessage = { + role: "user", + chunks: [{ type: "text", text: "existing" }], + }; + await store.append("conv-usage", [existingMsg]); + + const provider: ProviderContract = { + id: "p", + stream: async function* () { + yield { + type: "usage", + usage: { + inputTokens: 500, + outputTokens: 3, + cacheReadTokens: 400, + cacheWriteTokens: 100, + }, + } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + }, + }; + + const deps = { + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn, + }; + + const { activeConversations } = createSessionOrchestrator(deps); + const warmService = 
createWarmService(deps, activeConversations); + + const result = await warmService.warm("conv-usage"); + + expect(result).toEqual({ + inputTokens: 500, + outputTokens: 3, + cacheReadTokens: 400, + cacheWriteTokens: 100, + }); + }); +}); diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index 2d1bbf5..c39bc06 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -2,19 +2,59 @@ import type { ConversationStore } from "@dispatch/conversation-store"; import type { AgentEvent, ChatMessage, + EventHookDescriptor, Logger, ProviderContract, + ProviderEvent, ProviderStreamOptions, RunTurnInput, RunTurnResult, ToolContract, ToolDispatchPolicy, + UsageEvent, } from "@dispatch/kernel"; -import { defineService } from "@dispatch/kernel"; +import { defineEventHook, defineService, type ServiceHandle } from "@dispatch/kernel"; import { createMetricsAccumulator } from "./metrics.js"; import { buildUserMessage, defaultDispatchPolicy, generateTurnId } from "./pure.js"; import type { ToolAssembly } from "./tools-filter.js"; +// --- Lifecycle event hooks --- + +/** Context carried on turn-lifecycle events, enough to replicate the turn's request prefix. */ +export interface TurnLifecyclePayload { + readonly conversationId: string; + readonly cwd?: string; + readonly modelName?: string; +} + +/** Fired when a turn STARTS driving a conversation (consumers cancel warming timers). */ +export const turnStarted: EventHookDescriptor<TurnLifecyclePayload> = + defineEventHook<TurnLifecyclePayload>("session-orchestrator/turn-started"); + +/** Fired when a turn SETTLES (sealed) for a conversation (consumers arm warming timers). 
*/ +export const turnSettled: EventHookDescriptor<TurnLifecyclePayload> = + defineEventHook<TurnLifecyclePayload>("session-orchestrator/turn-settled"); + +// --- Warm service --- + +export interface WarmResult { + readonly inputTokens: number; + readonly outputTokens: number; + readonly cacheReadTokens: number; + readonly cacheWriteTokens: number; +} + +export interface WarmService { + readonly warm: ( + conversationId: string, + opts?: { readonly cwd?: string; readonly modelName?: string }, + ) => Promise<WarmResult | { readonly error: string }>; +} + +export const cacheWarmHandle: ServiceHandle<WarmService> = defineService<WarmService>( + "session-orchestrator/warm", +); + export interface SessionOrchestrator { handleMessage(input: { conversationId: string; @@ -45,28 +85,129 @@ export interface SessionOrchestratorDeps { readonly logger?: Logger; /** Injected monotonic-ish clock (ms) forwarded to RunTurnInput for timing events. */ readonly now?: () => number; + /** Emit a lifecycle event hook to subscribers. Injected from host. */ + readonly emit?: <TPayload>(hook: EventHookDescriptor<TPayload>, payload: TPayload) => void; } -export function createSessionOrchestrator(deps: SessionOrchestratorDeps): SessionOrchestrator { - return { +export interface SessionOrchestratorBundle { + readonly orchestrator: SessionOrchestrator; + /** The shared active-conversations set, for use by createWarmService. */ + readonly activeConversations: ReadonlySet<string>; +} + +export function createSessionOrchestrator( + deps: SessionOrchestratorDeps, +): SessionOrchestratorBundle { + const activeConversations = new Set<string>(); + + const orchestrator: SessionOrchestrator = { async handleMessage({ conversationId, text, onEvent, signal, modelName, cwd }) { + const payload: TurnLifecyclePayload = { + conversationId, + ...(cwd !== undefined ? { cwd } : {}), + ...(modelName !== undefined ? 
{ modelName } : {}), + }; + deps.emit?.(turnStarted, payload); + activeConversations.add(conversationId); + + try { + const history = await deps.conversationStore.load(conversationId); + const userMsg = buildUserMessage(text); + const turnId = generateTurnId(); + + let provider: ProviderContract; + let modelOverride: string | undefined; + + if (modelName !== undefined && deps.resolveModel !== undefined) { + const resolved = deps.resolveModel(modelName); + if (resolved === undefined) { + onEvent({ + type: "error", + conversationId, + turnId, + message: `unknown model: ${modelName}`, + }); + return; + } + provider = resolved.provider; + modelOverride = resolved.model; + } else { + provider = deps.resolveProvider(); + } + + const baseTools = deps.resolveTools(); + const assembled = await deps.applyToolsFilter({ + tools: baseTools, + conversationId, + ...(cwd !== undefined ? { cwd } : {}), + }); + const dispatch = deps.resolveDispatch?.() ?? defaultDispatchPolicy(); + const turnLogger = deps.logger?.child({ conversationId, turnId }); + const metrics = createMetricsAccumulator(); + + const emitAndAccumulate = (event: AgentEvent): void => { + metrics.ingest(event); + onEvent(event); + }; + + const opts: RunTurnInput = { + provider, + messages: [...history, userMsg], + tools: assembled.tools, + dispatch, + emit: emitAndAccumulate, + conversationId, + turnId, + ...(modelOverride !== undefined + ? { providerOpts: { model: modelOverride } satisfies ProviderStreamOptions } + : {}), + ...(turnLogger !== undefined ? { logger: turnLogger } : {}), + ...(signal !== undefined ? { signal } : {}), + ...(cwd !== undefined ? { cwd } : {}), + ...(deps.now !== undefined ? 
{ now: deps.now } : {}), + }; + + const result = await deps.runTurn(opts); + + const toPersist: ChatMessage[] = [userMsg, ...result.messages]; + await deps.conversationStore.append(conversationId, toPersist); + + const turnMetrics = metrics.build(turnId); + await deps.conversationStore.appendMetrics(conversationId, turnMetrics); + + onEvent({ type: "turn-sealed", conversationId, turnId }); + } finally { + activeConversations.delete(conversationId); + deps.emit?.(turnSettled, payload); + } + }, + }; + + return { orchestrator, activeConversations }; +} + +export function createWarmService( + deps: SessionOrchestratorDeps, + activeConversations: ReadonlySet<string>, +): WarmService { + return { + async warm(conversationId, opts) { + if (activeConversations.has(conversationId)) { + return { error: "conversation is generating" }; + } + const history = await deps.conversationStore.load(conversationId); - const userMsg = buildUserMessage(text); - const turnId = generateTurnId(); + if (history.length === 0) { + return { error: "no history" }; + } let provider: ProviderContract; let modelOverride: string | undefined; - if (modelName !== undefined && deps.resolveModel !== undefined) { - const resolved = deps.resolveModel(modelName); + if (opts?.modelName !== undefined && deps.resolveModel !== undefined) { + const resolved = deps.resolveModel(opts.modelName); if (resolved === undefined) { - onEvent({ - type: "error", - conversationId, - turnId, - message: `unknown model: ${modelName}`, - }); - return; + return { error: `unknown model: ${opts.modelName}` }; } provider = resolved.provider; modelOverride = resolved.model; @@ -75,46 +216,38 @@ export function createSessionOrchestrator(deps: SessionOrchestratorDeps): Sessio } const baseTools = deps.resolveTools(); + const cwd = opts?.cwd; const assembled = await deps.applyToolsFilter({ tools: baseTools, conversationId, ...(cwd !== undefined ? { cwd } : {}), }); - const dispatch = deps.resolveDispatch?.() ?? 
defaultDispatchPolicy(); - const turnLogger = deps.logger?.child({ conversationId, turnId }); - const metrics = createMetricsAccumulator(); - const emitAndAccumulate = (event: AgentEvent): void => { - metrics.ingest(event); - onEvent(event); + const probeMsg: ChatMessage = { + role: "user", + chunks: [{ type: "text", text: "reply with just a ." }], }; + const messages = [...history, probeMsg]; - const opts: RunTurnInput = { - provider, - messages: [...history, userMsg], - tools: assembled.tools, - dispatch, - emit: emitAndAccumulate, - conversationId, - turnId, - ...(modelOverride !== undefined - ? { providerOpts: { model: modelOverride } satisfies ProviderStreamOptions } - : {}), - ...(turnLogger !== undefined ? { logger: turnLogger } : {}), - ...(signal !== undefined ? { signal } : {}), - ...(cwd !== undefined ? { cwd } : {}), - ...(deps.now !== undefined ? { now: deps.now } : {}), - }; - - const result = await deps.runTurn(opts); + const providerOpts: ProviderStreamOptions | undefined = + modelOverride !== undefined ? { model: modelOverride, maxTokens: 1 } : { maxTokens: 1 }; - const toPersist: ChatMessage[] = [userMsg, ...result.messages]; - await deps.conversationStore.append(conversationId, toPersist); + let inputTokens = 0; + let outputTokens = 0; + let cacheReadTokens = 0; + let cacheWriteTokens = 0; - const turnMetrics = metrics.build(turnId); - await deps.conversationStore.appendMetrics(conversationId, turnMetrics); + for await (const event of provider.stream(messages, assembled.tools, providerOpts)) { + if ((event as ProviderEvent).type === "usage") { + const usageEvent = event as UsageEvent; + inputTokens = usageEvent.usage.inputTokens; + outputTokens = usageEvent.usage.outputTokens; + cacheReadTokens = usageEvent.usage.cacheReadTokens ?? 0; + cacheWriteTokens = usageEvent.usage.cacheWriteTokens ?? 
0; + } + } - onEvent({ type: "turn-sealed", conversationId, turnId }); + return { inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens }; }, }; } diff --git a/packages/transport-http/src/server.bun.test.ts b/packages/transport-http/src/server.bun.test.ts index 025042a..5b4c6aa 100644 --- a/packages/transport-http/src/server.bun.test.ts +++ b/packages/transport-http/src/server.bun.test.ts @@ -86,6 +86,7 @@ function createFakeHostAPI(configOverrides: Record<string, unknown> = {}): HostA on() { return () => {}; }, + emit() {}, addFilter() { return () => {}; }, @@ -5,7 +5,7 @@ > Keep this lean and current; do not let it re-accrete a step-by-step changelog. ## Status (current) -`tsc -b` EXIT 0 · biome clean · **734 vitest + 109 bun = 843 tests**. +`tsc -b` EXIT 0 · biome clean · **760 vitest + 109 bun = 869 tests**. Built and verified live (full-fidelity: every feature is a manifest-loaded extension through the host): @@ -140,6 +140,31 @@ no summary but still loadable. Glossary: added `skill`, `skill summary`, `tools `tsc -b` EXIT 0, biome clean. (End-to-end load_skill via a real LLM turn not yet exercised — unit/integration tests cover the filter rewrite + live read.) +## Cache warming (core DONE; control surface PARTIAL) +User-gated calls: target the external **Claude** provider (`../claude` provider-anthropic, loaded via +`DISPATCH_EXTERNAL_EXTENSIONS`); warm-assembly lives in **session-orchestrator** (`warm()` reuses the +real turn's assembly → byte-identical prefix, provider-agnostic); **surface system** for controls; +**per-conversation** controls; interval default 4 min, free value. Old-code invariants honored +(primary-model/full-prefix via reuse; refuse mid-turn; never persist/emit; in-flight invalidation; +arm-on-settle/cancel-on-start; `pct = round(clamp(cacheRead/input,0,1)*100)`). +- **Mechanism (2nd use of bus hooks; first event-hook emit):** + - [x] **kernel** — exposed `HostAPI.emit` (delegates to bus.emit), counterpart of `on`. 
+ - [x] **session-orchestrator** — `turnStarted`/`turnSettled` event hooks (carry conversationId/cwd/ + modelName) emitted per turn; `warm()` service (`cacheWarmHandle`) reusing assembly, refusing + mid-turn, never persisting/emitting; returns Usage. + - [x] **cache-warming** (new ext) — per-conversation timers (arm/cancel/in-flight token), + calls `warm()`, computes `lastPct`, persists `{enabled,intervalMs}` (default on/240s) in + host.storage; registers a controls Surface. 19 tests. + - [x] **host-bin** — registered cache-warming; **transport-http** HostAPI stub fixed for `emit`. +- **Live-verified:** full-graph `tsc -b` EXIT 0, biome clean (boot smoke + live Claude warm pending + a restart with the cache-warming ext loaded). +- **OPEN — surface-system limits (CR from cache-warming):** the surface system has (a) NO + per-conversation context (surface reflects most-recently-active conversation; invoke carries + conversationId), and (b) NO numeric-input field kind, so the **interval ("set time to refresh") + control is not yet a view input** — only the on/off toggle + last-cache-% stat render. Honoring + per-conversation controls + free-value interval needs a `NumberField` in `ui-contract` + + per-conversation surface scoping (+ FE courier). Decision pending. + ## Open items - **`prefix.fingerprint` / `warm|real` cache-bust attributes (deferred):** decoupled from dedup by the content-addressed decision; also gated on cache-warming being diff --git a/tsconfig.json b/tsconfig.json index 8fd2d24..c5e797d 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -21,6 +21,7 @@ { "path": "./packages/tool-edit-file" }, { "path": "./packages/tool-write-file" }, { "path": "./packages/skills" }, + { "path": "./packages/cache-warming" }, { "path": "./packages/cli" }, { "path": "./packages/journal-sink" }, { "path": "./packages/trace-store" }, |
