diff options
| -rw-r--r-- | packages/transport-contract/src/index.ts | 44 | ||||
| -rw-r--r-- | packages/transport-http/src/app.test.ts | 106 | ||||
| -rw-r--r-- | packages/transport-http/src/app.ts | 56 | ||||
| -rw-r--r-- | packages/transport-http/src/extension.ts | 12 | ||||
| -rw-r--r-- | packages/transport-http/src/index.ts | 17 | ||||
| -rw-r--r-- | packages/transport-http/src/logic.ts | 42 | ||||
| -rw-r--r-- | packages/transport-http/src/seam.ts | 4 | ||||
| -rw-r--r-- | tasks.md | 8 |
8 files changed, 281 insertions, 8 deletions
diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts index 5b59a2d..fbb61fc 100644 --- a/packages/transport-contract/src/index.ts +++ b/packages/transport-contract/src/index.ts @@ -154,6 +154,50 @@ export interface ThroughputResponse { readonly models: readonly ThroughputModelStat[]; } +/** + * Request body for `POST /chat/warm` — manually trigger a prompt-cache WARMING + * request for a conversation (e.g. a frontend "warm now" button, or fast tests + * that don't want to wait for the automatic warming timer). + * + * The warm replays the conversation's existing prefix to the provider to refresh + * its prompt cache; it is NEVER persisted and NEVER streamed (no `AgentEvent`s). + * Pass the same `model`/`cwd` the conversation chats with so the warm request's + * prefix is byte-identical to a real turn (which is what makes the cache hit). + */ +export interface WarmRequest { + /** The conversation whose prompt cache to warm. */ + readonly conversationId: string; + + /** + * The model name in `<credentialName>/<model>` form the conversation uses, so + * the warm resolves the same provider + prefix. Omit to use the server default. + */ + readonly model?: string; + + /** Working directory matching the conversation's turns (for cwd-aware tool assembly). */ + readonly cwd?: string; +} + +/** + * Response body for `POST /chat/warm` (HTTP 200). The warm request's usage — + * never folded into the conversation's real usage. A client surfaces `cachePct` + * as the "last warming" cache-hit indicator. + * + * When warming cannot run because the conversation is currently generating, the + * server responds `409` with `{ error }` instead of this body. + */ +export interface WarmResponse { + readonly inputTokens: number; + readonly outputTokens: number; + readonly cacheReadTokens: number; + readonly cacheWriteTokens: number; + /** + * Cache-hit percent: `round(clamp(cacheReadTokens / inputTokens, 0, 1) * 100)` + * (0 when `inputTokens <= 0`). + */ + readonly cachePct: number; +} + // ─── WebSocket chat ops ─────────────────────────────────────────────────────── // The persistent WS connection multiplexes chat ops (below) with surface ops // (`@dispatch/ui-contract`). The unified unions at the bottom compose both. Chat diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts index e634e19..7352b5d 100644 --- a/packages/transport-http/src/app.test.ts +++ b/packages/transport-http/src/app.test.ts @@ -10,7 +10,12 @@ import { createThroughputStore, dayKeyOf } from "@dispatch/throughput-store"; import type { ThroughputResponse } from "@dispatch/transport-contract"; import { describe, expect, it } from "vitest"; import { createApp } from "./app.js"; -import type { ConversationStore, CredentialStore, SessionOrchestrator } from "./seam.js"; +import type { + ConversationStore, + CredentialStore, + SessionOrchestrator, + WarmService, +} from "./seam.js"; function createMemStorage(): StorageNamespace { const map = new Map<string, string>(); @@ -147,6 +152,23 @@ function createThrowingCredentialStore(error: Error): CredentialStore { }; } +function createFakeWarmService( + result: + | { + inputTokens: number; + outputTokens: number; + cacheReadTokens: number; + cacheWriteTokens: number; + } + | { error: string }, +): WarmService { + return { + async warm() { + return result; + }, + }; +} + const noopLogger = createFakeLogger(); describe("GET /health", () => { @@ -399,6 +421,88 @@ describe("POST /chat", () => { }); }); +describe("POST /chat/warm", () => { + it("POST /chat/warm returns 200 with cachePct from the warm usage", async () => { + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + warmService: createFakeWarmService({ + inputTokens: 1000, + outputTokens: 200, + cacheReadTokens: 800, + cacheWriteTokens: 100, + }), + logger: noopLogger, + }); + + const res = await app.request("/chat/warm", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ conversationId: "conv1" }), + }); + + expect(res.status).toBe(200); + const body = (await res.json()) as { + inputTokens: number; + outputTokens: number; + cacheReadTokens: number; + cacheWriteTokens: number; + cachePct: number; + }; + expect(body.inputTokens).toBe(1000); + expect(body.outputTokens).toBe(200); + expect(body.cacheReadTokens).toBe(800); + expect(body.cacheWriteTokens).toBe(100); + expect(body.cachePct).toBe(80); + }); + + it("POST /chat/warm returns 409 when the warm service reports the conversation is generating", async () => { + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + warmService: createFakeWarmService({ error: "conversation is generating" }), + logger: noopLogger, + }); + + const res = await app.request("/chat/warm", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ conversationId: "conv1" }), + }); + + expect(res.status).toBe(409); + const body = (await res.json()) as { error: string }; + expect(body.error).toBe("conversation is generating"); + }); + + it("POST /chat/warm returns 400 when conversationId is missing", async () => { + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + warmService: createFakeWarmService({ + inputTokens: 0, + outputTokens: 0, + cacheReadTokens: 0, + cacheWriteTokens: 0, + }), + logger: noopLogger, + }); + + const res = await app.request("/chat/warm", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({}), + }); + + expect(res.status).toBe(400); + const body = (await res.json()) as { error: string }; + expect(body.error).toContain("conversationId"); + }); +}); + describe("GET /conversations/:id", () => { const sampleChunks: StoredChunk[] = [ { seq: 1, role: "user", chunk: { type: "text", text: "hello" } }, diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts index 3c9ae85..a8cef51 100644 --- a/packages/transport-http/src/app.ts +++ b/packages/transport-http/src/app.ts @@ -4,14 +4,17 @@ import type { ConversationMetricsResponse, ModelsResponse, ThroughputResponse, + WarmResponse, } from "@dispatch/transport-contract"; import { Hono } from "hono"; import { cors } from "hono/cors"; import { + computeCachePct, isParseError, isSinceSeqError, parseChatBody, parseSinceSeq, + parseWarmBody, serializeEventLine, } from "./logic.js"; import { @@ -20,12 +23,14 @@ import { type SessionOrchestrator, ThroughputQueryError, type ThroughputStore, + type WarmService, } from "./seam.js"; export interface CreateServerOptions { readonly conversationStore: ConversationStore; readonly orchestrator: SessionOrchestrator; readonly credentialStore: CredentialStore; + readonly warmService?: WarmService; /** Optional — defaults to a no-op store (recording disabled, empty reports). */ readonly throughputStore?: ThroughputStore; readonly logger?: Logger; @@ -232,6 +237,57 @@ export function createApp(opts: CreateServerOptions): Hono { }); }); + app.post("/chat/warm", async (c) => { + if (opts.warmService === undefined) { + return c.json({ error: "Warm service not available" }, 503); + } + + let body: unknown; + try { + body = await c.req.json(); + } catch { + log.warn("chat/warm: invalid JSON body"); + return c.json({ error: "Invalid JSON body" }, 400); + } + + const parsed = parseWarmBody(body); + if ("error" in parsed) { + log.warn("chat/warm: validation failed", { reason: parsed.error }); + return c.json({ error: parsed.error }, 400); + } + + const { conversationId, model, cwd } = parsed; + log.info("chat/warm: request accepted", { + conversationId, + hasModel: model !== undefined, + hasCwd: cwd !== undefined, + }); + + const warmOpts: { readonly cwd?: string; readonly modelName?: string } | undefined = + model !== undefined || cwd !== undefined + ? { + ...(cwd !== undefined ? { cwd } : {}), + ...(model !== undefined ? { modelName: model } : {}), + } + : undefined; + + const result = await opts.warmService.warm(conversationId, warmOpts); + + if ("error" in result) { + log.warn("chat/warm: service returned error", { conversationId, error: result.error }); + return c.json({ error: result.error }, 409); + } + + const response: WarmResponse = { + inputTokens: result.inputTokens, + outputTokens: result.outputTokens, + cacheReadTokens: result.cacheReadTokens, + cacheWriteTokens: result.cacheWriteTokens, + cachePct: computeCachePct(result.inputTokens, result.cacheReadTokens), + }; + return c.json(response, 200); + }); + app.get("/metrics/throughput", async (c) => { const period = c.req.query("period"); const date = c.req.query("date"); diff --git a/packages/transport-http/src/extension.ts b/packages/transport-http/src/extension.ts index 4abd7aa..5274033 100644 --- a/packages/transport-http/src/extension.ts +++ b/packages/transport-http/src/extension.ts @@ -1,6 +1,7 @@ import type { Extension, HostAPI, Manifest } from "@dispatch/kernel"; import { createApp } from "./app.js"; import { + cacheWarmHandle, conversationStoreHandle, credentialStoreHandle, sessionOrchestratorHandle, @@ -16,7 +17,14 @@ export const manifest: Manifest = { dependsOn: ["conversation-store", "credential-store", "session-orchestrator", "throughput-store"], capabilities: { network: true }, contributes: { - routes: ["/chat", "/conversations/:id", "/health", "/models", "/metrics/throughput"], + routes: [ + "/chat", + "/chat/warm", + "/conversations/:id", + "/health", + "/models", + "/metrics/throughput", + ], }, activation: "eager", }; @@ -36,6 +44,7 @@ export function createTransportHttpExtension(): Extension & { const orchestrator = host.getService(sessionOrchestratorHandle); const credentialStore = host.getService(credentialStoreHandle); const throughputStore = host.getService(throughputStoreHandle); + const warmService = host.getService(cacheWarmHandle); const logger = host.logger; const app = createApp({ @@ -43,6 +52,7 @@ export function createTransportHttpExtension(): Extension & { orchestrator, credentialStore, throughputStore, + warmService, logger, }); diff --git a/packages/transport-http/src/index.ts b/packages/transport-http/src/index.ts index 171cf6a..64929fc 100644 --- a/packages/transport-http/src/index.ts +++ b/packages/transport-http/src/index.ts @@ -1,16 +1,29 @@ export type { CreateServerOptions } from "./app.js"; export { createApp } from "./app.js"; export { createTransportHttpExtension, manifest } from "./extension.js"; -export type { ChatCommand, ParseError, ParseResult, SinceSeqResult } from "./logic.js"; +export type { + ChatCommand, + ParseError, + ParseResult, + SinceSeqResult, + WarmBodyParsed, +} from "./logic.js"; export { + computeCachePct, isParseError, isSinceSeqError, parseChatBody, parseSinceSeq, serializeEventLine, } from "./logic.js"; -export type { ConversationStore, CredentialStore, SessionOrchestrator } from "./seam.js"; +export type { + ConversationStore, + CredentialStore, + SessionOrchestrator, + WarmService, +} from "./seam.js"; export { + cacheWarmHandle, conversationStoreHandle, credentialStoreHandle, sessionOrchestratorHandle, diff --git a/packages/transport-http/src/logic.ts b/packages/transport-http/src/logic.ts index f1d5b8c..bb827e2 100644 --- a/packages/transport-http/src/logic.ts +++ b/packages/transport-http/src/logic.ts @@ -71,3 +71,45 @@ export function parseSinceSeq(raw: string | undefined): SinceSeqResult { export function isSinceSeqError(result: SinceSeqResult): result is ParseError { return typeof result === "object"; } + +export interface WarmBodyParsed { + readonly conversationId: string; + readonly model?: string; + readonly cwd?: string; +} + +export function parseWarmBody(body: unknown): WarmBodyParsed | ParseError { + if (body === null || typeof body !== "object") { + return { error: "Request body must be a JSON object" }; + } + + const obj = body as Record<string, unknown>; + + const conversationId = obj.conversationId; + if (typeof conversationId !== "string" || conversationId.length === 0) { + return { error: "Field 'conversationId' is required and must be a non-empty string" }; + } + + const result: Record<string, unknown> = { conversationId }; + + if (obj.model !== undefined) { + if (typeof obj.model !== "string") { + return { error: "Field 'model' must be a string" }; + } + result.model = obj.model; + } + + if (obj.cwd !== undefined) { + if (typeof obj.cwd !== "string") { + return { error: "Field 'cwd' must be a string" }; + } + result.cwd = obj.cwd; + } + + return result as unknown as WarmBodyParsed; +} + +export function computeCachePct(inputTokens: number, cacheReadTokens: number): number { + if (inputTokens <= 0) return 0; + return Math.round(Math.max(0, Math.min(1, cacheReadTokens / inputTokens)) * 100); +} diff --git a/packages/transport-http/src/seam.ts b/packages/transport-http/src/seam.ts index 7dbaa1b..e89370e 100644 --- a/packages/transport-http/src/seam.ts +++ b/packages/transport-http/src/seam.ts @@ -2,7 +2,7 @@ export type { ConversationStore } from "@dispatch/conversation-store"; export { conversationStoreHandle } from "@dispatch/conversation-store"; export type { CredentialStore } from "@dispatch/credential-store"; export { credentialStoreHandle } from "@dispatch/credential-store"; -export type { SessionOrchestrator } from "@dispatch/session-orchestrator"; -export { sessionOrchestratorHandle } from "@dispatch/session-orchestrator"; +export type { SessionOrchestrator, WarmService } from "@dispatch/session-orchestrator"; +export { cacheWarmHandle, sessionOrchestratorHandle } from "@dispatch/session-orchestrator"; export type { ThroughputStore } from "@dispatch/throughput-store"; export { ThroughputQueryError, throughputStoreHandle } from "@dispatch/throughput-store"; @@ -156,8 +156,12 @@ arm-on-settle/cancel-on-start; `pct = round(clamp(cacheRead/input,0,1)*100)`). calls `warm()`, computes `lastPct`, persists `{enabled,intervalMs}` (default on/240s) in host.storage; registers a controls Surface. 19 tests. - [x] **host-bin** — registered cache-warming; **transport-http** HostAPI stub fixed for `emit`. -- **Live-verified:** full-graph `tsc -b` EXIT 0, biome clean (boot smoke + live Claude warm pending - a restart with the cache-warming ext loaded). +- **Manual trigger endpoint:** `POST /chat/warm {conversationId, model?, cwd?}` → `WarmResponse` + `{inputTokens,outputTokens,cacheReadTokens,cacheWriteTokens,cachePct}` (409 if generating). Powers a + FE "warm now" button + fast tests. Types in `@dispatch/transport-contract`; route in transport-http. +- **LIVE-VERIFIED against Claude haiku:** automatic timer warm → journal `warm complete pct:100`; + manual `POST /chat/warm` → `cacheReadTokens:6799, cachePct:100` (100% hit), HTTP 200. The external + `../claude` provider-anthropic is loaded via `bin/up` (`DISPATCH_EXTERNAL_EXTENSIONS`). - **OPEN — surface-system limits (CR from cache-warming):** the surface system has (a) NO per-conversation context (surface reflects most-recently-active conversation; invoke carries conversationId), and (b) NO numeric-input field kind, so the **interval ("set time to refresh") |
