diff options
| -rw-r--r-- | packages/cache-warming/src/extension.ts | 99 | ||||
| -rw-r--r-- | packages/cache-warming/src/index.ts | 5 | ||||
| -rw-r--r-- | packages/cache-warming/src/pure.test.ts | 135 | ||||
| -rw-r--r-- | packages/cache-warming/src/pure.ts | 96 | ||||
| -rw-r--r-- | packages/cache-warming/src/warmer.test.ts | 112 | ||||
| -rw-r--r-- | packages/surface-registry/src/index.ts | 2 | ||||
| -rw-r--r-- | packages/surface-registry/src/registry.ts | 13 | ||||
| -rw-r--r-- | packages/transport-ws/src/extension.ts | 65 | ||||
| -rw-r--r-- | packages/transport-ws/src/index.ts | 2 | ||||
| -rw-r--r-- | packages/transport-ws/src/router.test.ts | 155 | ||||
| -rw-r--r-- | packages/transport-ws/src/router.ts | 67 | ||||
| -rw-r--r-- | packages/transport-ws/src/server.bun.test.ts | 13 | ||||
| -rw-r--r-- | packages/ui-contract/src/index.ts | 51 | ||||
| -rw-r--r-- | tasks.md | 16 |
14 files changed, 720 insertions, 111 deletions
diff --git a/packages/cache-warming/src/extension.ts b/packages/cache-warming/src/extension.ts index 16515a8..26d429b 100644 --- a/packages/cache-warming/src/extension.ts +++ b/packages/cache-warming/src/extension.ts @@ -1,8 +1,14 @@ import type { Extension, HostAPI, Manifest } from "@dispatch/kernel"; import { cacheWarmHandle, turnSettled, turnStarted } from "@dispatch/session-orchestrator"; -import type { SurfaceProvider } from "@dispatch/surface-registry"; +import type { SurfaceContext, SurfaceProvider } from "@dispatch/surface-registry"; import { surfaceRegistryHandle } from "@dispatch/surface-registry"; import type { SurfaceSpec } from "@dispatch/ui-contract"; +import { + buildConversationSpec, + buildDefaultSpec, + parseIntervalPayload, + secondsToMs, +} from "./pure.js"; import { createCacheWarmer } from "./warmer.js"; export const manifest: Manifest = { @@ -19,41 +25,13 @@ export const manifest: Manifest = { }, }; -function buildSurfaceSpec( - _conversationId: string | undefined, - enabled: boolean, - lastPct: number | null, -): SurfaceSpec { - const pctDisplay = lastPct === null ? "—" : `${lastPct}%`; - return { - id: "cache-warming", - region: "side", - title: "Cache Warming", - fields: [ - { - kind: "toggle", - label: "Enabled", - value: enabled, - action: { actionId: "cache-warming/toggle" }, - }, - { - kind: "stat", - label: "Last Cache %", - value: pctDisplay, - }, - ], - }; -} - export function activate(host: HostAPI): void { const warmService = host.getService(cacheWarmHandle); const registry = host.getService(surfaceRegistryHandle); const storage = host.storage("cache-warming"); - let currentConversationId: string | undefined; + const subscribers = new Set<() => void>(); - // Timer wrapper: setTimeout/clearTimeout return Timeout in Node types, - // but our TimerDeps uses number ids. Map between them. const timeoutMap = new Map<number, ReturnType<typeof setTimeout>>(); let nextTimerId = 1; @@ -76,46 +54,67 @@ export function activate(host: HostAPI): void { }, }, onSurfaceChange: () => { - // Surface subscribers will re-fetch on next getSpec() + for (const notify of subscribers) { + notify(); + } }, }); host.on(turnStarted, (payload) => { - currentConversationId = payload.conversationId; warmer.onTurnStarted(payload.conversationId); }); host.on(turnSettled, (payload) => { - currentConversationId = payload.conversationId; warmer.onTurnSettled(payload.conversationId, { ...(payload.cwd !== undefined ? { cwd: payload.cwd } : {}), ...(payload.modelName !== undefined ? { modelName: payload.modelName } : {}), }); }); + function getSpec(context?: SurfaceContext): SurfaceSpec { + const convId = context?.conversationId; + if (convId === undefined) { + return buildDefaultSpec(); + } + const state = warmer.getState(convId); + return buildConversationSpec(state.enabled, state.intervalMs, state.lastPct); + } + + async function invoke( + actionId: string, + payload?: unknown, + context?: SurfaceContext, + ): Promise<void> { + const convId = context?.conversationId; + if (convId === undefined) return; + + if (actionId === "cache-warming/toggle") { + const current = warmer.getState(convId); + await warmer.setEnabled(convId, !current.enabled); + } + + if (actionId === "cache-warming/set-interval") { + const seconds = parseIntervalPayload(payload); + if (seconds === null) return; + const ms = secondsToMs(seconds); + if (ms === null) return; + await warmer.setIntervalMs(convId, ms); + } + } + const provider: SurfaceProvider = { catalogEntry: { id: "cache-warming", region: "side", title: "Cache Warming", }, - getSpec() { - const convId = currentConversationId; - const state = - convId !== undefined ? warmer.getState(convId) : { enabled: true, lastPct: null }; - return buildSurfaceSpec(convId, state.enabled, state.lastPct); - }, - async invoke(actionId, payload) { - const pl = payload as Record<string, unknown> | undefined; - const convId = - (typeof pl?.conversationId === "string" ? pl.conversationId : undefined) ?? - currentConversationId; - if (convId === undefined) return; - - if (actionId === "cache-warming/toggle") { - const current = warmer.getState(convId); - await warmer.setEnabled(convId, !current.enabled); - } + getSpec, + invoke, + subscribe(onChange) { + subscribers.add(onChange); + return () => { + subscribers.delete(onChange); + }; }, }; diff --git a/packages/cache-warming/src/index.ts b/packages/cache-warming/src/index.ts index 8670dc5..d77f4ec 100644 --- a/packages/cache-warming/src/index.ts +++ b/packages/cache-warming/src/index.ts @@ -1,12 +1,17 @@ export { extension, manifest } from "./extension.js"; export { + buildConversationSpec, + buildDefaultSpec, type ConversationSettings, type ConversationState, computeCachePct, DEFAULT_INTERVAL_MS, isTokenCurrent, MIN_INTERVAL_MS, + msToSeconds, + parseIntervalPayload, parseSettings, + secondsToMs, serializeSettings, settingsKey, shouldWarm, diff --git a/packages/cache-warming/src/pure.test.ts b/packages/cache-warming/src/pure.test.ts index 820260b..1c912f2 100644 --- a/packages/cache-warming/src/pure.test.ts +++ b/packages/cache-warming/src/pure.test.ts @@ -1,9 +1,15 @@ import { describe, expect, it } from "vitest"; import type { ConversationState } from "./pure.js"; import { + buildConversationSpec, + buildDefaultSpec, computeCachePct, isTokenCurrent, + MIN_INTERVAL_MS, + msToSeconds, + parseIntervalPayload, parseSettings, + secondsToMs, serializeSettings, shouldWarm, } from "./pure.js"; @@ -107,3 +113,132 @@ describe("parseSettings/serializeSettings round-trip", () => { expect(parsed.intervalMs).toBe(240_000); }); }); + +describe("msToSeconds", () => { + it("converts ms to seconds, rounded", () => { + expect(msToSeconds(240_000)).toBe(240); + expect(msToSeconds(1500)).toBe(2); + expect(msToSeconds(1000)).toBe(1); + expect(msToSeconds(0)).toBe(0); + }); +}); + +describe("secondsToMs", () => { + it("converts seconds to ms, floors at MIN_INTERVAL_MS", () => { + expect(secondsToMs(240)).toBe(240_000); + expect(secondsToMs(1)).toBe(1000); + expect(secondsToMs(0.5)).toBe(MIN_INTERVAL_MS); + }); + + it("returns null for NaN / non-positive", () => { + expect(secondsToMs(Number.NaN)).toBeNull(); + expect(secondsToMs(0)).toBeNull(); + expect(secondsToMs(-5)).toBeNull(); + expect(secondsToMs(Number.POSITIVE_INFINITY)).toBeNull(); + }); +}); + +describe("parseIntervalPayload", () => { + it("accepts a bare positive number", () => { + expect(parseIntervalPayload(30)).toBe(30); + expect(parseIntervalPayload(1)).toBe(1); + }); + + it("accepts { value: number }", () => { + expect(parseIntervalPayload({ value: 30 })).toBe(30); + expect(parseIntervalPayload({ value: 1 })).toBe(1); + }); + + it("returns null for NaN / non-positive / wrong shape", () => { + expect(parseIntervalPayload(Number.NaN)).toBeNull(); + expect(parseIntervalPayload(0)).toBeNull(); + expect(parseIntervalPayload(-5)).toBeNull(); + expect(parseIntervalPayload("30")).toBeNull(); + expect(parseIntervalPayload({ value: "30" })).toBeNull(); + expect(parseIntervalPayload({})).toBeNull(); + expect(parseIntervalPayload(null)).toBeNull(); + expect(parseIntervalPayload(undefined)).toBeNull(); + }); +}); + +describe("buildConversationSpec", () => { + it("builds a per-conversation spec with toggle + number(interval) + last-% fields", () => { + const spec = buildConversationSpec(true, 240_000, 80); + expect(spec.id).toBe("cache-warming"); + expect(spec.region).toBe("side"); + expect(spec.title).toBe("Cache Warming"); + expect(spec.fields).toHaveLength(3); + + const toggle = spec.fields[0]; + expect(toggle).toEqual({ + kind: "toggle", + label: "Enabled", + value: true, + action: { actionId: "cache-warming/toggle" }, + }); + + const number = spec.fields[1]; + expect(number).toEqual({ + kind: "number", + label: "Refresh Interval", + value: 240, + min: 1, + step: 1, + unit: "s", + action: { actionId: "cache-warming/set-interval" }, + }); + + const stat = spec.fields[2]; + expect(stat).toEqual({ + kind: "stat", + label: "Last Cache %", + value: "80%", + }); + }); + + it("shows — when lastPct is null", () => { + const spec = buildConversationSpec(true, 240_000, null); + const stat = spec.fields[2]; + expect(stat).toEqual({ + kind: "stat", + label: "Last Cache %", + value: "—", + }); + }); + + it("reflects disabled state", () => { + const spec = buildConversationSpec(false, 120_000, 50); + const toggle = spec.fields[0]; + expect(toggle).toEqual({ + kind: "toggle", + label: "Enabled", + value: false, + action: { actionId: "cache-warming/toggle" }, + }); + const number = spec.fields[1]; + expect(number).toEqual({ + kind: "number", + label: "Refresh Interval", + value: 120, + min: 1, + step: 1, + unit: "s", + action: { actionId: "cache-warming/set-interval" }, + }); + }); +}); + +describe("buildDefaultSpec", () => { + it("returns a default spec with no conversationId", () => { + const spec = buildDefaultSpec(); + expect(spec.id).toBe("cache-warming"); + expect(spec.region).toBe("side"); + expect(spec.title).toBe("Cache Warming"); + expect(spec.fields).toHaveLength(1); + expect(spec.fields[0]).toEqual({ + kind: "stat", + label: "Status", + value: "No conversation focused", + }); + }); +}); diff --git a/packages/cache-warming/src/pure.ts b/packages/cache-warming/src/pure.ts index 2b00dab..7b91b11 100644 --- a/packages/cache-warming/src/pure.ts +++ b/packages/cache-warming/src/pure.ts @@ -3,6 +3,8 @@ * Every function is input → output; testable without mocks. */ +import type { NumberField, StatField, SurfaceSpec, ToggleField } from "@dispatch/ui-contract"; + // --- Types --- /** Persisted per-conversation settings (storage-facing). */ @@ -93,3 +95,97 @@ export function serializeSettings(settings: ConversationSettings): string { export function settingsKey(conversationId: string): string { return `${SETTINGS_KEY}:${conversationId}`; } + +// --- Surface spec builders (pure) --- + +/** Convert intervalMs to display seconds (rounded). */ +export function msToSeconds(intervalMs: number): number { + return Math.round(intervalMs / 1000); +} + +/** + * Convert seconds (from the UI) to intervalMs, flooring at MIN_INTERVAL_MS. + * Returns null for NaN / non-positive (caller should ignore). + */ +export function secondsToMs(seconds: number): number | null { + if (!Number.isFinite(seconds) || seconds <= 0) return null; + return Math.max(MIN_INTERVAL_MS, Math.round(seconds * 1000)); +} + +/** + * Build a per-conversation surface spec with toggle + number(interval) + stat fields. + * Pure — no I/O. + */ +export function buildConversationSpec( + enabled: boolean, + intervalMs: number, + lastPct: number | null, +): SurfaceSpec { + const pctDisplay = lastPct === null ? "—" : `${lastPct}%`; + const toggle: ToggleField = { + kind: "toggle", + label: "Enabled", + value: enabled, + action: { actionId: "cache-warming/toggle" }, + }; + const interval: NumberField = { + kind: "number", + label: "Refresh Interval", + value: msToSeconds(intervalMs), + min: 1, + step: 1, + unit: "s", + action: { actionId: "cache-warming/set-interval" }, + }; + const stat: StatField = { + kind: "stat", + label: "Last Cache %", + value: pctDisplay, + }; + return { + id: "cache-warming", + region: "side", + title: "Cache Warming", + fields: [toggle, interval, stat], + }; +} + +/** + * Build a default surface spec when no conversation is in focus. + * Pure — no I/O. + */ +export function buildDefaultSpec(): SurfaceSpec { + return { + id: "cache-warming", + region: "side", + title: "Cache Warming", + fields: [ + { + kind: "stat", + label: "Status", + value: "No conversation focused", + }, + ], + }; +} + +/** + * Parse the payload for a set-interval action. + * Accepts a bare number OR { value: number }. Returns the seconds value, or + * null if the payload is invalid (NaN / non-positive / wrong shape). + */ +export function parseIntervalPayload(payload: unknown): number | null { + if (typeof payload === "number" && Number.isFinite(payload) && payload > 0) { + return payload; + } + if ( + typeof payload === "object" && + payload !== null && + "value" in payload && + typeof (payload as Record<string, unknown>).value === "number" + ) { + const v = (payload as Record<string, unknown>).value as number; + if (Number.isFinite(v) && v > 0) return v; + } + return null; +} diff --git a/packages/cache-warming/src/warmer.test.ts b/packages/cache-warming/src/warmer.test.ts index 9b9ba93..9865877 100644 --- a/packages/cache-warming/src/warmer.test.ts +++ b/packages/cache-warming/src/warmer.test.ts @@ -1,6 +1,7 @@ import type { Logger, Span } from "@dispatch/kernel"; import type { WarmResult } from "@dispatch/session-orchestrator"; import { describe, expect, it } from "vitest"; +import { MIN_INTERVAL_MS } from "./pure.js"; import { createCacheWarmer, type TimerDeps } from "./warmer.js"; function memStorage(): StorageNamespace { @@ -204,4 +205,115 @@ describe("CacheWarmer", () => { expect(warmCount).toBe(2); }); + + it("setIntervalMs converts seconds→ms, floors at MIN_INTERVAL_MS, and re-arms", async () => { + const timers = fakeTimers(); + const warmCalls: string[] = []; + const warmer = createCacheWarmer({ + warm: async (convId) => { + warmCalls.push(convId); + return WARM_RESULT; + }, + storage: memStorage(), + logger: makeLogger(), + timers, + onSurfaceChange: () => {}, + }); + + // Enable and settle to arm the timer + warmer.onTurnSettled("conv-1", {}); + + // Set interval to 30 seconds (30000ms) + const settings = await warmer.setIntervalMs("conv-1", 30_000); + expect(settings.intervalMs).toBe(30_000); + + const state = warmer.getState("conv-1"); + expect(state.intervalMs).toBe(30_000); + + // Timer should still be armed — flush fires it + timers.flush(); + await new Promise((r) => setTimeout(r, 10)); + expect(warmCalls).toContain("conv-1"); + }); + + it("setIntervalMs clamps values below MIN_INTERVAL_MS", async () => { + const timers = fakeTimers(); + const warmer = createCacheWarmer({ + warm: async () => WARM_RESULT, + storage: memStorage(), + logger: makeLogger(), + timers, + onSurfaceChange: () => {}, + }); + + warmer.onTurnSettled("conv-1", {}); + + // Set interval to 500ms — should clamp to MIN_INTERVAL_MS (1000) + const settings = await warmer.setIntervalMs("conv-1", 500); + expect(settings.intervalMs).toBe(1000); + }); + + it("setIntervalMs ignores NaN / non-positive (clamps to MIN_INTERVAL_MS)", async () => { + const timers = fakeTimers(); + const warmer = createCacheWarmer({ + warm: async () => WARM_RESULT, + storage: memStorage(), + logger: makeLogger(), + timers, + onSurfaceChange: () => {}, + }); + + warmer.onTurnSettled("conv-1", {}); + + const settings1 = await warmer.setIntervalMs("conv-1", Number.NaN); + expect(settings1.intervalMs).toBe(MIN_INTERVAL_MS); + + const settings2 = await warmer.setIntervalMs("conv-1", -5000); + expect(settings2.intervalMs).toBe(MIN_INTERVAL_MS); + + const settings3 = await warmer.setIntervalMs("conv-1", 0); + expect(settings3.intervalMs).toBe(MIN_INTERVAL_MS); + }); + + it("setEnabled flips enabled for a conversation", async () => { + const timers = fakeTimers(); + const warmer = createCacheWarmer({ + warm: async () => WARM_RESULT, + storage: memStorage(), + logger: makeLogger(), + timers, + onSurfaceChange: () => {}, + }); + + // Default is enabled + expect(warmer.getState("conv-1").enabled).toBe(true); + + // Toggle off + await warmer.setEnabled("conv-1", false); + expect(warmer.getState("conv-1").enabled).toBe(false); + + // Toggle on + await warmer.setEnabled("conv-1", true); + expect(warmer.getState("conv-1").enabled).toBe(true); + }); + + it("onSurfaceChange is called when settings change", async () => { + const timers = fakeTimers(); + let changeCount = 0; + const warmer = createCacheWarmer({ + warm: async () => WARM_RESULT, + storage: memStorage(), + logger: makeLogger(), + timers, + onSurfaceChange: () => { + changeCount++; + }, + }); + + await warmer.setEnabled("conv-1", false); + expect(changeCount).toBe(1); + + await warmer.setIntervalMs("conv-1", 30_000); + expect(changeCount).toBe(2); + }); }); diff --git a/packages/surface-registry/src/index.ts b/packages/surface-registry/src/index.ts index cdfcf7e..da52c92 100644 --- a/packages/surface-registry/src/index.ts +++ b/packages/surface-registry/src/index.ts @@ -1,4 +1,4 @@ export { createSurfaceRegistryExtension, manifest } from "./extension.js"; -export type { SurfaceProvider, SurfaceRegistry } from "./registry.js"; +export type { SurfaceContext, SurfaceProvider, SurfaceRegistry } from "./registry.js"; export { createSurfaceRegistry } from "./registry.js"; export { surfaceRegistryHandle } from "./service.js"; diff --git a/packages/surface-registry/src/registry.ts b/packages/surface-registry/src/registry.ts index b1c8116..5780910 100644 --- a/packages/surface-registry/src/registry.ts +++ b/packages/surface-registry/src/registry.ts @@ -1,6 +1,15 @@ import type { SurfaceCatalog, SurfaceCatalogEntry, SurfaceSpec } from "@dispatch/ui-contract"; /** + * Optional context threaded by the transport when calling a surface provider. + * Providers may use this to scope per-conversation state; omitting it yields + * the default/global behaviour. + */ +export interface SurfaceContext { + readonly conversationId?: string; +} + +/** * What a surface-contributing extension registers with the surface registry. * Each provider owns one surface identified by its catalog entry id. */ @@ -9,10 +18,10 @@ export interface SurfaceProvider { readonly catalogEntry: SurfaceCatalogEntry; /** Build the current surface spec (may be async for dynamic surfaces). */ - getSpec(): SurfaceSpec | Promise<SurfaceSpec>; + getSpec(context?: SurfaceContext): SurfaceSpec | Promise<SurfaceSpec>; /** Run a backend action by id with an optional payload. */ - invoke(actionId: string, payload?: unknown): void | Promise<void>; + invoke(actionId: string, payload?: unknown, context?: SurfaceContext): void | Promise<void>; /** * Optional: subscribe to spec changes. Returns an unsubscribe disposer. diff --git a/packages/transport-ws/src/extension.ts b/packages/transport-ws/src/extension.ts index 0f1a397..10981a5 100644 --- a/packages/transport-ws/src/extension.ts +++ b/packages/transport-ws/src/extension.ts @@ -9,11 +9,11 @@ import type { Extension, HostAPI } from "@dispatch/kernel"; import type { SessionOrchestrator } from "@dispatch/session-orchestrator"; import { sessionOrchestratorHandle } from "@dispatch/session-orchestrator"; -import type { SurfaceProvider, SurfaceRegistry } from "@dispatch/surface-registry"; +import type { SurfaceContext, SurfaceProvider, SurfaceRegistry } from "@dispatch/surface-registry"; import { surfaceRegistryHandle } from "@dispatch/surface-registry"; import type { WsClientMessage, WsServerMessage } from "@dispatch/transport-contract"; import { manifest } from "./manifest.js"; -import { catalogMessage, routeClientMessage } from "./router.js"; +import { catalogMessage, routeClientMessage, subKey } from "./router.js"; /** Active provider subscriptions + chat abort controller for a single WS connection. */ interface ConnectionState { @@ -48,33 +48,53 @@ export function createTransportWsExtension(): Extension { ws: Ws, provider: SurfaceProvider, surfaceId: string, + conversationId: string | undefined, state: ConnectionState, ): void { - if (!provider.subscribe || state.providerDisposers.has(surfaceId)) { + const key = subKey(surfaceId, conversationId); + if (!provider.subscribe || state.providerDisposers.has(key)) { return; } + const context: SurfaceContext | undefined = + conversationId !== undefined ? { conversationId } : undefined; const dispose = provider.subscribe(() => { try { - const spec = provider.getSpec(); + const spec = provider.getSpec(context); if (spec instanceof Promise) { spec - .then((s) => send(ws, { type: "update", update: { surfaceId, spec: s } })) + .then((s) => + send(ws, { + type: "update", + update: { + surfaceId, + spec: s, + ...(conversationId !== undefined ? { conversationId } : {}), + }, + }), + ) .catch(() => {}); } else { - send(ws, { type: "update", update: { surfaceId, spec } }); + send(ws, { + type: "update", + update: { + surfaceId, + spec, + ...(conversationId !== undefined ? { conversationId } : {}), + }, + }); } } catch { // Provider threw — log but don't kill the connection. } }); - state.providerDisposers.set(surfaceId, dispose); + state.providerDisposers.set(key, dispose); } - function unsubscribeFromProvider(state: ConnectionState, surfaceId: string): void { - const dispose = state.providerDisposers.get(surfaceId); + function unsubscribeFromProvider(state: ConnectionState, key: string): void { + const dispose = state.providerDisposers.get(key); if (dispose) { dispose(); - state.providerDisposers.delete(surfaceId); + state.providerDisposers.delete(key); } } @@ -158,15 +178,22 @@ export function createTransportWsExtension(): Extension { // Apply sub change. if (result.subChange) { + const key = subKey(result.subChange.surfaceId, result.subChange.conversationId); if (result.subChange.op === "add") { - state.subs.add(result.subChange.surfaceId); + state.subs.add(key); const provider = registry.getSurface(result.subChange.surfaceId); if (provider) { - subscribeToProvider(ws, provider, result.subChange.surfaceId, state); + subscribeToProvider( + ws, + provider, + result.subChange.surfaceId, + result.subChange.conversationId, + state, + ); } } else { - state.subs.delete(result.subChange.surfaceId); - unsubscribeFromProvider(state, result.subChange.surfaceId); + state.subs.delete(key); + unsubscribeFromProvider(state, key); } } @@ -179,8 +206,16 @@ export function createTransportWsExtension(): Extension { if (result.invoke) { const provider = registry.getSurface(result.invoke.surfaceId); if (provider) { + const context: SurfaceContext | undefined = + result.invoke.conversationId !== undefined + ? { conversationId: result.invoke.conversationId } + : undefined; try { - const r = provider.invoke(result.invoke.actionId, result.invoke.payload); + const r = provider.invoke( + result.invoke.actionId, + result.invoke.payload, + context, + ); if (r instanceof Promise) { r.catch(() => {}); } diff --git a/packages/transport-ws/src/index.ts b/packages/transport-ws/src/index.ts index f4355c0..600519a 100644 --- a/packages/transport-ws/src/index.ts +++ b/packages/transport-ws/src/index.ts @@ -6,4 +6,4 @@ export type { RouteResult, SurfaceRouteResult, } from "./router.js"; -export { catalogMessage, routeClientMessage } from "./router.js"; +export { catalogMessage, routeClientMessage, subKey } from "./router.js"; diff --git a/packages/transport-ws/src/router.test.ts b/packages/transport-ws/src/router.test.ts index ae76c5d..afd7b2f 100644 --- a/packages/transport-ws/src/router.test.ts +++ b/packages/transport-ws/src/router.test.ts @@ -1,32 +1,61 @@ -import type { SurfaceProvider, SurfaceRegistry } from "@dispatch/surface-registry"; +import type { SurfaceContext, SurfaceProvider, SurfaceRegistry } from "@dispatch/surface-registry"; import type { SurfaceCatalogEntry, SurfaceSpec } from "@dispatch/ui-contract"; import { describe, expect, it } from "vitest"; -import { catalogMessage, routeClientMessage } from "./router.js"; +import { catalogMessage, routeClientMessage, subKey } from "./router.js"; // ── Fake in-memory registry (no mocks — just a plain implementation) ──────── -function fakeProvider(id: string, title?: string, actions?: readonly string[]): SurfaceProvider { +interface FakeProviderOpts { + readonly id: string; + readonly title?: string; + readonly actions?: readonly string[]; + /** Called with the context that getSpec receives — for test assertions. */ + readonly onGetSpec?: (context: SurfaceContext | undefined) => void; + /** Called with the context that invoke receives — for test assertions. */ + readonly onInvoke?: ( + actionId: string, + payload: unknown, + context: SurfaceContext | undefined, + ) => void; +} + +function fakeProvider( + idOrOpts: string | FakeProviderOpts, + title?: string, + actions?: readonly string[], +): SurfaceProvider { + const opts: FakeProviderOpts = + typeof idOrOpts === "string" + ? { + id: idOrOpts, + ...(title !== undefined ? { title } : {}), + ...(actions !== undefined ? { actions } : {}), + } + : idOrOpts; const catalogEntry: SurfaceCatalogEntry = { - id, + id: opts.id, region: "default", - title: title ?? `Surface ${id}`, + title: opts.title ?? `Surface ${opts.id}`, }; return { catalogEntry, - getSpec(): SurfaceSpec { + getSpec(context?: SurfaceContext): SurfaceSpec { + opts.onGetSpec?.(context); return { - id, + id: opts.id, region: "default", title: catalogEntry.title, fields: - actions?.map((a) => ({ + opts.actions?.map((a) => ({ kind: "button" as const, label: a, action: { actionId: a }, })) ?? [], }; }, - invoke(_actionId: string, _payload?: unknown) {}, + invoke(actionId: string, _payload?: unknown, context?: SurfaceContext) { + opts.onInvoke?.(actionId, _payload, context); + }, }; } @@ -77,7 +106,7 @@ describe("routeClientMessage", () => { it("is idempotent — subscribing twice does not duplicate the subChange", () => { const provider = fakeProvider("a"); const registry = fakeRegistry([provider]); - const connSubs = new Set<string>(["a"]); // already subscribed + const connSubs = new Set<string>([subKey("a")]); // already subscribed (global) const result = routeClientMessage(registry, connSubs, { type: "subscribe", @@ -110,12 +139,71 @@ describe("routeClientMessage", () => { }); expect(result.subChange).toBeUndefined(); }); + + it("subscribe with conversationId fetches the provider spec for that conversation and tags the reply", () => { + let receivedContext: SurfaceContext | undefined; + const provider = fakeProvider({ + id: "cache-warm", + title: "Cache Warming", + onGetSpec(ctx) { + receivedContext = ctx; + }, + }); + const registry = fakeRegistry([provider]); + const connSubs = new Set<string>(); + + const result = routeClientMessage(registry, connSubs, { + type: "subscribe", + surfaceId: "cache-warm", + conversationId: "conv-42", + }); + + expect(result.kind).toBe("surface"); + if (result.kind !== "surface") throw new Error("expected surface"); + expect(receivedContext).toEqual({ conversationId: "conv-42" }); + expect(result.replies).toHaveLength(1); + const reply = result.replies[0]; + if (reply?.type !== "surface") throw new Error("expected surface reply"); + expect(reply.conversationId).toBe("conv-42"); + expect(reply.spec.id).toBe("cache-warm"); + expect(result.subChange).toEqual({ + op: "add", + surfaceId: "cache-warm", + conversationId: "conv-42", + }); + }); + + it("subscribe without conversationId behaves as before (global surface unaffected)", () => { + let receivedContext: SurfaceContext | undefined; + const provider = fakeProvider({ + id: "global-surf", + title: "Global Surface", + onGetSpec(ctx) { + receivedContext = ctx; + }, + }); + const registry = fakeRegistry([provider]); + const connSubs = new Set<string>(); + + const result = routeClientMessage(registry, connSubs, { + type: "subscribe", + surfaceId: "global-surf", + }); + + expect(result.kind).toBe("surface"); + if (result.kind !== "surface") throw new Error("expected surface"); + expect(receivedContext).toBeUndefined(); + const reply = result.replies[0]; + if (reply?.type !== "surface") throw new Error("expected surface reply"); + expect(reply.conversationId).toBeUndefined(); + expect(result.subChange).toEqual({ op: "add", surfaceId: "global-surf" }); + }); }); describe("unsubscribe", () => { it("emits a remove subChange and no replies", () => { const registry = fakeRegistry([]); - const connSubs = new Set<string>(["a"]); + const connSubs = new Set<string>([subKey("a")]); const result = routeClientMessage(registry, connSubs, { type: "unsubscribe", @@ -187,6 +275,37 @@ describe("routeClientMessage", () => { }); expect(result.invoke).toBeUndefined(); }); + + it("invoke forwards the conversationId to the provider", () => { + let _receivedContext: SurfaceContext | undefined; + const provider = fakeProvider({ + id: "cache-warm", + title: "Cache Warming", + actions: ["warm"], + onInvoke(_actionId, _payload, ctx) { + _receivedContext = ctx; + }, + }); + const registry = fakeRegistry([provider]); + const connSubs = new Set<string>(); + + const result = routeClientMessage(registry, connSubs, { + type: "invoke", + surfaceId: "cache-warm", + actionId: "warm", + payload: { force: true }, + conversationId: "conv-99", + }); + + expect(result.kind).toBe("surface"); + if (result.kind !== "surface") throw new Error("expected surface"); + expect(result.invoke).toEqual({ + surfaceId: "cache-warm", + actionId: "warm", + payload: { force: true }, + conversationId: "conv-99", + }); + }); }); describe("chat.send", () => { @@ -282,3 +401,17 @@ describe("catalogMessage", () => { expect(msg).toEqual({ type: "catalog", catalog: [] }); }); }); + +describe("subKey", () => { + it("builds a global key when conversationId is undefined", () => { + expect(subKey("surf-a")).toBe("surf-a::"); + }); + + it("builds a conversation-scoped key when conversationId is provided", () => { + expect(subKey("surf-a", "conv-42")).toBe("surf-a::conv-42"); + }); + + it("global and conversation-scoped keys are distinct", () => { + expect(subKey("surf-a")).not.toBe(subKey("surf-a", "conv-42")); + }); +}); diff --git a/packages/transport-ws/src/router.ts b/packages/transport-ws/src/router.ts index 1a90e86..d1b03ac 100644 --- a/packages/transport-ws/src/router.ts +++ b/packages/transport-ws/src/router.ts @@ -7,7 +7,7 @@ * provider.invoke, drives the orchestrator. */ -import type { SurfaceRegistry } from "@dispatch/surface-registry"; +import type { SurfaceContext, SurfaceRegistry } from "@dispatch/surface-registry"; import type { ChatSendMessage, WsClientMessage } from "@dispatch/transport-contract"; import type { SurfaceServerMessage } from "@dispatch/ui-contract"; @@ -19,12 +19,17 @@ export interface SurfaceRouteResult { /** Server messages to send back to this connection. */ readonly replies: readonly SurfaceServerMessage[]; /** Whether to add or remove the surface id from connSubs. */ - readonly subChange?: { readonly op: "add" | "remove"; readonly surfaceId: string }; - /** If set, the shell must call `provider.invoke(actionId, payload)`. */ + readonly subChange?: { + readonly op: "add" | "remove"; + readonly surfaceId: string; + readonly conversationId?: string; + }; + /** If set, the shell must call `provider.invoke(actionId, payload, context)`. */ readonly invoke?: { readonly surfaceId: string; readonly actionId: string; readonly payload?: unknown; + readonly conversationId?: string; }; } @@ -49,6 +54,14 @@ export type RouteResult = SurfaceRouteResult | ChatRouteResult | ChatRouteError; // ── Helpers ───────────────────────────────────────────────────────────────── +/** + * Build a subscription key from a surface id and optional conversation id. + * The shell uses this same function so both layers agree on key format. + */ +export function subKey(surfaceId: string, conversationId?: string): string { + return conversationId !== undefined ? `${surfaceId}::${conversationId}` : `${surfaceId}::`; +} + /** Build the catalog `SurfaceServerMessage` from the registry. */ export function catalogMessage(registry: SurfaceRegistry): SurfaceServerMessage { return { type: "catalog", catalog: registry.getCatalog() }; @@ -60,7 +73,7 @@ export function catalogMessage(registry: SurfaceRegistry): SurfaceServerMessage * Route a single client message into a pure effect description. * * @param registry The surface registry (looked up once, injected). - * @param connSubs This connection's current subscribed surface ids. + * @param connSubs This connection's current subscription keys (via `subKey`). * @param msg The parsed client message (surface or chat). */ export function routeClientMessage( @@ -70,11 +83,11 @@ export function routeClientMessage( ): RouteResult { switch (msg.type) { case "subscribe": - return handleSubscribe(registry, connSubs, msg.surfaceId); + return handleSubscribe(registry, connSubs, msg.surfaceId, msg.conversationId); case "unsubscribe": - return handleUnsubscribe(msg.surfaceId); + return handleUnsubscribe(msg.surfaceId, msg.conversationId); case "invoke": - return handleInvoke(registry, msg.surfaceId, msg.actionId, msg.payload); + return handleInvoke(registry, msg.surfaceId, msg.actionId, msg.payload, msg.conversationId); case "chat.send": return handleChatSend(msg); } @@ -105,6 +118,7 @@ function handleSubscribe( registry: SurfaceRegistry, connSubs: ReadonlySet<string>, surfaceId: string, + conversationId?: string, ): SurfaceRouteResult { const provider = registry.getSurface(surfaceId); if (!provider) { @@ -114,7 +128,9 @@ function handleSubscribe( }; } - const spec = provider.getSpec(); + const context: SurfaceContext | undefined = + conversationId !== undefined ? { conversationId } : undefined; + const spec = provider.getSpec(context); // getSpec may be sync or async — the pure core treats it as a value the // shell will resolve. We return the spec directly (it's a SurfaceSpec). @@ -123,21 +139,38 @@ function handleSubscribe( const specValue = spec as import("@dispatch/ui-contract").SurfaceSpec; const replies: import("@dispatch/ui-contract").SurfaceServerMessage[] = [ - { type: "surface", spec: specValue }, + { + type: "surface", + spec: specValue, + ...(conversationId !== undefined ? { conversationId } : {}), + }, ]; // Idempotent: only emit subChange if not already subscribed. - if (!connSubs.has(surfaceId)) { - return { kind: "surface", replies, subChange: { op: "add", surfaceId } }; + const key = subKey(surfaceId, conversationId); + if (!connSubs.has(key)) { + return { + kind: "surface", + replies, + subChange: { + op: "add", + surfaceId, + ...(conversationId !== undefined ? { conversationId } : {}), + }, + }; } return { kind: "surface", replies }; } -function handleUnsubscribe(surfaceId: string): SurfaceRouteResult { +function handleUnsubscribe(surfaceId: string, conversationId?: string): SurfaceRouteResult { return { kind: "surface", replies: [], - subChange: { op: "remove", surfaceId }, + subChange: { + op: "remove", + surfaceId, + ...(conversationId !== undefined ? { conversationId } : {}), + }, }; } @@ -146,6 +179,7 @@ function handleInvoke( surfaceId: string, actionId: string, payload?: unknown, + conversationId?: string, ): SurfaceRouteResult { const provider = registry.getSurface(surfaceId); if (!provider) { @@ -157,6 +191,11 @@ function handleInvoke( return { kind: "surface", replies: [], - invoke: { surfaceId, actionId, payload }, + invoke: { + surfaceId, + actionId, + payload, + ...(conversationId !== undefined ? { conversationId } : {}), + }, }; } diff --git a/packages/transport-ws/src/server.bun.test.ts b/packages/transport-ws/src/server.bun.test.ts index ea9d80c..8d6f0b8 100644 --- a/packages/transport-ws/src/server.bun.test.ts +++ b/packages/transport-ws/src/server.bun.test.ts @@ -1,10 +1,10 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; import type { AgentEvent, Attributes, ErrorAttributes, Logger } from "@dispatch/kernel"; import type { SessionOrchestrator } from "@dispatch/session-orchestrator"; -import type { SurfaceProvider, SurfaceRegistry } from "@dispatch/surface-registry"; +import type { SurfaceContext, SurfaceProvider, SurfaceRegistry } from "@dispatch/surface-registry"; import type { WsServerMessage } from "@dispatch/transport-contract"; import type { SurfaceCatalogEntry, SurfaceClientMessage, SurfaceSpec } from "@dispatch/ui-contract"; -import { catalogMessage, routeClientMessage } from "./router.js"; +import { catalogMessage, routeClientMessage, subKey } from "./router.js"; // ── Fake Logger (captures records for assertions) ─────────────────────────── @@ -58,7 +58,7 @@ function fakeProvider(id: string, title?: string): SurfaceProvider { }; return { catalogEntry, - getSpec(): SurfaceSpec { + getSpec(_context?: SurfaceContext): SurfaceSpec { return { id, region: "default", @@ -66,7 +66,7 @@ function fakeProvider(id: string, title?: string): SurfaceProvider { fields: [], }; }, - invoke(_actionId: string, _payload?: unknown) {}, + invoke(_actionId: string, _payload?: unknown, _context?: SurfaceContext) {}, }; } @@ -151,10 +151,11 @@ function startServer( } if (result.subChange) { + const key = subKey(result.subChange.surfaceId, result.subChange.conversationId); if (result.subChange.op === "add") { - state.subs.add(result.subChange.surfaceId); + state.subs.add(key); } else { - state.subs.delete(result.subChange.surfaceId); + state.subs.delete(key); } } diff --git a/packages/ui-contract/src/index.ts b/packages/ui-contract/src/index.ts index ea0fc26..a7943aa 100644 --- a/packages/ui-contract/src/index.ts +++ b/packages/ui-contract/src/index.ts @@ -46,6 +46,7 @@ export type SurfaceField = | ProgressField | SelectorField | StatField + | NumberField | ButtonField | CustomField; @@ -80,6 +81,24 @@ export interface StatField { readonly value: string; } +/** + * A settable numeric value plus the action that sets it — the free-value + * counterpart to `selector` (which is a fixed enum). Optional `min`/`max`/`step` + * are SEMANTIC bounds a client may use to validate/step input; `unit` is a + * display hint (e.g. "ms", "min"). The client posts the new number as the action + * payload. Unlike `progress`/`stat` (read-only), this field is interactive. + */ +export interface NumberField { + readonly kind: "number"; + readonly label: string; + readonly value: number; + readonly min?: number; + readonly max?: number; + readonly step?: number; + readonly unit?: string; + readonly action: ActionRef; +} + /** A labelled action trigger. */ export interface ButtonField { readonly kind: "button"; @@ -129,10 +148,15 @@ export type SurfaceCatalog = readonly SurfaceCatalogEntry[]; * A live update for a subscribed surface (pushed over the WS channel — §5). v1 * carries the full new spec (the simplest "patch"); granular field-level patches are * deferred until a real surface needs them (P4). + * + * `conversationId` is present only for a CONVERSATION-SCOPED surface (one whose + * spec/values differ per conversation, e.g. cache-warming controls): it tells the + * client which conversation this update pertains to. A global surface omits it. */ export interface SurfaceUpdate { readonly surfaceId: string; readonly spec: SurfaceSpec; + readonly conversationId?: string; } // ───────────────────────────────────────────────────────────────────────────── @@ -145,24 +169,38 @@ export interface SurfaceUpdate { /** A client → server message on the surface channel. */ export type SurfaceClientMessage = SubscribeMessage | UnsubscribeMessage | InvokeMessage; -/** Begin receiving live updates for a surface (server replies with `surface`, then `update`s). */ +/** + * Begin receiving live updates for a surface (server replies with `surface`, then `update`s). + * + * For a CONVERSATION-SCOPED surface, include the `conversationId` whose state you + * want — the server resolves the spec for that conversation and pushes its updates. + * Omit it for a global surface (or to view a conversation-scoped surface with no + * conversation in focus → the surface decides its default/empty state). + */ export interface SubscribeMessage { readonly type: "subscribe"; readonly surfaceId: string; + readonly conversationId?: string; } -/** Stop receiving updates for a surface. */ +/** Stop receiving updates for a surface (and the same `conversationId`, if scoped). */ export interface UnsubscribeMessage { readonly type: "unsubscribe"; readonly surfaceId: string; + readonly conversationId?: string; } -/** Invoke a field's action; `payload` is the new value (e.g. a toggle's boolean). */ +/** + * Invoke a field's action; `payload` is the new value (e.g. a toggle's boolean, a + * `number` field's new number). For a conversation-scoped surface, include the + * `conversationId` the action targets. + */ export interface InvokeMessage { readonly type: "invoke"; readonly surfaceId: string; readonly actionId: string; readonly payload?: unknown; + readonly conversationId?: string; } /** A server → client message on the surface channel. */ @@ -178,10 +216,15 @@ export interface CatalogMessage { readonly catalog: SurfaceCatalog; } -/** The full current spec for a surface the client just subscribed to. */ +/** + * The full current spec for a surface the client just subscribed to. + * `conversationId` echoes the subscribe's conversation for a conversation-scoped + * surface (so the client routes it), and is absent for a global surface. + */ export interface SurfaceMessage { readonly type: "surface"; readonly spec: SurfaceSpec; + readonly conversationId?: string; } /** A live update for a subscribed surface. */ @@ -5,7 +5,7 @@ > Keep this lean and current; do not let it re-accrete a step-by-step changelog. ## Status (current) -`tsc -b` EXIT 0 · biome clean · **760 vitest + 109 bun = 869 tests**. +`tsc -b` EXIT 0 · biome clean · **784 vitest + 109 bun = 893 tests**. Built and verified live (full-fidelity: every feature is a manifest-loaded extension through the host): @@ -162,12 +162,14 @@ arm-on-settle/cancel-on-start; `pct = round(clamp(cacheRead/input,0,1)*100)`). - **LIVE-VERIFIED against Claude haiku:** automatic timer warm → journal `warm complete pct:100`; manual `POST /chat/warm` → `cacheReadTokens:6799, cachePct:100` (100% hit), HTTP 200. The external `../claude` provider-anthropic is loaded via `bin/up` (`DISPATCH_EXTERNAL_EXTENSIONS`). -- **OPEN — surface-system limits (CR from cache-warming):** the surface system has (a) NO - per-conversation context (surface reflects most-recently-active conversation; invoke carries - conversationId), and (b) NO numeric-input field kind, so the **interval ("set time to refresh") - control is not yet a view input** — only the on/off toggle + last-cache-% stat render. Honoring - per-conversation controls + free-value interval needs a `NumberField` in `ui-contract` + - per-conversation surface scoping (+ FE courier). Decision pending. +- **Surface framework extended (DONE):** added `NumberField` to `ui-contract` + per-conversation + surface scoping (optional `conversationId` on subscribe/unsubscribe/invoke + surface/update; new + `SurfaceContext` on `SurfaceProvider.getSpec/invoke`; transport-ws keys subscriptions by + `(surfaceId, conversationId)` and tags updates). cache-warming now serves a PER-CONVERSATION + surface: `Toggle`(enabled) · `Number`(interval, seconds, `cache-warming/set-interval`) · + `Stat`(last cache %). All backward-compatible (global surfaces like `surface-loaded-extensions` + unchanged). **FE courier:** `frontend-cache-warming-handoff.md` (this repo) — the web must render + the `number` field kind + send/handle `conversationId` on the surface WS protocol. ## Open items - **`prefix.fingerprint` / `warm|real` cache-bust attributes (deferred):** decoupled |
