diff options
| author | Adam Malczewski <[email protected]> | 2026-06-28 12:43:29 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-28 12:43:29 +0900 |
| commit | 2d276669a0cb41959fc67d17bc58e77853dc3eb5 (patch) | |
| tree | b70893b7450522fe9d5b7e627423498ae972e191 | |
| parent | f9d1ca533ad2c5d71a3bc349934d54c09de305bf (diff) | |
| download | dispatch-2d276669a0cb41959fc67d17bc58e77853dc3eb5.tar.gz dispatch-2d276669a0cb41959fc67d17bc58e77853dc3eb5.zip | |
feat(concurrency-fixes): usage-gate + adaptive headroom + configurable cooldown
| -rw-r--r-- | packages/kernel/src/contracts/index.ts | 1 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/provider.ts | 27 | ||||
| -rw-r--r-- | packages/openai-stream/src/getUsage.test.ts | 139 | ||||
| -rw-r--r-- | packages/openai-stream/src/getUsage.ts | 73 | ||||
| -rw-r--r-- | packages/openai-stream/src/index.ts | 1 | ||||
| -rw-r--r-- | packages/openai-stream/src/provider.test.ts | 21 | ||||
| -rw-r--r-- | packages/openai-stream/src/provider.ts | 9 | ||||
| -rw-r--r-- | packages/provider-concurrency/src/concurrency-manager.test.ts | 315 | ||||
| -rw-r--r-- | packages/provider-concurrency/src/concurrency-manager.ts | 350 | ||||
| -rw-r--r-- | packages/provider-concurrency/src/extension.ts | 103 | ||||
| -rw-r--r-- | packages/transport-contract/src/index.ts | 38 | ||||
| -rw-r--r-- | packages/transport-http/src/app.ts | 49 |
12 files changed, 1073 insertions, 53 deletions
diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts index 28e0a0b..fc19267 100644 --- a/packages/kernel/src/contracts/index.ts +++ b/packages/kernel/src/contracts/index.ts @@ -103,6 +103,7 @@ export type { ProviderEvent, ProviderStreamOptions, ProviderToolCallEvent, + ProviderUsage, ReasoningDeltaEvent, ReasoningEffort, TextDeltaEvent, diff --git a/packages/kernel/src/contracts/provider.ts b/packages/kernel/src/contracts/provider.ts index 3137073..dea6c17 100644 --- a/packages/kernel/src/contracts/provider.ts +++ b/packages/kernel/src/contracts/provider.ts @@ -104,6 +104,19 @@ export interface ProviderStreamOptions { } /** + * A snapshot of the provider's current upstream usage. Returned by a + * provider's optional `getUsage` so a concurrency limiter can gate slot grants + * on the REAL upstream in-flight count (not just the limiter's local accounting, + * which lags the upstream `concurrent_sessions` counter by the release + * cooldown). `concurrentSessions` is the number of requests the provider counts + * as currently in flight. + */ +export interface ProviderUsage { + /** Upstream count of currently in-flight (generating) sessions. */ + readonly concurrentSessions: number; +} + +/** * Metadata describing a single model a provider can serve. Returned by * `listModels` so a catalog (e.g. the credential-store) can enumerate the * `<credentialName>/<model>` choices a client may select. Kept minimal — `id` @@ -154,4 +167,18 @@ export interface ProviderContract { * credentials in; today the provider uses the key it resolved at activate. */ readonly listModels?: () => Promise<readonly ModelInfo[]>; + + /** + * Fetch the provider's current upstream usage snapshot. Optional: a provider + * that cannot (or chooses not to) report usage omits it, and a concurrency + * limiter falls back to cooldown-only slot recycling (no usage gate). 
When + * present, the limiter polls this before admitting a QUEUED agent and grants + * only when `concurrentSessions` is below the configured limit — preventing an + * N+1 overshoot from the upstream accounting lag. + * + * May return `undefined` (e.g. the endpoint returned an unexpected shape or a + * non-200) — the limiter treats `undefined` as "no usage info available" and + * falls back to granting (cooldown-only behavior) for that poll. + */ + readonly getUsage?: () => Promise<ProviderUsage | undefined>; } diff --git a/packages/openai-stream/src/getUsage.test.ts b/packages/openai-stream/src/getUsage.test.ts new file mode 100644 index 0000000..c319b73 --- /dev/null +++ b/packages/openai-stream/src/getUsage.test.ts @@ -0,0 +1,139 @@ +import type { ProviderUsage } from "@dispatch/kernel"; +import type { FetchLike } from "@dispatch/trace-replay"; +import { describe, expect, it, vi } from "vitest"; +import { getUsage } from "./getUsage.js"; + +function jsonResponse(body: unknown, status = 200): Response { + return new Response(JSON.stringify(body), { + status, + headers: { "Content-Type": "application/json" }, + }); +} + +describe("getUsage", () => { + it("extracts concurrent_sessions from the Umans /v1/usage shape", async () => { + const fetchFn = vi.fn( + () => + jsonResponse({ + usage: { concurrent_sessions: 3 }, + limits: { concurrency: { limit: 4, hard_cap: 5 } }, + }) as unknown as ReturnType<FetchLike>, + ); + + const result = await getUsage({ + baseURL: "https://api.umans.ai/v1", + apiKey: "sk-test-1234567890abcdef", + providerId: "umans", + fetchFn, + }); + + expect(result).toEqual<ProviderUsage>({ concurrentSessions: 3 }); + expect(fetchFn).toHaveBeenCalledOnce(); + const call = fetchFn.mock.calls[0]; + expect(call?.[0]).toBe("https://api.umans.ai/v1/usage"); + const init = call?.[1] as RequestInit; + expect(init.method).toBe("GET"); + expect((init.headers as Record<string, string>).Authorization).toBe( + "Bearer sk-test-1234567890abcdef", + ); + }); + + 
it("returns undefined on non-200 (endpoint unsupported)", async () => { + const fetchFn = vi.fn( + () => jsonResponse({ error: "not found" }, 404) as unknown as ReturnType<FetchLike>, + ); + + const result = await getUsage({ + baseURL: "https://api.example.com/v1", + apiKey: "sk-test", + providerId: "openai", + fetchFn, + }); + + expect(result).toBeUndefined(); + }); + + it("returns undefined on a network error (never throws)", async () => { + const fetchFn = vi.fn( + () => Promise.reject(new Error("ECONNREFUSED")) as unknown as Promise<Response>, + ); + + const result = await getUsage({ + baseURL: "https://api.example.com/v1", + apiKey: "sk-test", + providerId: "openai", + fetchFn, + }); + + expect(result).toBeUndefined(); + }); + + it("returns undefined when the response shape is unexpected", async () => { + const fetchFn = vi.fn( + () => jsonResponse({ unexpected: true }) as unknown as ReturnType<FetchLike>, + ); + + const result = await getUsage({ + baseURL: "https://api.example.com/v1", + apiKey: "sk-test", + providerId: "openai", + fetchFn, + }); + + expect(result).toBeUndefined(); + }); + + it("returns undefined when concurrent_sessions is not a number", async () => { + const fetchFn = vi.fn( + () => + jsonResponse({ + usage: { concurrent_sessions: "three" }, + }) as unknown as ReturnType<FetchLike>, + ); + + const result = await getUsage({ + baseURL: "https://api.example.com/v1", + apiKey: "sk-test", + providerId: "openai", + fetchFn, + }); + + expect(result).toBeUndefined(); + }); + + it("truncates a fractional concurrent_sessions to an integer", async () => { + const fetchFn = vi.fn( + () => + jsonResponse({ usage: { concurrent_sessions: 2.9 } }) as unknown as ReturnType<FetchLike>, + ); + + const result = await getUsage({ + baseURL: "https://api.example.com/v1", + apiKey: "sk-test", + providerId: "openai", + fetchFn, + }); + + expect(result).toEqual<ProviderUsage>({ concurrentSessions: 2 }); + }); + + it("falls back to globalThis.fetch when fetchFn is 
absent", async () => { + const original = globalThis.fetch; + const stub = vi.fn( + () => jsonResponse({ usage: { concurrent_sessions: 1 } }) as unknown as ReturnType<FetchLike>, + ); + globalThis.fetch = stub as unknown as typeof globalThis.fetch; + + try { + const result = await getUsage({ + baseURL: "https://api.example.com/v1", + apiKey: "sk-test", + providerId: "openai", + }); + expect(result).toEqual<ProviderUsage>({ concurrentSessions: 1 }); + expect(stub).toHaveBeenCalledOnce(); + } finally { + globalThis.fetch = original; + } + }); +}); diff --git a/packages/openai-stream/src/getUsage.ts b/packages/openai-stream/src/getUsage.ts new file mode 100644 index 0000000..5da7fd7 --- /dev/null +++ b/packages/openai-stream/src/getUsage.ts @@ -0,0 +1,73 @@ +import type { ProviderUsage } from "@dispatch/kernel"; +import type { FetchLike } from "@dispatch/trace-replay"; + +/** + * Generic OpenAI-compatible usage fetch. The Umans `/v1/usage` endpoint returns: + * + * { usage: { concurrent_sessions: number }, limits: { concurrency: { limit, hard_cap } } } + * + * We extract only `concurrent_sessions` (the count a concurrency limiter gates + * on). Lives in this library (`@dispatch/openai-stream`) so any OpenAI-compatible + * provider extension reuses it without cross-extension code import + * (isolation-over-DRY: coupling is via this typed library surface). A provider + * supplies its own `id` (used in error labels) via `createOpenAICompatProvider`. + */ + +/** The raw shape of the `/v1/usage` response (only the fields we read). */ +interface UsageResponse { + readonly usage?: { + readonly concurrent_sessions?: number; + }; +} + +export interface GetUsageConfig { + readonly baseURL: string; + readonly apiKey: string; + readonly fetchFn?: FetchLike; + readonly providerId: string; +} + +/** + * Fetch + map the upstream usage snapshot. 
Returns `undefined` on any error, + * non-200, or unexpected shape so the caller (the concurrency limiter) falls + * back to cooldown-only slot recycling (no usage gate) — never throws. + * + * Pure-ish I/O wrapper: the only effect is the injected fetch. Extracted for + * direct unit testing with a fake fetch. + */ +export async function getUsage(config: GetUsageConfig): Promise<ProviderUsage | undefined> { + const effectiveFetch: FetchLike = config.fetchFn ?? fetch; + const url = `${config.baseURL}/usage`; + + let response: Response; + try { + response = await effectiveFetch(url, { + method: "GET", + headers: { + Authorization: `Bearer ${config.apiKey}`, + }, + }); + } catch { + // Network error — the upstream is unreachable; treat as "no usage info". + return undefined; + } + + if (!response.ok) { + // 404 / 401 / 5xx — the endpoint is unsupported or rejected the request. + return undefined; + } + + let body: UsageResponse; + try { + body = (await response.json()) as UsageResponse; + } catch { + return undefined; + } + + const raw = body.usage?.concurrent_sessions; + if (typeof raw !== "number" || !Number.isFinite(raw) || raw < 0) { + return undefined; + } + + return { concurrentSessions: Math.trunc(raw) }; +} diff --git a/packages/openai-stream/src/index.ts b/packages/openai-stream/src/index.ts index 3f76b99..ff6b4f5 100644 --- a/packages/openai-stream/src/index.ts +++ b/packages/openai-stream/src/index.ts @@ -8,6 +8,7 @@ export type { export { convertMessages } from "./convert-messages.js"; export type { OpenAITool } from "./convert-tools.js"; export { convertTools } from "./convert-tools.js"; +export { getUsage } from "./getUsage.js"; export { isVisionModelId, parseModelList } from "./listModels.js"; export { parseSSELines } from "./parse-sse.js"; export type { CreateOpenAICompatProviderOpts } from "./provider.js"; diff --git a/packages/openai-stream/src/provider.test.ts b/packages/openai-stream/src/provider.test.ts index 8bc6e98..13a303e 100644 --- 
a/packages/openai-stream/src/provider.test.ts +++ b/packages/openai-stream/src/provider.test.ts @@ -81,6 +81,27 @@ describe("createOpenAICompatProvider stamps the given id on the ProviderContract await expect(listModels()).rejects.toThrow("listModels[my-custom-id]: HTTP 401 — Unauthorized"); }); + + it("exposes getUsage that returns the upstream concurrent_sessions", async () => { + const fetchFn = vi.fn( + () => + new Response(JSON.stringify({ usage: { concurrent_sessions: 2 } }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }) as unknown as ReturnType<FetchLike>, + ); + const provider = createOpenAICompatProvider({ + credentials: makeCreds(), + model: "test-model", + id: "umans", + fetchFn, + }); + const getUsage = provider.getUsage; + if (!getUsage) throw new Error("getUsage not defined"); + + const usage = await getUsage(); + expect(usage).toEqual({ concurrentSessions: 2 }); + }); }); describe("transformBody", () => { diff --git a/packages/openai-stream/src/provider.ts b/packages/openai-stream/src/provider.ts index df5a4ed..9a9369f 100644 --- a/packages/openai-stream/src/provider.ts +++ b/packages/openai-stream/src/provider.ts @@ -4,9 +4,11 @@ import type { ModelInfo, ProviderContract, ProviderStreamOptions, + ProviderUsage, ToolContract, } from "@dispatch/kernel"; import type { FetchLike } from "@dispatch/trace-replay"; +import { getUsage as fetchUsage } from "./getUsage.js"; import { listModels as fetchModels } from "./listModels.js"; import { streamChat } from "./stream.js"; @@ -69,5 +71,12 @@ export function createOpenAICompatProvider(opts: CreateOpenAICompatProviderOpts) providerId: opts.id, ...(fetchFn !== undefined ? { fetchFn } : {}), }), + getUsage: (): Promise<ProviderUsage | undefined> => + fetchUsage({ + baseURL, + apiKey, + providerId: opts.id, + ...(fetchFn !== undefined ? 
{ fetchFn } : {}), + }), }; } diff --git a/packages/provider-concurrency/src/concurrency-manager.test.ts b/packages/provider-concurrency/src/concurrency-manager.test.ts index 62f202f..6db8964 100644 --- a/packages/provider-concurrency/src/concurrency-manager.test.ts +++ b/packages/provider-concurrency/src/concurrency-manager.test.ts @@ -1,3 +1,4 @@ +import type { ProviderUsage } from "@dispatch/kernel"; import { describe, expect, it } from "vitest"; import { type ConcurrencyService, createConcurrencyManager } from "./concurrency-manager.js"; @@ -59,7 +60,11 @@ function createFakeTimers() { }; } -function createManager(opts?: { releaseCooldownMs?: number }): { +function createManager(opts?: { + releaseCooldownMs?: number; + fetchUsage?: (providerId: string) => Promise<ProviderUsage | undefined>; + onLimitReduced?: (providerId: string, newLimit: number, oldLimit: number) => void; +}): { manager: ConcurrencyService; timers: ReturnType<typeof createFakeTimers>; } { @@ -70,6 +75,8 @@ function createManager(opts?: { releaseCooldownMs?: number }): { watchdogIntervalMs: 1000, defaultPauseMs: 30000, ...(opts?.releaseCooldownMs !== undefined ? { releaseCooldownMs: opts.releaseCooldownMs } : {}), + ...(opts?.fetchUsage !== undefined ? { fetchUsage: opts.fetchUsage } : {}), + ...(opts?.onLimitReduced !== undefined ? 
{ onLimitReduced: opts.onLimitReduced } : {}), setTimeout: timers.setTimeout, clearTimeout: timers.clearTimeout, setInterval: timers.setInterval, @@ -100,6 +107,8 @@ describe("createConcurrencyManager", () => { inFlight: 1, queued: 0, paused: false, + cooldownMs: 0, + autoReduced: false, }); release1(); expect(manager.getStatus("umans")?.inFlight).toBe(0); @@ -342,6 +351,8 @@ describe("createConcurrencyManager", () => { inFlight: 0, queued: 0, paused: false, + cooldownMs: 0, + autoReduced: false, }); }); @@ -484,4 +495,306 @@ describe("createConcurrencyManager", () => { expect(queuedCalled).toBe(false); release(); }); + + // ─── Configurable cooldown ────────────────────────────────────────────── + + it("setCooldown changes the cooldown applied to subsequently recycled slots", async () => { + const { manager, timers } = createManager({ releaseCooldownMs: 200 }); + manager.setLimit("umans", 1); + + const release1 = await manager.acquire("umans", "conv1", 0); + expect(manager.getStatus("umans")?.cooldownMs).toBe(200); + + // Bump the cooldown to 500ms. + manager.setCooldown("umans", 500); + expect(manager.getStatus("umans")?.cooldownMs).toBe(500); + + // Queue a waiter. + let resolved = false; + const promise2 = manager.acquire("umans", "conv2", 100).then((r) => { + resolved = true; + return r; + }); + await Promise.resolve(); + await Promise.resolve(); + expect(resolved).toBe(false); + + // Release — the NEW cooldown (500ms) applies. + release1(); + expect(resolved).toBe(false); + timers.advance(200); // old cooldown elapsed — still cooling (500ms now). + expect(resolved).toBe(false); + timers.advance(300); // 500ms total → slot recycled, waiter granted. 
+ const release2 = await promise2; + expect(resolved).toBe(true); + release2(); + }); + + it("getCooldowns returns all configured cooldowns", () => { + const { manager } = createManager({ releaseCooldownMs: 350 }); + manager.setLimit("umans", 4); + manager.setCooldown("openai-compat", 100); + + const cooldowns = manager.getCooldowns(); + expect(cooldowns).toContainEqual({ providerId: "umans", cooldownMs: 350 }); + expect(cooldowns).toContainEqual({ providerId: "openai-compat", cooldownMs: 100 }); + }); + + it("setCooldown does NOT impose a limit when none is configured (override seeds on setLimit)", async () => { + const { manager } = createManager({ releaseCooldownMs: 350 }); + + // Set a cooldown with NO limit configured yet. + manager.setCooldown("umans", 500); + expect(manager.getCooldown("umans")).toBe(500); + // No limit → acquire must be unlimited (no state with a limit imposed). + const release = await manager.acquire("umans", "conv1", 0); + expect(typeof release).toBe("function"); + release(); + expect(manager.getStatus("umans")).toBeUndefined(); // no limit state created + expect(manager.getLimit("umans")).toBeUndefined(); + + // Now set a limit — the pending cooldown override seeds the new state. 
+ manager.setLimit("umans", 4); + expect(manager.getStatus("umans")?.cooldownMs).toBe(500); + }); + + // ─── Adaptive headroom (reduce limit by 1 on 429) ──────────────────────── + + it("reportRateLimit reduces the limit by 1 (one-way) and sets autoReduced notice", () => { + const reduced: { providerId: string; newLimit: number; oldLimit: number }[] = []; + const { manager } = createManager({ + onLimitReduced: (p, n, o) => reduced.push({ providerId: p, newLimit: n, oldLimit: o }), + }); + manager.setLimit("umans", 4); + + manager.reportRateLimit("umans"); + + expect(manager.getLimit("umans")).toBe(3); + const status = manager.getStatus("umans"); + expect(status?.autoReduced).toBe(true); + expect(status?.autoReducedFrom).toBe(4); + expect(status?.notice).toContain("auto-reduced to 3"); + expect(reduced).toEqual([{ providerId: "umans", newLimit: 3, oldLimit: 4 }]); + }); + + it("repeated 429s keep reducing (4 -> 3 -> 2 -> 1) and never go below 1", () => { + const { manager } = createManager(); + manager.setLimit("umans", 4); + + manager.reportRateLimit("umans"); + expect(manager.getLimit("umans")).toBe(3); + manager.reportRateLimit("umans"); + expect(manager.getLimit("umans")).toBe(2); + manager.reportRateLimit("umans"); + expect(manager.getLimit("umans")).toBe(1); + // Already at the floor — stays 1. + manager.reportRateLimit("umans"); + expect(manager.getLimit("umans")).toBe(1); + const status = manager.getStatus("umans"); + expect(status?.autoReduced).toBe(true); + // autoReducedFrom records the FIRST reduction's original limit (4). + expect(status?.autoReducedFrom).toBe(4); + }); + + it("a MANUAL setLimit clears the auto-reduce notice", () => { + const { manager } = createManager(); + manager.setLimit("umans", 4); + manager.reportRateLimit("umans"); // 4 -> 3, autoReduced + expect(manager.getStatus("umans")?.autoReduced).toBe(true); + + // User restores the limit manually. 
+ manager.setLimit("umans", 4); + const status = manager.getStatus("umans"); + expect(status?.autoReduced).toBe(false); + expect(status?.autoReducedFrom).toBeUndefined(); + expect(status?.notice).toBeUndefined(); + }); + + it("removeLimit clears the auto-reduce state", () => { + const { manager } = createManager(); + manager.setLimit("umans", 4); + manager.reportRateLimit("umans"); // auto-reduced + expect(manager.getStatus("umans")?.autoReduced).toBe(true); + + manager.removeLimit("umans"); + expect(manager.getStatus("umans")).toBeUndefined(); + }); + + // ─── Usage gate (poll concurrent_sessions before granting queued agents) ─ + + it("usage gate blocks a queued waiter while upstream concurrent_sessions >= limit", async () => { + // Upstream always reports AT the limit (4) → the gate never admits. + const { manager, timers } = createManager({ + fetchUsage: async () => ({ concurrentSessions: 4 }), + }); + manager.setLimit("umans", 4); + manager.setCooldown("umans", 0); // no cooldown — isolate the gate + + // Fill all 4 slots (fast-path, no gate). + const releases = await Promise.all([ + manager.acquire("umans", "c1", 0), + manager.acquire("umans", "c2", 0), + manager.acquire("umans", "c3", 0), + manager.acquire("umans", "c4", 0), + ]); + expect(manager.getStatus("umans")?.inFlight).toBe(4); + + // 5th agent queues. + let resolved = false; + const promise5 = manager.acquire("umans", "c5", 10).then((r) => { + resolved = true; + return r; + }); + await Promise.resolve(); + await Promise.resolve(); + expect(resolved).toBe(false); + expect(manager.getStatus("umans")?.queued).toBe(1); + + // Release one slot. Cooldown is 0 → recycle polls upstream → 4 >= 4 → NOT granted. + releases[0]?.(); + // Let the async poll settle. + await Promise.resolve(); + await Promise.resolve(); + expect(resolved).toBe(false); + expect(manager.getStatus("umans")?.queued).toBe(1); + + // Advance past the 1s fallback repoll — still 4 → still blocked. 
+ timers.advance(1000); + await Promise.resolve(); + await Promise.resolve(); + expect(resolved).toBe(false); + + for (const r of releases.slice(1)) r?.(); + void promise5; + }); + + it("usage gate admits a queued waiter once upstream concurrent_sessions < limit", async () => { + // Upstream starts at the limit; drops to 3 after the release. + let upstream = 4; + const { manager } = createManager({ + fetchUsage: async () => ({ concurrentSessions: upstream }), + }); + manager.setLimit("umans", 4); + manager.setCooldown("umans", 0); + + const releases = await Promise.all([ + manager.acquire("umans", "c1", 0), + manager.acquire("umans", "c2", 0), + manager.acquire("umans", "c3", 0), + manager.acquire("umans", "c4", 0), + ]); + + let resolved = false; + const promise5 = manager.acquire("umans", "c5", 10).then((r) => { + resolved = true; + return r; + }); + await Promise.resolve(); + await Promise.resolve(); + expect(resolved).toBe(false); + + // Upstream now drops to 3 (the released session finally decremented). + upstream = 3; + // Release a slot → cooldown 0 → poll → 3 < 4 → admit the waiter. + releases[0]?.(); + const release5 = await promise5; + expect(resolved).toBe(true); + expect(manager.getStatus("umans")?.queued).toBe(0); + + release5(); + for (const r of releases.slice(1)) r?.(); + }); + + it("usage gate falls back to granting when fetchUsage returns undefined", async () => { + const { manager } = createManager({ + fetchUsage: async () => undefined, // no usage info available + }); + manager.setLimit("umans", 1); + manager.setCooldown("umans", 0); + + const release1 = await manager.acquire("umans", "c1", 0); + let resolved = false; + const promise2 = manager.acquire("umans", "c2", 10).then((r) => { + resolved = true; + return r; + }); + await Promise.resolve(); + await Promise.resolve(); + expect(resolved).toBe(false); + + // Release → poll returns undefined → fall back to cooldown-only (grant). 
+ release1(); + const release2 = await promise2; + expect(resolved).toBe(true); + release2(); + }); + + it("usage gate admits at most ONE queued waiter per successful poll", async () => { + let upstream = 4; + const { manager } = createManager({ + fetchUsage: async () => ({ concurrentSessions: upstream }), + }); + manager.setLimit("umans", 4); + manager.setCooldown("umans", 0); + + const releases = await Promise.all([ + manager.acquire("umans", "c1", 0), + manager.acquire("umans", "c2", 0), + manager.acquire("umans", "c3", 0), + manager.acquire("umans", "c4", 0), + ]); + + // Queue two waiters. + let r5 = false; + let r6 = false; + const p5 = manager.acquire("umans", "c5", 10).then((r) => { + r5 = true; + return r; + }); + const p6 = manager.acquire("umans", "c6", 20).then((r) => { + r6 = true; + return r; + }); + await Promise.resolve(); + await Promise.resolve(); + expect(manager.getStatus("umans")?.queued).toBe(2); + + // Upstream drops to 3. Release one slot → poll 3 < 4 → admit ONE (c5). + upstream = 3; + releases[0]?.(); + await Promise.resolve(); + await Promise.resolve(); + await Promise.resolve(); + expect(r5).toBe(true); + expect(r6).toBe(false); // c6 still queued — needs another poll. + expect(manager.getStatus("umans")?.queued).toBe(1); + + const release5 = await p5; + release5(); + void p6; + for (const r of releases.slice(1)) r?.(); + }); + + it("usage gate clears the fallback repoll timer when the queue drains", () => { + const { manager, timers } = createManager({ + fetchUsage: async () => ({ concurrentSessions: 0 }), + }); + manager.setLimit("umans", 1); + manager.setCooldown("umans", 0); + + return manager.acquire("umans", "c1", 0).then(async (release1) => { + // Queue a waiter (arms the 1s fallback timer). + const p2 = manager.acquire("umans", "c2", 10); + await Promise.resolve(); + await Promise.resolve(); + + // Release → poll 0 < 1 → grant → queue drains → fallback timer cleared. 
+ release1(); + const release2 = await p2; + release2(); + + // Advancing past 1s must NOT throw or fire at a drained state. + expect(() => timers.advance(1000)).not.toThrow(); + }); + }); }); diff --git a/packages/provider-concurrency/src/concurrency-manager.ts b/packages/provider-concurrency/src/concurrency-manager.ts index d4516b2..2d3d857 100644 --- a/packages/provider-concurrency/src/concurrency-manager.ts +++ b/packages/provider-concurrency/src/concurrency-manager.ts @@ -7,14 +7,32 @@ * (the agent whose current prompt started the longest ago wins the next slot). * * A watchdog reclaims slots held beyond a timeout (deadlock / stuck-agent - * recovery). 429 backoff pauses a provider's queue for a configurable duration. + * recovery). 429 backoff pauses a provider's queue for a configurable duration + * AND adaptively reduces the effective limit by 1 (one-way, persisted) so the + * resumed queue runs with headroom instead of re-overshooting. * - * This module is the PURE decision logic. It takes an injected clock (`now`) - * and injected timers (`setTimeout`/`clearTimeout`/`setInterval`/`clearInterval`) - * so it is fully testable with deterministic fake time. The extension layer - * wires real timers. + * ── Usage gate (anti-overshoot) ── + * When a `fetchUsage` callback is injected, before admitting a QUEUED agent the + * manager polls the provider's upstream `concurrent_sessions` count and grants + * only when it is below the configured limit. This composes with the release + * cooldown: release → cooldown delay → usage-gate poll → grant (only if upstream + * has room). A waiter is re-checked on two triggers (either one): another agent + * releases a slot (immediate re-poll, restarting the 1s countdown) or a 1s + * fallback timer elapses (in case the upstream count drops on its own). 
Each + * successful poll admits at most ONE queued waiter (each admission pushes the + * upstream count back toward the limit), so additional waiters are admitted on + * subsequent repolls. When `fetchUsage` is absent or returns `undefined`, the + * gate is skipped and the manager falls back to cooldown-only recycling. + * + * This module is the PURE decision logic. It takes an injected clock (`now`), + * injected timers (`setTimeout`/`clearTimeout`/`setInterval`/`clearInterval`), + * and an injected usage-poll effect (`fetchUsage`) so it is fully testable with + * deterministic fake time + a fake fetcher. The extension layer wires real + * timers + the host's provider registry. */ +import type { ProviderUsage } from "@dispatch/kernel"; + // ─── Types ─────────────────────────────────────────────────────────────────── /** Status snapshot for a single provider's concurrency state. */ @@ -30,6 +48,25 @@ export interface ProviderConcurrencyStatus { readonly paused: boolean; /** When the pause expires (epoch-ms). Present only when paused. */ readonly pausedUntil?: number; + /** + * Per-slot release cooldown (ms) — how long a recycled slot is held before the + * next waiter is admitted. Covers the upstream provider's accounting lag. + * Configurable + persisted per provider. + */ + readonly cooldownMs: number; + /** + * Whether the limit was auto-reduced by a 429 (adaptive headroom). The user + * restores the limit manually (PUT /concurrency/limits/:providerId) which + * clears this flag. The frontend renders a visible notice when `true`. + */ + readonly autoReduced: boolean; + /** The original limit before auto-reduction (present only when autoReduced). */ + readonly autoReducedFrom?: number; + /** + * A human-readable notice string for the frontend to render as a banner when + * the limit was auto-reduced. Present only when `autoReduced` is true. 
+ */ + readonly notice?: string; } /** @@ -66,8 +103,10 @@ export interface ConcurrencyLimiter { /** * Report a 429 from a provider. Pauses the queue for that provider for - * `retryAfterMs` (or a default duration when omitted). Queued and in-flight - * requests are unaffected; new `acquire` calls block until the pause expires. + * `retryAfterMs` (or a default duration when omitted), AND reduces the + * provider's effective limit by 1 (one-way, down to a minimum of 1) so the + * resumed queue runs with headroom. Queued and in-flight requests are + * otherwise unaffected; new `acquire` calls block until the pause expires. */ reportRateLimit(providerId: string, retryAfterMs?: number): void; } @@ -76,7 +115,7 @@ export interface ConcurrencyLimiter { * The full service surface (limiter + config + status) for HTTP routes. */ export interface ConcurrencyService extends ConcurrencyLimiter { - /** Set the concurrency limit for a provider. Creates the state if new. */ + /** Set the concurrency limit for a provider (MANUAL — clears the auto-reduce notice). Creates the state if new. */ setLimit(providerId: string, limit: number): void; /** Get the configured limit, or `undefined` when none. */ getLimit(providerId: string): number | undefined; @@ -84,6 +123,16 @@ export interface ConcurrencyService extends ConcurrencyLimiter { removeLimit(providerId: string): void; /** All configured limits as `{ providerId, limit }` entries. */ getLimits(): readonly { providerId: string; limit: number }[]; + /** + * Set the release cooldown (ms) for a provider. Applied to subsequently + * recycled slots; in-flight cooldown timers keep their original duration. + * Never imposes a limit: with none configured, it is kept as a pending override that seeds the state on `setLimit`. + */ + setCooldown(providerId: string, cooldownMs: number): void; + /** Get the configured cooldown (ms), or `undefined` when none was set. 
*/ + getCooldown(providerId: string): number | undefined; + /** All configured cooldowns as `{ providerId, cooldownMs }` entries. */ + getCooldowns(): readonly { providerId: string; cooldownMs: number }[]; /** Status for one provider, or `undefined` when no limit is configured. */ getStatus(providerId: string): ProviderConcurrencyStatus | undefined; /** Status for every provider with a configured limit. */ @@ -115,6 +164,19 @@ interface ProviderState { paused: boolean; pausedUntil: number | undefined; pauseTimer: ReturnType<typeof setTimeout> | undefined; + /** Per-provider release cooldown (ms). Defaults to the manager opt; settable at runtime. */ + cooldownMs: number; + // ── Adaptive headroom ── + autoReduced: boolean; + autoReducedFrom: number | undefined; + notice: string | undefined; + // ── Usage-gate state ── + /** A usage poll is in flight for this provider (prevents overlapping polls). */ + gatePolling: boolean; + /** Another repoll trigger fired while a poll was in flight → re-poll on completion. */ + gateRepollRequested: boolean; + /** The 1s fallback repoll timer (re-checked periodically even without releases). */ + gateRepollTimer: ReturnType<typeof setTimeout> | undefined; } export interface ConcurrencyManagerOpts { @@ -127,24 +189,40 @@ export interface ConcurrencyManagerOpts { /** Default pause duration when a 429 arrives without Retry-After (ms). */ readonly defaultPauseMs: number; /** - * Delay after a slot is released before the slot is recycled (ms). During - * this window `inFlight` stays incremented — a new `acquire` sees the slot - * as still held and queues. This covers the upstream provider's accounting - * lag: the provider's `concurrent_sessions` counter may not decrement the - * instant our stream completes, so re-admitting immediately risks an N+1 - * overshoot. 0 = instant re-admission (no cooldown). Default: 0. + * Default delay after a slot is released before the slot is recycled (ms). 
+ * During this window `inFlight` stays incremented — a new `acquire` sees the + * slot as still held and queues. This covers the upstream provider's + * accounting lag: the provider's `concurrent_sessions` counter may not + * decrement the instant our stream completes, so re-admitting immediately + * risks an N+1 overshoot. 0 = instant re-admission (no cooldown). Default: 0. + * Per-provider overrides via `setCooldown`. */ readonly releaseCooldownMs?: number; + /** + * Injected usage-poll effect. When present, before admitting a QUEUED agent + * the manager calls this and grants only when `concurrentSessions` is below + * the configured limit (usage gate). When absent, the manager falls back to + * cooldown-only slot recycling. Injected (like `now`/`setTimeout`) so the + * manager stays unit-testable with a fake fetcher; never hardcodes `fetch`. + */ + readonly fetchUsage?: (providerId: string) => Promise<ProviderUsage | undefined>; /** Injected timers (default: global). Override in tests for deterministic time. */ readonly setTimeout?: typeof setTimeout; readonly clearTimeout?: typeof clearTimeout; readonly setInterval?: typeof setInterval; readonly clearInterval?: typeof clearInterval; - /** Optional logger for watchdog + pause events. */ + /** Optional logger for watchdog + pause + auto-reduce events. */ readonly onWatchdogReclaim?: (providerId: string, conversationId: string, heldMs: number) => void; readonly onPause?: (providerId: string, durationMs: number) => void; + /** Fired when a 429 adaptively reduces a provider's limit (for persistence + logging). */ + readonly onLimitReduced?: (providerId: string, newLimit: number, oldLimit: number) => void; } +/** Min interval between usage-gate fallback repolls (ms). The release trigger is immediate. */ +const USAGE_REPOLL_INTERVAL_MS = 1000; +/** Minimum the limit may be auto-reduced to (never 0). */ +const MIN_LIMIT = 1; + function noopRelease(): void { // No limit configured → nothing to release. 
} @@ -153,16 +231,42 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre const now = opts.now; const slotTimeoutMs = opts.slotTimeoutMs; const defaultPauseMs = opts.defaultPauseMs; - const releaseCooldownMs = opts.releaseCooldownMs ?? 0; + const defaultCooldownMs = opts.releaseCooldownMs ?? 0; + const fetchUsage = opts.fetchUsage; const setTimeout = opts.setTimeout ?? globalThis.setTimeout.bind(globalThis); const clearTimeout = opts.clearTimeout ?? globalThis.clearTimeout.bind(globalThis); const setInterval = opts.setInterval ?? globalThis.setInterval.bind(globalThis); const clearInterval = opts.clearInterval ?? globalThis.clearInterval.bind(globalThis); const states = new Map<string, ProviderState>(); + const cooldownOverrides = new Map<string, number>(); const cooldownTimers = new Set<ReturnType<typeof setTimeout>>(); let slotIdCounter = 0; + function makeState(limit: number, cooldownMs: number): ProviderState { + return { + limit, + inFlight: 0, + slots: new Map(), + queue: [], + paused: false, + pausedUntil: undefined, + pauseTimer: undefined, + cooldownMs, + autoReduced: false, + autoReducedFrom: undefined, + notice: undefined, + gatePolling: false, + gateRepollRequested: false, + gateRepollTimer: undefined, + }; + } + + /** Seed the cooldown for new state from any pending override (else the default). */ + function seedCooldown(providerId: string): number { + return cooldownOverrides.get(providerId) ?? defaultCooldownMs; + } + // ── Slot granting ────────────────────────────────────────────────────────── function grantSlot(state: ProviderState, providerId: string, conversationId: string): () => void { @@ -173,20 +277,20 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre released = true; state.slots.delete(id); - // Recycle the slot: decrement inFlight + grant the next waiter. + // Recycle the slot: decrement inFlight + attempt to grant the next waiter. 
// With a release cooldown > 0, defer this by the cooldown duration so // the upstream provider has time to decrement its concurrent_sessions // counter — preventing an N+1 overshoot from accounting lag. During the // cooldown, inFlight stays incremented, so new acquires queue. const recycle = () => { state.inFlight--; - tryGrantNext(providerId); + void tryGrantNext(providerId); }; - if (releaseCooldownMs > 0) { + if (state.cooldownMs > 0) { const timer = setTimeout(() => { cooldownTimers.delete(timer); recycle(); - }, releaseCooldownMs); + }, state.cooldownMs); cooldownTimers.add(timer); } else { recycle(); @@ -201,10 +305,12 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre return releaseFn; } - function tryGrantNext(providerId: string): void { - const state = states.get(providerId); - if (state === undefined) return; - if (state.paused) return; + /** + * Grant queued waiters WITHOUT the usage gate (the fast path used when no + * `fetchUsage` is configured, or after a poll confirmed upstream has room). + * Grants while there is internal room (`inFlight < limit`). Synchronous. + */ + function grantLoop(state: ProviderState, providerId: string): void { while (state.queue.length > 0 && state.inFlight < state.limit) { const waiter = state.queue[0]; if (waiter === undefined) break; @@ -212,6 +318,105 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre const releaseFn = grantSlot(state, providerId, waiter.conversationId); waiter.resolve(releaseFn); } + // If the queue drained, no need to keep the usage-gate fallback timer armed. + if (state.queue.length === 0 && state.gateRepollTimer !== undefined) { + clearTimeout(state.gateRepollTimer); + state.gateRepollTimer = undefined; + } + } + + /** + * Drain the queue, gated on the upstream usage poll when `fetchUsage` is + * configured. Called from release (post-cooldown), setLimit, pause-expiry, + * and the repoll timer. 
Async because the usage poll is an injected I/O + * effect; callers fire-and-forget the returned promise. + * + * Only QUEUED agents are gated. The fast-path immediate grant in `acquire` + * (when `inFlight < limit`) is NOT gated — it fires only when there is + * clearly internal room, and the cooldown keeps `inFlight` inflated during + * the accounting-lag window so the fast-path does not fire then. + * + * Each successful poll admits at most ONE queued waiter (each admission pushes + * the upstream count back toward the limit); additional waiters are admitted + * on subsequent repolls (release triggers an immediate re-poll; the 1s + * fallback timer covers an upstream count that drops on its own). + */ + async function tryGrantNext(providerId: string): Promise<void> { + const state = states.get(providerId); + if (state === undefined) return; + if (state.paused) return; + if (state.queue.length === 0) return; + if (state.inFlight >= state.limit) return; // no internal room + + // No usage gate → immediate grant loop (original behavior). + if (fetchUsage === undefined) { + grantLoop(state, providerId); + return; + } + + // Avoid overlapping polls for this provider. A poll is already in flight; + // mark that another trigger fired so it re-polls on completion. + if (state.gatePolling) { + state.gateRepollRequested = true; + return; + } + + await pollAndGrant(providerId, state); + } + + /** Poll upstream usage, then grant if there is headroom. */ + async function pollAndGrant(providerId: string, state: ProviderState): Promise<void> { + state.gatePolling = true; + try { + const snapshot = await fetchUsage?.(providerId); + + // Conditions may have changed during the async poll — re-check. + if (state.paused) return; + if (state.queue.length === 0) return; + if (state.inFlight >= state.limit) return; + + if (snapshot === undefined) { + // No usage info available → fall back to cooldown-only (grant). 
+ grantLoop(state, providerId); + return; + } + + if (snapshot.concurrentSessions < state.limit) { + // Upstream has room — admit queued waiters via grantLoop. NOTE(review): + // grantLoop grants while `inFlight < limit`, so it stops after ONE only when + // a single internal slot is free; otherwise one poll can admit several. + grantLoop(state, providerId); + } + // else: upstream at/over limit → keep queued; repoll timer handles retry. + } finally { + state.gatePolling = false; + // (Re)arm the 1s fallback timer while waiters remain queued, so an + // upstream count that drops on its own is still detected. + armGateRepoll(providerId, state); + if (state.gateRepollRequested) { + state.gateRepollRequested = false; + // A release (or other trigger) fired during the poll → re-poll now. + void tryGrantNext(providerId); + } + } + } + + function armGateRepoll(providerId: string, state: ProviderState): void { + // Only arm while there are queued waiters (otherwise no work to re-check). + if (state.queue.length === 0) { + if (state.gateRepollTimer !== undefined) { + clearTimeout(state.gateRepollTimer); + state.gateRepollTimer = undefined; + } + return; + } + if (state.gateRepollTimer !== undefined) { + clearTimeout(state.gateRepollTimer); + } + state.gateRepollTimer = setTimeout(() => { + state.gateRepollTimer = undefined; + void tryGrantNext(providerId); + }, USAGE_REPOLL_INTERVAL_MS); } // ── Watchdog ────────────────────────────────────────────────────────────────── @@ -231,6 +436,14 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre const watchdogTimer = setInterval(sweep, opts.watchdogIntervalMs); + // ── Adaptive headroom ────────────────────────────────────────────────────── + + function clearAutoReduce(state: ProviderState): void { + state.autoReduced = false; + state.autoReducedFrom = undefined; + state.notice = undefined; + } + // ── Public API ───────────────────────────────────────────────────────────── const manager: ConcurrencyService = { @@ 
-254,9 +467,13 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre return new Promise<() => void>((resolve) => { state.queue.push({ conversationId, promptStartedAt, resolve }); // Keep sorted ascending by promptStartedAt (oldest first). - // Insertion sort would be O(n), but the queue is typically tiny (<20), - // so a simple sort is fine and keeps the code simple. state.queue.sort((a, b) => a.promptStartedAt - b.promptStartedAt); + // If the usage gate is active, ensure the fallback repoll timer is + // armed (a release may not come for a while; the 1s timer covers an + // upstream count that drops on its own). + if (fetchUsage !== undefined) { + armGateRepoll(providerId, state); + } }); }, @@ -272,32 +489,43 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre clearTimeout(state.pauseTimer); } opts.onPause?.(providerId, pauseDuration); + + // Adaptive headroom: reduce the effective limit by 1 (one-way, min 1) so + // the resumed queue runs with headroom instead of re-overshooting. The + // reduction is persisted + surfaced (via onLimitReduced + status). 
+ if (state.limit > MIN_LIMIT) { + const oldLimit = state.limit; + state.limit = Math.max(MIN_LIMIT, state.limit - 1); + if (!state.autoReduced) { + state.autoReduced = true; + state.autoReducedFrom = oldLimit; + } + state.notice = + `Concurrency limit auto-reduced to ${state.limit} after a 429 — ` + + "restore manually when ready."; + opts.onLimitReduced?.(providerId, state.limit, oldLimit); + } + state.pauseTimer = setTimeout(() => { state.paused = false; state.pausedUntil = undefined; state.pauseTimer = undefined; - tryGrantNext(providerId); + void tryGrantNext(providerId); }, pauseDuration); }, setLimit(providerId, limit) { let state = states.get(providerId); if (state === undefined) { - state = { - limit, - inFlight: 0, - slots: new Map(), - queue: [], - paused: false, - pausedUntil: undefined, - pauseTimer: undefined, - }; + state = makeState(limit, seedCooldown(providerId)); states.set(providerId, state); } else { state.limit = limit; + // A MANUAL limit set clears the auto-reduce notice (the user took control). + clearAutoReduce(state); } // A higher limit may let queued requests through. - tryGrantNext(providerId); + void tryGrantNext(providerId); }, getLimit(providerId) { @@ -315,6 +543,12 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre clearTimeout(state.pauseTimer); state.pauseTimer = undefined; } + // Clear usage-gate fallback timer. + if (state.gateRepollTimer !== undefined) { + clearTimeout(state.gateRepollTimer); + state.gateRepollTimer = undefined; + } + clearAutoReduce(state); // Grant all queued requests (they become unlimited now). while (state.queue.length > 0) { @@ -338,6 +572,41 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre })); }, + setCooldown(providerId, cooldownMs) { + // A cooldown is only meaningful WITH a limit (it gates slot recycling, + // which only happens under a limit). 
But we store the override regardless + // so it applies the moment a limit IS set — and so a persisted cooldown + // restored before a limit does NOT impose a limit (setCooldown never + // creates a state). If a state already exists, update it live. + cooldownOverrides.set(providerId, cooldownMs); + const state = states.get(providerId); + if (state !== undefined) { + state.cooldownMs = cooldownMs; + } + }, + + getCooldown(providerId) { + const state = states.get(providerId); + if (state !== undefined) return state.cooldownMs; + return cooldownOverrides.get(providerId); + }, + + getCooldowns() { + // Merge: states (cooldown from state.cooldownMs) + pending overrides with no state. + const seen = new Set<string>(); + const out: { providerId: string; cooldownMs: number }[] = []; + for (const [providerId, s] of states) { + seen.add(providerId); + out.push({ providerId, cooldownMs: s.cooldownMs }); + } + for (const [providerId, cooldownMs] of cooldownOverrides) { + if (!seen.has(providerId)) { + out.push({ providerId, cooldownMs }); + } + } + return out; + }, + getStatus(providerId) { const state = states.get(providerId); if (state === undefined) return undefined; @@ -347,7 +616,11 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre inFlight: state.inFlight, queued: state.queue.length, paused: state.paused, + cooldownMs: state.cooldownMs, + autoReduced: state.autoReduced, ...(state.pausedUntil !== undefined ? { pausedUntil: state.pausedUntil } : {}), + ...(state.autoReducedFrom !== undefined ? { autoReducedFrom: state.autoReducedFrom } : {}), + ...(state.notice !== undefined ? 
{ notice: state.notice } : {}), }; }, @@ -367,6 +640,9 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre if (state.pauseTimer !== undefined) { clearTimeout(state.pauseTimer); } + if (state.gateRepollTimer !== undefined) { + clearTimeout(state.gateRepollTimer); + } } states.clear(); }, diff --git a/packages/provider-concurrency/src/extension.ts b/packages/provider-concurrency/src/extension.ts index 4af4f9a..261d531 100644 --- a/packages/provider-concurrency/src/extension.ts +++ b/packages/provider-concurrency/src/extension.ts @@ -24,24 +24,31 @@ export const manifest: Manifest = { * - `DEFAULT_PAUSE_MS` (30s): default 429 backoff when no Retry-After is given. * Umans docs note each concurrency 429 deprioritizes the account for ~30 min, * but a 30s queue pause prevents immediate re-overshoot while still allowing - * recovery. - * - `RELEASE_COOLDOWN_MS` (200ms): after a slot is released, hold it for this + * recovery. Combined with adaptive headroom (limit reduced by 1) + the usage + * gate, the resumed queue no longer re-overshoots — so the pause is kept + * (gives upstream a breather) rather than dropped. + * - `RELEASE_COOLDOWN_MS` (350ms): after a slot is released, hold it for this * duration before recycling it to the next waiter. Covers the upstream - * provider's accounting lag — the provider's concurrent_sessions counter - * may not decrement the instant our stream completes, so re-admitting - * immediately risks an N+1 overshoot that triggers a 429. 200ms is the - * default most concurrency proxies use for AI/LLM APIs. + * provider's accounting lag — the provider's concurrent_sessions counter may + * not decrement the instant our stream completes, so re-admitting immediately + * risks an N+1 overshoot that triggers a 429. Raised from 200ms to 350ms + * (Umans's accounting lag exceeded the 200ms cooldown, causing overshoot at 4 + * connections). 
Configurable + persisted per provider (PUT + * /concurrency/cooldown/:providerId). */ const SLOT_TIMEOUT_MS = 5 * 60 * 1000; const WATCHDOG_INTERVAL_MS = 30 * 1000; const DEFAULT_PAUSE_MS = 30 * 1000; -const RELEASE_COOLDOWN_MS = 200; +const RELEASE_COOLDOWN_MS = 350; + +/** Storage key prefix for persisted cooldowns (limits are stored under the bare providerId). */ +const COOLDOWN_KEY_PREFIX = "cooldown:"; /** - * Wrap a `ConcurrencyService` so `setLimit`/`removeLimit` persist to the - * given `StorageNamespace`. All other methods delegate directly to the inner - * service. Persistence is fire-and-forget — a storage write failure logs a - * warning but does NOT fail the API call (the in-memory limit is already set). + * Wrap a `ConcurrencyService` so `setLimit`/`removeLimit`/`setCooldown` persist + * to the given `StorageNamespace`. All other methods delegate directly to the + * inner service. Persistence is fire-and-forget — a storage write failure logs + * a warning but does NOT fail the API call (the in-memory value is already set). */ function createPersistedService( inner: ConcurrencyService, @@ -69,8 +76,19 @@ function createPersistedService( }), ); }, + setCooldown(providerId, cooldownMs) { + inner.setCooldown(providerId, cooldownMs); + storage.set(`${COOLDOWN_KEY_PREFIX}${providerId}`, String(cooldownMs)).catch((err) => + logger.warn("provider-concurrency: failed to persist cooldown", { + providerId, + err: err instanceof Error ? err.message : String(err), + }), + ); + }, getLimit: inner.getLimit.bind(inner), getLimits: inner.getLimits.bind(inner), + getCooldown: inner.getCooldown.bind(inner), + getCooldowns: inner.getCooldowns.bind(inner), getStatus: inner.getStatus.bind(inner), getStatusAll: inner.getStatusAll.bind(inner), destroy: inner.destroy.bind(inner), @@ -79,7 +97,8 @@ function createPersistedService( /** * Load saved limits from storage and apply them to the manager. - * Called during activate, before the service is registered. 
+ * Called during activate, before the service is registered. Skips cooldown + * keys (prefixed `cooldown:`) — those are loaded by {@link loadCooldowns}. */ async function loadLimits( storage: StorageNamespace, @@ -87,7 +106,9 @@ async function loadLimits( logger: Logger, ): Promise<void> { const keys = await storage.keys(); - for (const providerId of keys) { + for (const key of keys) { + if (key.startsWith(COOLDOWN_KEY_PREFIX)) continue; // cooldown settings + const providerId = key; const raw = await storage.get(providerId); if (raw === null) continue; const limit = Number.parseInt(raw, 10); @@ -98,16 +119,53 @@ async function loadLimits( } } +/** + * Load saved cooldowns from storage and apply them to the manager. + * Cooldowns are stored under `cooldown:<providerId>` keys (distinct from the + * bare-`<providerId>` limit keys) so the two settings persist independently. + */ +async function loadCooldowns( + storage: StorageNamespace, + manager: ConcurrencyService, + logger: Logger, +): Promise<void> { + const keys = await storage.keys(COOLDOWN_KEY_PREFIX); + for (const key of keys) { + const providerId = key.slice(COOLDOWN_KEY_PREFIX.length); + if (providerId.length === 0) continue; + const raw = await storage.get(key); + if (raw === null) continue; + const cooldownMs = Number.parseInt(raw, 10); + if (!Number.isNaN(cooldownMs) && cooldownMs >= 0) { + manager.setCooldown(providerId, cooldownMs); + logger.info(`provider-concurrency: restored cooldown ${cooldownMs}ms for "${providerId}"`); + } + } +} + export async function activate(host: HostAPI): Promise<void> { const logger = host.logger; const storage = host.storage("provider-concurrency"); + // Build the injected usage-poll effect from the host's provider registry. + // Lazy (called at poll time, not activate time) so activation order with the + // provider extensions doesn't matter. 
A provider that doesn't expose + // `getUsage` (or isn't registered) → returns undefined → the manager's usage + // gate falls back to cooldown-only recycling for that provider. This keeps + // the manager pure (the HTTP poll is an injected effect, not hardcoded fetch). + const fetchUsage = async (providerId: string) => { + const provider = host.getProviders().get(providerId); + if (provider === undefined || provider.getUsage === undefined) return undefined; + return provider.getUsage(); + }; + const managerOpts: ConcurrencyManagerOpts = { now: () => Date.now(), slotTimeoutMs: SLOT_TIMEOUT_MS, watchdogIntervalMs: WATCHDOG_INTERVAL_MS, defaultPauseMs: DEFAULT_PAUSE_MS, releaseCooldownMs: RELEASE_COOLDOWN_MS, + fetchUsage, onWatchdogReclaim: (providerId, conversationId, heldMs) => { logger.warn("provider-concurrency: watchdog reclaimed stale slot", { providerId, @@ -121,13 +179,28 @@ export async function activate(host: HostAPI): Promise<void> { durationMs, }); }, + onLimitReduced: (providerId, newLimit, oldLimit) => { + logger.warn("provider-concurrency: 429 adaptive headroom — limit reduced", { + providerId, + oldLimit, + newLimit, + }); + // Persist the reduced (one-way) limit so it survives a restart. + storage.set(providerId, String(newLimit)).catch((err) => + logger.warn("provider-concurrency: failed to persist auto-reduced limit", { + providerId, + err: err instanceof Error ? err.message : String(err), + }), + ); + }, }; const inner = createConcurrencyManager(managerOpts); - // Restore persisted limits before registering the service so the first - // request sees the correct configuration. + // Restore persisted limits + cooldowns before registering the service so the + // first request sees the correct configuration. 
await loadLimits(storage, inner, logger); + await loadCooldowns(storage, inner, logger); const service = createPersistedService(inner, storage, logger); host.provideService(concurrencyServiceHandle, service); diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts index d5f3000..797ad22 100644 --- a/packages/transport-contract/src/index.ts +++ b/packages/transport-contract/src/index.ts @@ -1062,6 +1062,17 @@ export interface ConcurrencyLimitResponse { * - `queued`: how many agents are waiting for a slot. * - `paused`: whether the queue is paused due to a 429 backoff. * - `pausedUntil`: when the pause expires (epoch-ms), present only when paused. + * - `cooldownMs`: the per-slot release cooldown (ms). A recycled slot is held + * this long before the next waiter is admitted — covers the upstream + * provider's accounting lag. Configurable + persisted per provider. + * - `autoReduced`: whether the limit was auto-reduced by 1 after a 429 + * (adaptive headroom, one-way, persisted). The user restores the limit + * manually via `PUT /concurrency/limits/:providerId`, which clears the flag. + * When `true`, the frontend renders a visible notice/banner. + * - `autoReducedFrom`: the original limit before auto-reduction (present only + * when `autoReduced` is true). + * - `notice`: a human-readable notice string for the frontend to render as a + * banner when the limit was auto-reduced (present only when `autoReduced`). 
*/ export interface ConcurrencyStatusEntry { readonly providerId: string; @@ -1070,6 +1081,10 @@ export interface ConcurrencyStatusEntry { readonly queued: number; readonly paused: boolean; readonly pausedUntil?: number; + readonly cooldownMs: number; + readonly autoReduced: boolean; + readonly autoReducedFrom?: number; + readonly notice?: string; } /** @@ -1079,3 +1094,26 @@ export interface ConcurrencyStatusEntry { export interface ConcurrencyStatusResponse { readonly providers: readonly ConcurrencyStatusEntry[]; } + +// ─── Provider concurrency cooldown ──────────────────────────────────────────── + +/** + * Response of `GET /concurrency/cooldown/:providerId` — the per-slot release + * cooldown (ms) for a provider. A recycled slot is held this long before the + * next waiter is admitted, covering the upstream provider's accounting lag. + * When a limit is configured but no cooldown was explicitly set, the server + * default (350ms) is returned; a provider with no configuration yields 404. + */ +export interface ConcurrencyCooldownResponse { + readonly providerId: string; + readonly cooldownMs: number; +} + +/** + * Body of `PUT /concurrency/cooldown/:providerId` — set the release cooldown + * (ms) for a provider. `cooldownMs` must be a non-negative integer (0 = no + * cooldown, instant re-admission). The value is persisted and applied to + * subsequently recycled slots. 
+ */ +export interface SetConcurrencyCooldownRequest { + readonly cooldownMs: number; +} diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts index 0fcc8f0..ee7b2de 100644 --- a/packages/transport-http/src/app.ts +++ b/packages/transport-http/src/app.ts @@ -8,6 +8,7 @@ import type { ComputerListResponse, ComputerResponse, ComputerStatusResponse, + ConcurrencyCooldownResponse, ConcurrencyLimitResponse, ConcurrencyLimitsResponse, ConcurrencyStatusResponse, @@ -31,6 +32,7 @@ import type { QueueResponse, ReasoningEffortResponse, SetCompactPercentRequest, + SetConcurrencyCooldownRequest, SetConcurrencyLimitRequest, SetConversationComputerRequest, SetSystemPromptTemplateRequest, @@ -687,6 +689,53 @@ export function createApp(opts: CreateServerOptions): Hono { return c.json({ ok: true, providerId }, 200); }); + app.get("/concurrency/cooldown/:providerId", (c) => { + const providerId = c.req.param("providerId"); + if (opts.concurrencyService === undefined) { + return c.json({ error: "Concurrency service not available" }, 503); + } + // A cooldown may be the default (when a limit is configured but no explicit + // cooldown was set) or explicitly set. getCooldown returns undefined only + // when the provider has NO state at all (no limit, no cooldown) — treat that + // as "not configured". 
+ const cooldownMs = opts.concurrencyService.getCooldown(providerId); + if (cooldownMs === undefined) { + return c.json({ error: "No concurrency configuration for this provider" }, 404); + } + const body: ConcurrencyCooldownResponse = { providerId, cooldownMs }; + return c.json(body, 200); + }); + + app.put("/concurrency/cooldown/:providerId", async (c) => { + const providerId = c.req.param("providerId"); + if (opts.concurrencyService === undefined) { + return c.json({ error: "Concurrency service not available" }, 503); + } + + let body: unknown; + try { + body = await c.req.json(); + } catch { + log.warn("concurrency: invalid JSON body"); + return c.json({ error: "Invalid JSON body" }, 400); + } + + const parsed = body as SetConcurrencyCooldownRequest; + if ( + parsed === null || + typeof parsed !== "object" || + typeof parsed.cooldownMs !== "number" || + !Number.isInteger(parsed.cooldownMs) || + parsed.cooldownMs < 0 + ) { + return c.json({ error: "Body must be { cooldownMs: <non-negative integer> }" }, 400); + } + + opts.concurrencyService.setCooldown(providerId, parsed.cooldownMs); + const responseBody: ConcurrencyCooldownResponse = { providerId, cooldownMs: parsed.cooldownMs }; + return c.json(responseBody, 200); + }); + app.get("/concurrency/status", (c) => { if (opts.concurrencyService === undefined) { const body: ConcurrencyStatusResponse = { providers: [] }; |
