summary | refs | log | tree | commit | diff | homepage
path: root/packages
diff options
context:
space:
mode:
author: Adam Malczewski <[email protected]> 2026-06-28 12:43:29 +0900
committer: Adam Malczewski <[email protected]> 2026-06-28 12:43:29 +0900
commit: 2d276669a0cb41959fc67d17bc58e77853dc3eb5 (patch)
tree: b70893b7450522fe9d5b7e627423498ae972e191 /packages
parent: f9d1ca533ad2c5d71a3bc349934d54c09de305bf (diff)
download: dispatch-2d276669a0cb41959fc67d17bc58e77853dc3eb5.tar.gz
dispatch-2d276669a0cb41959fc67d17bc58e77853dc3eb5.zip
feat(concurrency-fixes): usage-gate + adaptive headroom + configurable cooldown
Diffstat (limited to 'packages')
-rw-r--r-- packages/kernel/src/contracts/index.ts | 1
-rw-r--r-- packages/kernel/src/contracts/provider.ts | 27
-rw-r--r-- packages/openai-stream/src/getUsage.test.ts | 139
-rw-r--r-- packages/openai-stream/src/getUsage.ts | 73
-rw-r--r-- packages/openai-stream/src/index.ts | 1
-rw-r--r-- packages/openai-stream/src/provider.test.ts | 21
-rw-r--r-- packages/openai-stream/src/provider.ts | 9
-rw-r--r-- packages/provider-concurrency/src/concurrency-manager.test.ts | 315
-rw-r--r-- packages/provider-concurrency/src/concurrency-manager.ts | 350
-rw-r--r-- packages/provider-concurrency/src/extension.ts | 103
-rw-r--r-- packages/transport-contract/src/index.ts | 38
-rw-r--r-- packages/transport-http/src/app.ts | 49
12 files changed, 1073 insertions, 53 deletions
diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts
index 28e0a0b..fc19267 100644
--- a/packages/kernel/src/contracts/index.ts
+++ b/packages/kernel/src/contracts/index.ts
@@ -103,6 +103,7 @@ export type {
ProviderEvent,
ProviderStreamOptions,
ProviderToolCallEvent,
+ ProviderUsage,
ReasoningDeltaEvent,
ReasoningEffort,
TextDeltaEvent,
diff --git a/packages/kernel/src/contracts/provider.ts b/packages/kernel/src/contracts/provider.ts
index 3137073..dea6c17 100644
--- a/packages/kernel/src/contracts/provider.ts
+++ b/packages/kernel/src/contracts/provider.ts
@@ -104,6 +104,19 @@ export interface ProviderStreamOptions {
}
/**
+ * A snapshot of the provider's current upstream usage. Returned by a
+ * provider's optional `getUsage` so a concurrency limiter can gate slot grants
+ * on the REAL upstream in-flight count (not just the limiter's local accounting,
+ * which only approximates the upstream `concurrent_sessions` counter's lag
+ * with the release cooldown). `concurrentSessions` is the number of requests the provider counts
+ * as currently in flight.
+ */
+export interface ProviderUsage {
+ /** Upstream count of currently in-flight (generating) sessions. */
+ readonly concurrentSessions: number;
+}
+
+/**
* Metadata describing a single model a provider can serve. Returned by
* `listModels` so a catalog (e.g. the credential-store) can enumerate the
* `<credentialName>/<model>` choices a client may select. Kept minimal — `id`
@@ -154,4 +167,18 @@ export interface ProviderContract {
* credentials in; today the provider uses the key it resolved at activate.
*/
readonly listModels?: () => Promise<readonly ModelInfo[]>;
+
+ /**
+ * Fetch the provider's current upstream usage snapshot. Optional: a provider
+ * that cannot (or chooses not to) report usage omits it, and a concurrency
+ * limiter falls back to cooldown-only slot recycling (no usage gate). When
+ * present, the limiter polls this before admitting a QUEUED agent and grants
+ * only when `concurrentSessions` is below the configured limit — preventing an
+ * N+1 overshoot from the upstream accounting lag.
+ *
+ * May return `undefined` (e.g. the endpoint returned an unexpected shape or a
+ * non-200) — the limiter treats `undefined` as "no usage info available" and
+ * falls back to granting (cooldown-only behavior) for that poll.
+ */
+ readonly getUsage?: () => Promise<ProviderUsage | undefined>;
}
diff --git a/packages/openai-stream/src/getUsage.test.ts b/packages/openai-stream/src/getUsage.test.ts
new file mode 100644
index 0000000..c319b73
--- /dev/null
+++ b/packages/openai-stream/src/getUsage.test.ts
@@ -0,0 +1,139 @@
+import type { ProviderUsage } from "@dispatch/kernel";
+import type { FetchLike } from "@dispatch/trace-replay";
+import { describe, expect, it, vi } from "vitest";
+import { getUsage } from "./getUsage.js";
+
+function jsonResponse(body: unknown, status = 200): Response {
+ return new Response(JSON.stringify(body), {
+ status,
+ headers: { "Content-Type": "application/json" },
+ });
+}
+
+describe("getUsage", () => {
+ it("extracts concurrent_sessions from the Umans /v1/usage shape", async () => {
+ const fetchFn = vi.fn(
+ () =>
+ jsonResponse({
+ usage: { concurrent_sessions: 3 },
+ limits: { concurrency: { limit: 4, hard_cap: 5 } },
+ }) as unknown as ReturnType<FetchLike>,
+ );
+
+ const result = await getUsage({
+ baseURL: "https://api.umans.ai/v1",
+ apiKey: "sk-test-1234567890abcdef",
+ providerId: "umans",
+ fetchFn,
+ });
+
+ expect(result).toEqual<ProviderUsage>({ concurrentSessions: 3 });
+ expect(fetchFn).toHaveBeenCalledOnce();
+ const call = fetchFn.mock.calls[0];
+ expect(call?.[0]).toBe("https://api.umans.ai/v1/usage");
+ const init = call?.[1] as RequestInit;
+ expect(init.method).toBe("GET");
+ expect((init.headers as Record<string, string>).Authorization).toBe(
+ "Bearer sk-test-1234567890abcdef",
+ );
+ });
+
+ it("returns undefined on non-200 (endpoint unsupported)", async () => {
+ const fetchFn = vi.fn(
+ () => jsonResponse({ error: "not found" }, 404) as unknown as ReturnType<FetchLike>,
+ );
+
+ const result = await getUsage({
+ baseURL: "https://api.example.com/v1",
+ apiKey: "sk-test",
+ providerId: "openai",
+ fetchFn,
+ });
+
+ expect(result).toBeUndefined();
+ });
+
+ it("returns undefined on a network error (never throws)", async () => {
+ const fetchFn = vi.fn(
+ () => Promise.reject(new Error("ECONNREFUSED")) as unknown as Promise<Response>,
+ );
+
+ const result = await getUsage({
+ baseURL: "https://api.example.com/v1",
+ apiKey: "sk-test",
+ providerId: "openai",
+ fetchFn,
+ });
+
+ expect(result).toBeUndefined();
+ });
+
+ it("returns undefined when the response shape is unexpected", async () => {
+ const fetchFn = vi.fn(
+ () => jsonResponse({ unexpected: true }) as unknown as ReturnType<FetchLike>,
+ );
+
+ const result = await getUsage({
+ baseURL: "https://api.example.com/v1",
+ apiKey: "sk-test",
+ providerId: "openai",
+ fetchFn,
+ });
+
+ expect(result).toBeUndefined();
+ });
+
+ it("returns undefined when concurrent_sessions is not a number", async () => {
+ const fetchFn = vi.fn(
+ () =>
+ jsonResponse({
+ usage: { concurrent_sessions: "three" },
+ }) as unknown as ReturnType<FetchLike>,
+ );
+
+ const result = await getUsage({
+ baseURL: "https://api.example.com/v1",
+ apiKey: "sk-test",
+ providerId: "openai",
+ fetchFn,
+ });
+
+ expect(result).toBeUndefined();
+ });
+
+ it("truncates a fractional concurrent_sessions to an integer", async () => {
+ const fetchFn = vi.fn(
+ () =>
+ jsonResponse({ usage: { concurrent_sessions: 2.9 } }) as unknown as ReturnType<FetchLike>,
+ );
+
+ const result = await getUsage({
+ baseURL: "https://api.example.com/v1",
+ apiKey: "sk-test",
+ providerId: "openai",
+ fetchFn,
+ });
+
+ expect(result).toEqual<ProviderUsage>({ concurrentSessions: 2 });
+ });
+
+ it("falls back to globalThis.fetch when fetchFn is absent", async () => {
+ const original = globalThis.fetch;
+ const stub = vi.fn(
+ () => jsonResponse({ usage: { concurrent_sessions: 1 } }) as unknown as ReturnType<FetchLike>,
+ );
+ globalThis.fetch = stub as unknown as typeof globalThis.fetch;
+
+ try {
+ const result = await getUsage({
+ baseURL: "https://api.example.com/v1",
+ apiKey: "sk-test",
+ providerId: "openai",
+ });
+ expect(result).toEqual<ProviderUsage>({ concurrentSessions: 1 });
+ expect(stub).toHaveBeenCalledOnce();
+ } finally {
+ globalThis.fetch = original;
+ }
+ });
+});
diff --git a/packages/openai-stream/src/getUsage.ts b/packages/openai-stream/src/getUsage.ts
new file mode 100644
index 0000000..5da7fd7
--- /dev/null
+++ b/packages/openai-stream/src/getUsage.ts
@@ -0,0 +1,73 @@
+import type { ProviderUsage } from "@dispatch/kernel";
+import type { FetchLike } from "@dispatch/trace-replay";
+
+/**
+ * Generic OpenAI-compatible usage fetch. The Umans `/v1/usage` endpoint returns:
+ *
+ * { usage: { concurrent_sessions: number }, limits: { concurrency: { limit, hard_cap } } }
+ *
+ * We extract only `concurrent_sessions` (the count a concurrency limiter gates
+ * on). Lives in this library (`@dispatch/openai-stream`) so any OpenAI-compatible
+ * provider extension reuses it without cross-extension code import
+ * (isolation-over-DRY: coupling is via this typed library surface). A provider
+ * supplies its own `id` via `createOpenAICompatProvider`; `getUsage` accepts it as `providerId` but never reads it (it never throws, so it emits no error labels).
+ */
+
+/** The raw shape of the `/v1/usage` response (only the fields we read). */
+interface UsageResponse {
+ readonly usage?: {
+ readonly concurrent_sessions?: number;
+ };
+}
+
+export interface GetUsageConfig {
+ readonly baseURL: string;
+ readonly apiKey: string;
+ readonly fetchFn?: FetchLike;
+ readonly providerId: string;
+}
+
+/**
+ * Fetch + map the upstream usage snapshot. Returns `undefined` on any error,
+ * non-2xx status, or unexpected shape so the caller (the concurrency limiter) falls
+ * back to cooldown-only slot recycling (no usage gate) — never throws.
+ *
+ * Pure-ish I/O wrapper: the only effect is the injected fetch. Extracted for
+ * direct unit testing with a fake fetch.
+ */
+export async function getUsage(config: GetUsageConfig): Promise<ProviderUsage | undefined> {
+ const effectiveFetch: FetchLike = config.fetchFn ?? fetch;
+ const url = `${config.baseURL}/usage`;
+
+ let response: Response;
+ try {
+ response = await effectiveFetch(url, {
+ method: "GET",
+ headers: {
+ Authorization: `Bearer ${config.apiKey}`,
+ },
+ });
+ } catch {
+ // Network error — the upstream is unreachable; treat as "no usage info".
+ return undefined;
+ }
+
+ if (!response.ok) {
+ // 404 / 401 / 5xx — the endpoint is unsupported or rejected the request.
+ return undefined;
+ }
+
+ let body: UsageResponse;
+ try {
+ body = (await response.json()) as UsageResponse;
+ } catch {
+ return undefined;
+ }
+
+ const raw = body.usage?.concurrent_sessions;
+ if (typeof raw !== "number" || !Number.isFinite(raw) || raw < 0) {
+ return undefined;
+ }
+
+ return { concurrentSessions: Math.trunc(raw) };
+}
diff --git a/packages/openai-stream/src/index.ts b/packages/openai-stream/src/index.ts
index 3f76b99..ff6b4f5 100644
--- a/packages/openai-stream/src/index.ts
+++ b/packages/openai-stream/src/index.ts
@@ -8,6 +8,7 @@ export type {
export { convertMessages } from "./convert-messages.js";
export type { OpenAITool } from "./convert-tools.js";
export { convertTools } from "./convert-tools.js";
+export { getUsage } from "./getUsage.js";
export { isVisionModelId, parseModelList } from "./listModels.js";
export { parseSSELines } from "./parse-sse.js";
export type { CreateOpenAICompatProviderOpts } from "./provider.js";
diff --git a/packages/openai-stream/src/provider.test.ts b/packages/openai-stream/src/provider.test.ts
index 8bc6e98..13a303e 100644
--- a/packages/openai-stream/src/provider.test.ts
+++ b/packages/openai-stream/src/provider.test.ts
@@ -81,6 +81,27 @@ describe("createOpenAICompatProvider stamps the given id on the ProviderContract
await expect(listModels()).rejects.toThrow("listModels[my-custom-id]: HTTP 401 — Unauthorized");
});
+
+ it("exposes getUsage that returns the upstream concurrent_sessions", async () => {
+ const fetchFn = vi.fn(
+ () =>
+ new Response(JSON.stringify({ usage: { concurrent_sessions: 2 } }), {
+ status: 200,
+ headers: { "Content-Type": "application/json" },
+ }) as unknown as ReturnType<FetchLike>,
+ );
+ const provider = createOpenAICompatProvider({
+ credentials: makeCreds(),
+ model: "test-model",
+ id: "umans",
+ fetchFn,
+ });
+ const getUsage = provider.getUsage;
+ if (!getUsage) throw new Error("getUsage not defined");
+
+ const usage = await getUsage();
+ expect(usage).toEqual({ concurrentSessions: 2 });
+ });
});
describe("transformBody", () => {
diff --git a/packages/openai-stream/src/provider.ts b/packages/openai-stream/src/provider.ts
index df5a4ed..9a9369f 100644
--- a/packages/openai-stream/src/provider.ts
+++ b/packages/openai-stream/src/provider.ts
@@ -4,9 +4,11 @@ import type {
ModelInfo,
ProviderContract,
ProviderStreamOptions,
+ ProviderUsage,
ToolContract,
} from "@dispatch/kernel";
import type { FetchLike } from "@dispatch/trace-replay";
+import { getUsage as fetchUsage } from "./getUsage.js";
import { listModels as fetchModels } from "./listModels.js";
import { streamChat } from "./stream.js";
@@ -69,5 +71,12 @@ export function createOpenAICompatProvider(opts: CreateOpenAICompatProviderOpts)
providerId: opts.id,
...(fetchFn !== undefined ? { fetchFn } : {}),
}),
+ getUsage: (): Promise<ProviderUsage | undefined> =>
+ fetchUsage({
+ baseURL,
+ apiKey,
+ providerId: opts.id,
+ ...(fetchFn !== undefined ? { fetchFn } : {}),
+ }),
};
}
diff --git a/packages/provider-concurrency/src/concurrency-manager.test.ts b/packages/provider-concurrency/src/concurrency-manager.test.ts
index 62f202f..6db8964 100644
--- a/packages/provider-concurrency/src/concurrency-manager.test.ts
+++ b/packages/provider-concurrency/src/concurrency-manager.test.ts
@@ -1,3 +1,4 @@
+import type { ProviderUsage } from "@dispatch/kernel";
import { describe, expect, it } from "vitest";
import { type ConcurrencyService, createConcurrencyManager } from "./concurrency-manager.js";
@@ -59,7 +60,11 @@ function createFakeTimers() {
};
}
-function createManager(opts?: { releaseCooldownMs?: number }): {
+function createManager(opts?: {
+ releaseCooldownMs?: number;
+ fetchUsage?: (providerId: string) => Promise<ProviderUsage | undefined>;
+ onLimitReduced?: (providerId: string, newLimit: number, oldLimit: number) => void;
+}): {
manager: ConcurrencyService;
timers: ReturnType<typeof createFakeTimers>;
} {
@@ -70,6 +75,8 @@ function createManager(opts?: { releaseCooldownMs?: number }): {
watchdogIntervalMs: 1000,
defaultPauseMs: 30000,
...(opts?.releaseCooldownMs !== undefined ? { releaseCooldownMs: opts.releaseCooldownMs } : {}),
+ ...(opts?.fetchUsage !== undefined ? { fetchUsage: opts.fetchUsage } : {}),
+ ...(opts?.onLimitReduced !== undefined ? { onLimitReduced: opts.onLimitReduced } : {}),
setTimeout: timers.setTimeout,
clearTimeout: timers.clearTimeout,
setInterval: timers.setInterval,
@@ -100,6 +107,8 @@ describe("createConcurrencyManager", () => {
inFlight: 1,
queued: 0,
paused: false,
+ cooldownMs: 0,
+ autoReduced: false,
});
release1();
expect(manager.getStatus("umans")?.inFlight).toBe(0);
@@ -342,6 +351,8 @@ describe("createConcurrencyManager", () => {
inFlight: 0,
queued: 0,
paused: false,
+ cooldownMs: 0,
+ autoReduced: false,
});
});
@@ -484,4 +495,306 @@ describe("createConcurrencyManager", () => {
expect(queuedCalled).toBe(false);
release();
});
+
+ // ─── Configurable cooldown ──────────────────────────────────────────────
+
+ it("setCooldown changes the cooldown applied to subsequently recycled slots", async () => {
+ const { manager, timers } = createManager({ releaseCooldownMs: 200 });
+ manager.setLimit("umans", 1);
+
+ const release1 = await manager.acquire("umans", "conv1", 0);
+ expect(manager.getStatus("umans")?.cooldownMs).toBe(200);
+
+ // Bump the cooldown to 500ms.
+ manager.setCooldown("umans", 500);
+ expect(manager.getStatus("umans")?.cooldownMs).toBe(500);
+
+ // Queue a waiter.
+ let resolved = false;
+ const promise2 = manager.acquire("umans", "conv2", 100).then((r) => {
+ resolved = true;
+ return r;
+ });
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(resolved).toBe(false);
+
+ // Release — the NEW cooldown (500ms) applies.
+ release1();
+ expect(resolved).toBe(false);
+ timers.advance(200); // old cooldown elapsed — still cooling (500ms now).
+ expect(resolved).toBe(false);
+ timers.advance(300); // 500ms total → slot recycled, waiter granted.
+ const release2 = await promise2;
+ expect(resolved).toBe(true);
+ release2();
+ });
+
+ it("getCooldowns returns all configured cooldowns", () => {
+ const { manager } = createManager({ releaseCooldownMs: 350 });
+ manager.setLimit("umans", 4);
+ manager.setCooldown("openai-compat", 100);
+
+ const cooldowns = manager.getCooldowns();
+ expect(cooldowns).toContainEqual({ providerId: "umans", cooldownMs: 350 });
+ expect(cooldowns).toContainEqual({ providerId: "openai-compat", cooldownMs: 100 });
+ });
+
+ it("setCooldown does NOT impose a limit when none is configured (override seeds on setLimit)", async () => {
+ const { manager } = createManager({ releaseCooldownMs: 350 });
+
+ // Set a cooldown with NO limit configured yet.
+ manager.setCooldown("umans", 500);
+ expect(manager.getCooldown("umans")).toBe(500);
+ // No limit → acquire must be unlimited (no state with a limit imposed).
+ const release = await manager.acquire("umans", "conv1", 0);
+ expect(typeof release).toBe("function");
+ release();
+ expect(manager.getStatus("umans")).toBeUndefined(); // no limit state created
+ expect(manager.getLimit("umans")).toBeUndefined();
+
+ // Now set a limit — the pending cooldown override seeds the new state.
+ manager.setLimit("umans", 4);
+ expect(manager.getStatus("umans")?.cooldownMs).toBe(500);
+ });
+
+ // ─── Adaptive headroom (reduce limit by 1 on 429) ────────────────────────
+
+ it("reportRateLimit reduces the limit by 1 (one-way) and sets autoReduced notice", () => {
+ const reduced: { providerId: string; newLimit: number; oldLimit: number }[] = [];
+ const { manager } = createManager({
+ onLimitReduced: (p, n, o) => reduced.push({ providerId: p, newLimit: n, oldLimit: o }),
+ });
+ manager.setLimit("umans", 4);
+
+ manager.reportRateLimit("umans");
+
+ expect(manager.getLimit("umans")).toBe(3);
+ const status = manager.getStatus("umans");
+ expect(status?.autoReduced).toBe(true);
+ expect(status?.autoReducedFrom).toBe(4);
+ expect(status?.notice).toContain("auto-reduced to 3");
+ expect(reduced).toEqual([{ providerId: "umans", newLimit: 3, oldLimit: 4 }]);
+ });
+
+ it("repeated 429s keep reducing (4 -> 3 -> 2 -> 1) and never go below 1", () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 4);
+
+ manager.reportRateLimit("umans");
+ expect(manager.getLimit("umans")).toBe(3);
+ manager.reportRateLimit("umans");
+ expect(manager.getLimit("umans")).toBe(2);
+ manager.reportRateLimit("umans");
+ expect(manager.getLimit("umans")).toBe(1);
+ // Already at the floor — stays 1.
+ manager.reportRateLimit("umans");
+ expect(manager.getLimit("umans")).toBe(1);
+ const status = manager.getStatus("umans");
+ expect(status?.autoReduced).toBe(true);
+ // autoReducedFrom records the FIRST reduction's original limit (4).
+ expect(status?.autoReducedFrom).toBe(4);
+ });
+
+ it("a MANUAL setLimit clears the auto-reduce notice", () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 4);
+ manager.reportRateLimit("umans"); // 4 -> 3, autoReduced
+ expect(manager.getStatus("umans")?.autoReduced).toBe(true);
+
+ // User restores the limit manually.
+ manager.setLimit("umans", 4);
+ const status = manager.getStatus("umans");
+ expect(status?.autoReduced).toBe(false);
+ expect(status?.autoReducedFrom).toBeUndefined();
+ expect(status?.notice).toBeUndefined();
+ });
+
+ it("removeLimit clears the auto-reduce state", () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 4);
+ manager.reportRateLimit("umans"); // auto-reduced
+ expect(manager.getStatus("umans")?.autoReduced).toBe(true);
+
+ manager.removeLimit("umans");
+ expect(manager.getStatus("umans")).toBeUndefined();
+ });
+
+ // ─── Usage gate (poll concurrent_sessions before granting queued agents) ─
+
+ it("usage gate blocks a queued waiter while upstream concurrent_sessions >= limit", async () => {
+ // Upstream always reports AT the limit (4) → the gate never admits.
+ const { manager, timers } = createManager({
+ fetchUsage: async () => ({ concurrentSessions: 4 }),
+ });
+ manager.setLimit("umans", 4);
+ manager.setCooldown("umans", 0); // no cooldown — isolate the gate
+
+ // Fill all 4 slots (fast-path, no gate).
+ const releases = await Promise.all([
+ manager.acquire("umans", "c1", 0),
+ manager.acquire("umans", "c2", 0),
+ manager.acquire("umans", "c3", 0),
+ manager.acquire("umans", "c4", 0),
+ ]);
+ expect(manager.getStatus("umans")?.inFlight).toBe(4);
+
+ // 5th agent queues.
+ let resolved = false;
+ const promise5 = manager.acquire("umans", "c5", 10).then((r) => {
+ resolved = true;
+ return r;
+ });
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(resolved).toBe(false);
+ expect(manager.getStatus("umans")?.queued).toBe(1);
+
+ // Release one slot. Cooldown is 0 → recycle polls upstream → 4 >= 4 → NOT granted.
+ releases[0]?.();
+ // Let the async poll settle.
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(resolved).toBe(false);
+ expect(manager.getStatus("umans")?.queued).toBe(1);
+
+ // Advance past the 1s fallback repoll — still 4 → still blocked.
+ timers.advance(1000);
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(resolved).toBe(false);
+
+ for (const r of releases.slice(1)) r?.();
+ void promise5;
+ });
+
+ it("usage gate admits a queued waiter once upstream concurrent_sessions < limit", async () => {
+ // Upstream starts at the limit; drops to 3 after the release.
+ let upstream = 4;
+ const { manager } = createManager({
+ fetchUsage: async () => ({ concurrentSessions: upstream }),
+ });
+ manager.setLimit("umans", 4);
+ manager.setCooldown("umans", 0);
+
+ const releases = await Promise.all([
+ manager.acquire("umans", "c1", 0),
+ manager.acquire("umans", "c2", 0),
+ manager.acquire("umans", "c3", 0),
+ manager.acquire("umans", "c4", 0),
+ ]);
+
+ let resolved = false;
+ const promise5 = manager.acquire("umans", "c5", 10).then((r) => {
+ resolved = true;
+ return r;
+ });
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(resolved).toBe(false);
+
+ // Upstream now drops to 3 (the released session finally decremented).
+ upstream = 3;
+ // Release a slot → cooldown 0 → poll → 3 < 4 → admit the waiter.
+ releases[0]?.();
+ const release5 = await promise5;
+ expect(resolved).toBe(true);
+ expect(manager.getStatus("umans")?.queued).toBe(0);
+
+ release5();
+ for (const r of releases.slice(1)) r?.();
+ });
+
+ it("usage gate falls back to granting when fetchUsage returns undefined", async () => {
+ const { manager } = createManager({
+ fetchUsage: async () => undefined, // no usage info available
+ });
+ manager.setLimit("umans", 1);
+ manager.setCooldown("umans", 0);
+
+ const release1 = await manager.acquire("umans", "c1", 0);
+ let resolved = false;
+ const promise2 = manager.acquire("umans", "c2", 10).then((r) => {
+ resolved = true;
+ return r;
+ });
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(resolved).toBe(false);
+
+ // Release → poll returns undefined → fall back to cooldown-only (grant).
+ release1();
+ const release2 = await promise2;
+ expect(resolved).toBe(true);
+ release2();
+ });
+
+ it("usage gate admits at most ONE queued waiter per successful poll", async () => {
+ let upstream = 4;
+ const { manager } = createManager({
+ fetchUsage: async () => ({ concurrentSessions: upstream }),
+ });
+ manager.setLimit("umans", 4);
+ manager.setCooldown("umans", 0);
+
+ const releases = await Promise.all([
+ manager.acquire("umans", "c1", 0),
+ manager.acquire("umans", "c2", 0),
+ manager.acquire("umans", "c3", 0),
+ manager.acquire("umans", "c4", 0),
+ ]);
+
+ // Queue two waiters.
+ let r5 = false;
+ let r6 = false;
+ const p5 = manager.acquire("umans", "c5", 10).then((r) => {
+ r5 = true;
+ return r;
+ });
+ const p6 = manager.acquire("umans", "c6", 20).then((r) => {
+ r6 = true;
+ return r;
+ });
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(manager.getStatus("umans")?.queued).toBe(2);
+
+ // Upstream drops to 3. Release one slot → poll 3 < 4 → admit ONE (c5).
+ upstream = 3;
+ releases[0]?.();
+ await Promise.resolve();
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(r5).toBe(true);
+ expect(r6).toBe(false); // c6 still queued — needs another poll.
+ expect(manager.getStatus("umans")?.queued).toBe(1);
+
+ const release5 = await p5;
+ release5();
+ void p6;
+ for (const r of releases.slice(1)) r?.();
+ });
+
+ it("usage gate clears the fallback repoll timer when the queue drains", () => {
+ const { manager, timers } = createManager({
+ fetchUsage: async () => ({ concurrentSessions: 0 }),
+ });
+ manager.setLimit("umans", 1);
+ manager.setCooldown("umans", 0);
+
+ return manager.acquire("umans", "c1", 0).then(async (release1) => {
+ // Queue a waiter (arms the 1s fallback timer).
+ const p2 = manager.acquire("umans", "c2", 10);
+ await Promise.resolve();
+ await Promise.resolve();
+
+ // Release → poll 0 < 1 → grant → queue drains → fallback timer cleared.
+ release1();
+ const release2 = await p2;
+ release2();
+
+ // Advancing past 1s must NOT throw or fire at a drained state.
+ expect(() => timers.advance(1000)).not.toThrow();
+ });
+ });
});
diff --git a/packages/provider-concurrency/src/concurrency-manager.ts b/packages/provider-concurrency/src/concurrency-manager.ts
index d4516b2..2d3d857 100644
--- a/packages/provider-concurrency/src/concurrency-manager.ts
+++ b/packages/provider-concurrency/src/concurrency-manager.ts
@@ -7,14 +7,32 @@
* (the agent whose current prompt started the longest ago wins the next slot).
*
* A watchdog reclaims slots held beyond a timeout (deadlock / stuck-agent
- * recovery). 429 backoff pauses a provider's queue for a configurable duration.
+ * recovery). 429 backoff pauses a provider's queue for a configurable duration
+ * AND adaptively reduces the effective limit by 1 (one-way, persisted) so the
+ * resumed queue runs with headroom instead of re-overshooting.
*
- * This module is the PURE decision logic. It takes an injected clock (`now`)
- * and injected timers (`setTimeout`/`clearTimeout`/`setInterval`/`clearInterval`)
- * so it is fully testable with deterministic fake time. The extension layer
- * wires real timers.
+ * ── Usage gate (anti-overshoot) ──
+ * When a `fetchUsage` callback is injected, before admitting a QUEUED agent the
+ * manager polls the provider's upstream `concurrent_sessions` count and grants
+ * only when it is below the configured limit. This composes with the release
+ * cooldown: release → cooldown delay → usage-gate poll → grant (only if upstream
+ * has room). A waiter is re-checked on two triggers (either one): another agent
+ * releases a slot (immediate re-poll, restarting the 1s countdown) or a 1s
+ * fallback timer elapses (in case the upstream count drops on its own). Each
+ * successful poll admits at most ONE queued waiter (each admission pushes the
+ * upstream count back toward the limit), so additional waiters are admitted on
+ * subsequent repolls. When `fetchUsage` is absent or returns `undefined`, the
+ * gate is skipped and the manager falls back to cooldown-only recycling.
+ *
+ * This module is the PURE decision logic. It takes an injected clock (`now`),
+ * injected timers (`setTimeout`/`clearTimeout`/`setInterval`/`clearInterval`),
+ * and an injected usage-poll effect (`fetchUsage`) so it is fully testable with
+ * deterministic fake time + a fake fetcher. The extension layer wires real
+ * timers + the host's provider registry.
*/
+import type { ProviderUsage } from "@dispatch/kernel";
+
// ─── Types ───────────────────────────────────────────────────────────────────
/** Status snapshot for a single provider's concurrency state. */
@@ -30,6 +48,25 @@ export interface ProviderConcurrencyStatus {
readonly paused: boolean;
/** When the pause expires (epoch-ms). Present only when paused. */
readonly pausedUntil?: number;
+ /**
+ * Per-slot release cooldown (ms) — how long a recycled slot is held before the
+ * next waiter is admitted. Covers the upstream provider's accounting lag.
+ * Configurable + persisted per provider.
+ */
+ readonly cooldownMs: number;
+ /**
+ * Whether the limit was auto-reduced by a 429 (adaptive headroom). The user
+ * restores the limit manually (PUT /concurrency/limits/:providerId) which
+ * clears this flag. The frontend renders a visible notice when `true`.
+ */
+ readonly autoReduced: boolean;
+ /** The original limit before auto-reduction (present only when autoReduced). */
+ readonly autoReducedFrom?: number;
+ /**
+ * A human-readable notice string for the frontend to render as a banner when
+ * the limit was auto-reduced. Present only when `autoReduced` is true.
+ */
+ readonly notice?: string;
}
/**
@@ -66,8 +103,10 @@ export interface ConcurrencyLimiter {
/**
* Report a 429 from a provider. Pauses the queue for that provider for
- * `retryAfterMs` (or a default duration when omitted). Queued and in-flight
- * requests are unaffected; new `acquire` calls block until the pause expires.
+ * `retryAfterMs` (or a default duration when omitted), AND reduces the
+ * provider's effective limit by 1 (one-way, down to a minimum of 1) so the
+ * resumed queue runs with headroom. Queued and in-flight requests are
+ * otherwise unaffected; new `acquire` calls block until the pause expires.
*/
reportRateLimit(providerId: string, retryAfterMs?: number): void;
}
@@ -76,7 +115,7 @@ export interface ConcurrencyLimiter {
* The full service surface (limiter + config + status) for HTTP routes.
*/
export interface ConcurrencyService extends ConcurrencyLimiter {
- /** Set the concurrency limit for a provider. Creates the state if new. */
+ /** Set the concurrency limit for a provider (MANUAL — clears the auto-reduce notice). Creates the state if new. */
setLimit(providerId: string, limit: number): void;
/** Get the configured limit, or `undefined` when none. */
getLimit(providerId: string): number | undefined;
@@ -84,6 +123,16 @@ export interface ConcurrencyService extends ConcurrencyLimiter {
removeLimit(providerId: string): void;
/** All configured limits as `{ providerId, limit }` entries. */
getLimits(): readonly { providerId: string; limit: number }[];
+ /**
+ * Set the release cooldown (ms) for a provider. Applied to subsequently
+ * recycled slots; in-flight cooldown timers keep their original duration.
+ * Does NOT create limit state: with no limit configured the value is kept as a pending override that seeds the state created by a later `setLimit`.
+ */
+ setCooldown(providerId: string, cooldownMs: number): void;
+ /** Get the configured cooldown (ms), or `undefined` when none was set. */
+ getCooldown(providerId: string): number | undefined;
+ /** All configured cooldowns as `{ providerId, cooldownMs }` entries. */
+ getCooldowns(): readonly { providerId: string; cooldownMs: number }[];
/** Status for one provider, or `undefined` when no limit is configured. */
getStatus(providerId: string): ProviderConcurrencyStatus | undefined;
/** Status for every provider with a configured limit. */
@@ -115,6 +164,19 @@ interface ProviderState {
paused: boolean;
pausedUntil: number | undefined;
pauseTimer: ReturnType<typeof setTimeout> | undefined;
+ /** Per-provider release cooldown (ms). Defaults to the manager opt; settable at runtime. */
+ cooldownMs: number;
+ // ── Adaptive headroom ──
+ autoReduced: boolean;
+ autoReducedFrom: number | undefined;
+ notice: string | undefined;
+ // ── Usage-gate state ──
+ /** A usage poll is in flight for this provider (prevents overlapping polls). */
+ gatePolling: boolean;
+ /** Another repoll trigger fired while a poll was in flight → re-poll on completion. */
+ gateRepollRequested: boolean;
+ /** The 1s fallback repoll timer (re-checked periodically even without releases). */
+ gateRepollTimer: ReturnType<typeof setTimeout> | undefined;
}
export interface ConcurrencyManagerOpts {
@@ -127,24 +189,40 @@ export interface ConcurrencyManagerOpts {
/** Default pause duration when a 429 arrives without Retry-After (ms). */
readonly defaultPauseMs: number;
/**
- * Delay after a slot is released before the slot is recycled (ms). During
- * this window `inFlight` stays incremented — a new `acquire` sees the slot
- * as still held and queues. This covers the upstream provider's accounting
- * lag: the provider's `concurrent_sessions` counter may not decrement the
- * instant our stream completes, so re-admitting immediately risks an N+1
- * overshoot. 0 = instant re-admission (no cooldown). Default: 0.
+ * Default delay after a slot is released before the slot is recycled (ms).
+ * During this window `inFlight` stays incremented — a new `acquire` sees the
+ * slot as still held and queues. This covers the upstream provider's
+ * accounting lag: the provider's `concurrent_sessions` counter may not
+ * decrement the instant our stream completes, so re-admitting immediately
+ * risks an N+1 overshoot. 0 = instant re-admission (no cooldown). Default: 0.
+ * Per-provider overrides via `setCooldown`.
*/
readonly releaseCooldownMs?: number;
+ /**
+ * Injected usage-poll effect. When present, before admitting a QUEUED agent
+ * the manager calls this and grants only when `concurrentSessions` is below
+ * the configured limit (usage gate). When absent, the manager falls back to
+ * cooldown-only slot recycling. Injected (like `now`/`setTimeout`) so the
+ * manager stays unit-testable with a fake fetcher; never hardcodes `fetch`.
+ */
+ readonly fetchUsage?: (providerId: string) => Promise<ProviderUsage | undefined>;
/** Injected timers (default: global). Override in tests for deterministic time. */
readonly setTimeout?: typeof setTimeout;
readonly clearTimeout?: typeof clearTimeout;
readonly setInterval?: typeof setInterval;
readonly clearInterval?: typeof clearInterval;
- /** Optional logger for watchdog + pause events. */
+ /** Optional logger for watchdog + pause + auto-reduce events. */
readonly onWatchdogReclaim?: (providerId: string, conversationId: string, heldMs: number) => void;
readonly onPause?: (providerId: string, durationMs: number) => void;
+ /** Fired when a 429 adaptively reduces a provider's limit (for persistence + logging). */
+ readonly onLimitReduced?: (providerId: string, newLimit: number, oldLimit: number) => void;
}
+/** Min interval between usage-gate fallback repolls (ms). The release trigger is immediate. */
+const USAGE_REPOLL_INTERVAL_MS = 1000;
+/** Minimum the limit may be auto-reduced to (never 0). */
+const MIN_LIMIT = 1;
+
function noopRelease(): void {
// No limit configured → nothing to release.
}
@@ -153,16 +231,42 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
const now = opts.now;
const slotTimeoutMs = opts.slotTimeoutMs;
const defaultPauseMs = opts.defaultPauseMs;
- const releaseCooldownMs = opts.releaseCooldownMs ?? 0;
+ const defaultCooldownMs = opts.releaseCooldownMs ?? 0;
+ const fetchUsage = opts.fetchUsage;
const setTimeout = opts.setTimeout ?? globalThis.setTimeout.bind(globalThis);
const clearTimeout = opts.clearTimeout ?? globalThis.clearTimeout.bind(globalThis);
const setInterval = opts.setInterval ?? globalThis.setInterval.bind(globalThis);
const clearInterval = opts.clearInterval ?? globalThis.clearInterval.bind(globalThis);
const states = new Map<string, ProviderState>();
+ const cooldownOverrides = new Map<string, number>();
const cooldownTimers = new Set<ReturnType<typeof setTimeout>>();
let slotIdCounter = 0;
+ function makeState(limit: number, cooldownMs: number): ProviderState {
+ return {
+ limit,
+ inFlight: 0,
+ slots: new Map(),
+ queue: [],
+ paused: false,
+ pausedUntil: undefined,
+ pauseTimer: undefined,
+ cooldownMs,
+ autoReduced: false,
+ autoReducedFrom: undefined,
+ notice: undefined,
+ gatePolling: false,
+ gateRepollRequested: false,
+ gateRepollTimer: undefined,
+ };
+ }
+
+ /** Seed the cooldown for new state from any pending override (else the default). */
+ function seedCooldown(providerId: string): number {
+ return cooldownOverrides.get(providerId) ?? defaultCooldownMs;
+ }
+
// ── Slot granting ──────────────────────────────────────────────────────────
function grantSlot(state: ProviderState, providerId: string, conversationId: string): () => void {
@@ -173,20 +277,20 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
released = true;
state.slots.delete(id);
- // Recycle the slot: decrement inFlight + grant the next waiter.
+ // Recycle the slot: decrement inFlight + attempt to grant the next waiter.
// With a release cooldown > 0, defer this by the cooldown duration so
// the upstream provider has time to decrement its concurrent_sessions
// counter — preventing an N+1 overshoot from accounting lag. During the
// cooldown, inFlight stays incremented, so new acquires queue.
const recycle = () => {
state.inFlight--;
- tryGrantNext(providerId);
+ void tryGrantNext(providerId);
};
- if (releaseCooldownMs > 0) {
+ if (state.cooldownMs > 0) {
const timer = setTimeout(() => {
cooldownTimers.delete(timer);
recycle();
- }, releaseCooldownMs);
+ }, state.cooldownMs);
cooldownTimers.add(timer);
} else {
recycle();
@@ -201,10 +305,12 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
return releaseFn;
}
- function tryGrantNext(providerId: string): void {
- const state = states.get(providerId);
- if (state === undefined) return;
- if (state.paused) return;
+ /**
+ * Grant queued waiters WITHOUT the usage gate (the fast path used when no
+ * `fetchUsage` is configured, or after a poll confirmed upstream has room).
+ * Grants while there is internal room (`inFlight < limit`). Synchronous.
+ */
+ function grantLoop(state: ProviderState, providerId: string): void {
while (state.queue.length > 0 && state.inFlight < state.limit) {
const waiter = state.queue[0];
if (waiter === undefined) break;
@@ -212,6 +318,105 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
const releaseFn = grantSlot(state, providerId, waiter.conversationId);
waiter.resolve(releaseFn);
}
+ // If the queue drained, no need to keep the usage-gate fallback timer armed.
+ if (state.queue.length === 0 && state.gateRepollTimer !== undefined) {
+ clearTimeout(state.gateRepollTimer);
+ state.gateRepollTimer = undefined;
+ }
+ }
+
+ /**
+ * Drain the queue, gated on the upstream usage poll when `fetchUsage` is
+ * configured. Called from release (post-cooldown), setLimit, pause-expiry,
+ * and the repoll timer. Async because the usage poll is an injected I/O
+ * effect; callers fire-and-forget the returned promise.
+ *
+ * Only QUEUED agents are gated. The fast-path immediate grant in `acquire`
+ * (when `inFlight < limit`) is NOT gated — it fires only when there is
+ * clearly internal room, and the cooldown keeps `inFlight` inflated during
+ * the accounting-lag window so the fast-path does not fire then.
+ *
+ * Each successful poll is meant to admit at most ONE queued waiter (each
+ * admission pushes the upstream count back toward the limit); further waiters
+ * are admitted on subsequent repolls (a release triggers an immediate re-poll;
+ * the 1s fallback timer covers an upstream count that drops on its own).
+ */
+ async function tryGrantNext(providerId: string): Promise<void> {
+ const state = states.get(providerId);
+ if (state === undefined) return;
+ if (state.paused) return;
+ if (state.queue.length === 0) return;
+ if (state.inFlight >= state.limit) return; // no internal room
+
+ // No usage gate → immediate grant loop (original behavior).
+ if (fetchUsage === undefined) {
+ grantLoop(state, providerId);
+ return;
+ }
+
+ // Avoid overlapping polls for this provider. A poll is already in flight;
+ // mark that another trigger fired so it re-polls on completion.
+ if (state.gatePolling) {
+ state.gateRepollRequested = true;
+ return;
+ }
+
+ await pollAndGrant(providerId, state);
+ }
+
+ /** Poll upstream usage, then grant if there is headroom. */
+ async function pollAndGrant(providerId: string, state: ProviderState): Promise<void> {
+ state.gatePolling = true;
+ try {
+ const snapshot = await fetchUsage?.(providerId);
+
+ // Conditions may have changed during the async poll — re-check.
+ if (state.paused) return;
+ if (state.queue.length === 0) return;
+ if (state.inFlight >= state.limit) return;
+
+ if (snapshot === undefined) {
+ // No usage info available → fall back to cooldown-only (grant).
+ grantLoop(state, providerId);
+ return;
+ }
+
+ if (snapshot.concurrentSessions < state.limit) {
+ // Upstream has room — admit queued waiters. NOTE(review): grantLoop keeps
+ // granting while `inFlight < limit`, so more than one waiter is admitted
+ // when several internal slots are free, despite the one-per-poll intent.
+ grantLoop(state, providerId);
+ }
+ // else: upstream at/over limit → keep queued; repoll timer handles retry.
+ } finally {
+ state.gatePolling = false;
+ // (Re)arm the 1s fallback timer while waiters remain queued, so an
+ // upstream count that drops on its own is still detected.
+ armGateRepoll(providerId, state);
+ if (state.gateRepollRequested) {
+ state.gateRepollRequested = false;
+ // A release (or other trigger) fired during the poll → re-poll now.
+ void tryGrantNext(providerId);
+ }
+ }
+ }
+
+ function armGateRepoll(providerId: string, state: ProviderState): void {
+ // Only arm while there are queued waiters (otherwise no work to re-check).
+ if (state.queue.length === 0) {
+ if (state.gateRepollTimer !== undefined) {
+ clearTimeout(state.gateRepollTimer);
+ state.gateRepollTimer = undefined;
+ }
+ return;
+ }
+ if (state.gateRepollTimer !== undefined) {
+ clearTimeout(state.gateRepollTimer);
+ }
+ state.gateRepollTimer = setTimeout(() => {
+ state.gateRepollTimer = undefined;
+ void tryGrantNext(providerId);
+ }, USAGE_REPOLL_INTERVAL_MS);
}
// ── Watchdog ──────────────────────────────────────────────────────────────────
@@ -231,6 +436,14 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
const watchdogTimer = setInterval(sweep, opts.watchdogIntervalMs);
+ // ── Adaptive headroom ──────────────────────────────────────────────────────
+
+ function clearAutoReduce(state: ProviderState): void {
+ state.autoReduced = false;
+ state.autoReducedFrom = undefined;
+ state.notice = undefined;
+ }
+
// ── Public API ─────────────────────────────────────────────────────────────
const manager: ConcurrencyService = {
@@ -254,9 +467,13 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
return new Promise<() => void>((resolve) => {
state.queue.push({ conversationId, promptStartedAt, resolve });
// Keep sorted ascending by promptStartedAt (oldest first).
- // Insertion sort would be O(n), but the queue is typically tiny (<20),
- // so a simple sort is fine and keeps the code simple.
state.queue.sort((a, b) => a.promptStartedAt - b.promptStartedAt);
+ // If the usage gate is active, ensure the fallback repoll timer is
+ // armed (a release may not come for a while; the 1s timer covers an
+ // upstream count that drops on its own).
+ if (fetchUsage !== undefined) {
+ armGateRepoll(providerId, state);
+ }
});
},
@@ -272,32 +489,43 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
clearTimeout(state.pauseTimer);
}
opts.onPause?.(providerId, pauseDuration);
+
+ // Adaptive headroom: reduce the effective limit by 1 (one-way, min 1) so
+ // the resumed queue runs with headroom instead of re-overshooting. The
+ // reduction is persisted + surfaced (via onLimitReduced + status).
+ if (state.limit > MIN_LIMIT) {
+ const oldLimit = state.limit;
+ state.limit = Math.max(MIN_LIMIT, state.limit - 1);
+ if (!state.autoReduced) {
+ state.autoReduced = true;
+ state.autoReducedFrom = oldLimit;
+ }
+ state.notice =
+ `Concurrency limit auto-reduced to ${state.limit} after a 429 — ` +
+ "restore manually when ready.";
+ opts.onLimitReduced?.(providerId, state.limit, oldLimit);
+ }
+
state.pauseTimer = setTimeout(() => {
state.paused = false;
state.pausedUntil = undefined;
state.pauseTimer = undefined;
- tryGrantNext(providerId);
+ void tryGrantNext(providerId);
}, pauseDuration);
},
setLimit(providerId, limit) {
let state = states.get(providerId);
if (state === undefined) {
- state = {
- limit,
- inFlight: 0,
- slots: new Map(),
- queue: [],
- paused: false,
- pausedUntil: undefined,
- pauseTimer: undefined,
- };
+ state = makeState(limit, seedCooldown(providerId));
states.set(providerId, state);
} else {
state.limit = limit;
+ // A MANUAL limit set clears the auto-reduce notice (the user took control).
+ clearAutoReduce(state);
}
// A higher limit may let queued requests through.
- tryGrantNext(providerId);
+ void tryGrantNext(providerId);
},
getLimit(providerId) {
@@ -315,6 +543,12 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
clearTimeout(state.pauseTimer);
state.pauseTimer = undefined;
}
+ // Clear usage-gate fallback timer.
+ if (state.gateRepollTimer !== undefined) {
+ clearTimeout(state.gateRepollTimer);
+ state.gateRepollTimer = undefined;
+ }
+ clearAutoReduce(state);
// Grant all queued requests (they become unlimited now).
while (state.queue.length > 0) {
@@ -338,6 +572,41 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
}));
},
+ setCooldown(providerId, cooldownMs) {
+ // A cooldown is only meaningful WITH a limit (it gates slot recycling,
+ // which only happens under a limit). But we store the override regardless
+ // so it applies the moment a limit IS set — and so a persisted cooldown
+ // restored before a limit does NOT impose a limit (setCooldown never
+ // creates a state). If a state already exists, update it live.
+ cooldownOverrides.set(providerId, cooldownMs);
+ const state = states.get(providerId);
+ if (state !== undefined) {
+ state.cooldownMs = cooldownMs;
+ }
+ },
+
+ getCooldown(providerId) {
+ const state = states.get(providerId);
+ if (state !== undefined) return state.cooldownMs;
+ return cooldownOverrides.get(providerId);
+ },
+
+ getCooldowns() {
+ // Merge: states (cooldown from state.cooldownMs) + pending overrides with no state.
+ const seen = new Set<string>();
+ const out: { providerId: string; cooldownMs: number }[] = [];
+ for (const [providerId, s] of states) {
+ seen.add(providerId);
+ out.push({ providerId, cooldownMs: s.cooldownMs });
+ }
+ for (const [providerId, cooldownMs] of cooldownOverrides) {
+ if (!seen.has(providerId)) {
+ out.push({ providerId, cooldownMs });
+ }
+ }
+ return out;
+ },
+
getStatus(providerId) {
const state = states.get(providerId);
if (state === undefined) return undefined;
@@ -347,7 +616,11 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
inFlight: state.inFlight,
queued: state.queue.length,
paused: state.paused,
+ cooldownMs: state.cooldownMs,
+ autoReduced: state.autoReduced,
...(state.pausedUntil !== undefined ? { pausedUntil: state.pausedUntil } : {}),
+ ...(state.autoReducedFrom !== undefined ? { autoReducedFrom: state.autoReducedFrom } : {}),
+ ...(state.notice !== undefined ? { notice: state.notice } : {}),
};
},
@@ -367,6 +640,9 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
if (state.pauseTimer !== undefined) {
clearTimeout(state.pauseTimer);
}
+ if (state.gateRepollTimer !== undefined) {
+ clearTimeout(state.gateRepollTimer);
+ }
}
states.clear();
},
diff --git a/packages/provider-concurrency/src/extension.ts b/packages/provider-concurrency/src/extension.ts
index 4af4f9a..261d531 100644
--- a/packages/provider-concurrency/src/extension.ts
+++ b/packages/provider-concurrency/src/extension.ts
@@ -24,24 +24,31 @@ export const manifest: Manifest = {
* - `DEFAULT_PAUSE_MS` (30s): default 429 backoff when no Retry-After is given.
* Umans docs note each concurrency 429 deprioritizes the account for ~30 min,
* but a 30s queue pause prevents immediate re-overshoot while still allowing
- * recovery.
- * - `RELEASE_COOLDOWN_MS` (200ms): after a slot is released, hold it for this
+ * recovery. Combined with adaptive headroom (limit reduced by 1) + the usage
+ * gate, the resumed queue no longer re-overshoots — so the pause is kept
+ * (gives upstream a breather) rather than dropped.
+ * - `RELEASE_COOLDOWN_MS` (350ms): after a slot is released, hold it for this
* duration before recycling it to the next waiter. Covers the upstream
- * provider's accounting lag — the provider's concurrent_sessions counter
- * may not decrement the instant our stream completes, so re-admitting
- * immediately risks an N+1 overshoot that triggers a 429. 200ms is the
- * default most concurrency proxies use for AI/LLM APIs.
+ * provider's accounting lag — the provider's concurrent_sessions counter may
+ * not decrement the instant our stream completes, so re-admitting immediately
+ * risks an N+1 overshoot that triggers a 429. Raised from 200ms to 350ms
+ * (Umans's accounting lag exceeded the 200ms cooldown, causing overshoot at 4
+ * connections). Configurable + persisted per provider (PUT
+ * /concurrency/cooldown/:providerId).
*/
const SLOT_TIMEOUT_MS = 5 * 60 * 1000;
const WATCHDOG_INTERVAL_MS = 30 * 1000;
const DEFAULT_PAUSE_MS = 30 * 1000;
-const RELEASE_COOLDOWN_MS = 200;
+const RELEASE_COOLDOWN_MS = 350;
+
+/** Storage key prefix for persisted cooldowns (limits are stored under the bare providerId). */
+const COOLDOWN_KEY_PREFIX = "cooldown:";
/**
- * Wrap a `ConcurrencyService` so `setLimit`/`removeLimit` persist to the
- * given `StorageNamespace`. All other methods delegate directly to the inner
- * service. Persistence is fire-and-forget — a storage write failure logs a
- * warning but does NOT fail the API call (the in-memory limit is already set).
+ * Wrap a `ConcurrencyService` so `setLimit`/`removeLimit`/`setCooldown` persist
+ * to the given `StorageNamespace`. All other methods delegate directly to the
+ * inner service. Persistence is fire-and-forget — a storage write failure logs
+ * a warning but does NOT fail the API call (the in-memory value is already set).
*/
function createPersistedService(
inner: ConcurrencyService,
@@ -69,8 +76,19 @@ function createPersistedService(
}),
);
},
+ setCooldown(providerId, cooldownMs) {
+ inner.setCooldown(providerId, cooldownMs);
+ storage.set(`${COOLDOWN_KEY_PREFIX}${providerId}`, String(cooldownMs)).catch((err) =>
+ logger.warn("provider-concurrency: failed to persist cooldown", {
+ providerId,
+ err: err instanceof Error ? err.message : String(err),
+ }),
+ );
+ },
getLimit: inner.getLimit.bind(inner),
getLimits: inner.getLimits.bind(inner),
+ getCooldown: inner.getCooldown.bind(inner),
+ getCooldowns: inner.getCooldowns.bind(inner),
getStatus: inner.getStatus.bind(inner),
getStatusAll: inner.getStatusAll.bind(inner),
destroy: inner.destroy.bind(inner),
@@ -79,7 +97,8 @@ function createPersistedService(
/**
* Load saved limits from storage and apply them to the manager.
- * Called during activate, before the service is registered.
+ * Called during activate, before the service is registered. Skips cooldown
+ * keys (prefixed `cooldown:`) — those are loaded by {@link loadCooldowns}.
*/
async function loadLimits(
storage: StorageNamespace,
@@ -87,7 +106,9 @@ async function loadLimits(
logger: Logger,
): Promise<void> {
const keys = await storage.keys();
- for (const providerId of keys) {
+ for (const key of keys) {
+ if (key.startsWith(COOLDOWN_KEY_PREFIX)) continue; // cooldown settings
+ const providerId = key;
const raw = await storage.get(providerId);
if (raw === null) continue;
const limit = Number.parseInt(raw, 10);
@@ -98,16 +119,53 @@ async function loadLimits(
}
}
+/**
+ * Load saved cooldowns from storage and apply them to the manager.
+ * Cooldowns are stored under `cooldown:<providerId>` keys (distinct from the
+ * bare-`<providerId>` limit keys) so the two settings persist independently.
+ */
+async function loadCooldowns(
+ storage: StorageNamespace,
+ manager: ConcurrencyService,
+ logger: Logger,
+): Promise<void> {
+ const keys = await storage.keys(COOLDOWN_KEY_PREFIX);
+ for (const key of keys) {
+ const providerId = key.slice(COOLDOWN_KEY_PREFIX.length);
+ if (providerId.length === 0) continue;
+ const raw = await storage.get(key);
+ if (raw === null) continue;
+ const cooldownMs = Number.parseInt(raw, 10);
+ if (!Number.isNaN(cooldownMs) && cooldownMs >= 0) {
+ manager.setCooldown(providerId, cooldownMs);
+ logger.info(`provider-concurrency: restored cooldown ${cooldownMs}ms for "${providerId}"`);
+ }
+ }
+}
+
export async function activate(host: HostAPI): Promise<void> {
const logger = host.logger;
const storage = host.storage("provider-concurrency");
+ // Build the injected usage-poll effect from the host's provider registry.
+ // Lazy (called at poll time, not activate time) so activation order with the
+ // provider extensions doesn't matter. A provider that doesn't expose
+ // `getUsage` (or isn't registered) → returns undefined → the manager's usage
+ // gate falls back to cooldown-only recycling for that provider. This keeps
+ // the manager pure (the HTTP poll is an injected effect, not hardcoded fetch).
+ const fetchUsage = async (providerId: string) => {
+ const provider = host.getProviders().get(providerId);
+ if (provider === undefined || provider.getUsage === undefined) return undefined;
+ return provider.getUsage();
+ };
+
const managerOpts: ConcurrencyManagerOpts = {
now: () => Date.now(),
slotTimeoutMs: SLOT_TIMEOUT_MS,
watchdogIntervalMs: WATCHDOG_INTERVAL_MS,
defaultPauseMs: DEFAULT_PAUSE_MS,
releaseCooldownMs: RELEASE_COOLDOWN_MS,
+ fetchUsage,
onWatchdogReclaim: (providerId, conversationId, heldMs) => {
logger.warn("provider-concurrency: watchdog reclaimed stale slot", {
providerId,
@@ -121,13 +179,28 @@ export async function activate(host: HostAPI): Promise<void> {
durationMs,
});
},
+ onLimitReduced: (providerId, newLimit, oldLimit) => {
+ logger.warn("provider-concurrency: 429 adaptive headroom — limit reduced", {
+ providerId,
+ oldLimit,
+ newLimit,
+ });
+ // Persist the reduced (one-way) limit so it survives a restart.
+ storage.set(providerId, String(newLimit)).catch((err) =>
+ logger.warn("provider-concurrency: failed to persist auto-reduced limit", {
+ providerId,
+ err: err instanceof Error ? err.message : String(err),
+ }),
+ );
+ },
};
const inner = createConcurrencyManager(managerOpts);
- // Restore persisted limits before registering the service so the first
- // request sees the correct configuration.
+ // Restore persisted limits + cooldowns before registering the service so the
+ // first request sees the correct configuration.
await loadLimits(storage, inner, logger);
+ await loadCooldowns(storage, inner, logger);
const service = createPersistedService(inner, storage, logger);
host.provideService(concurrencyServiceHandle, service);
diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts
index d5f3000..797ad22 100644
--- a/packages/transport-contract/src/index.ts
+++ b/packages/transport-contract/src/index.ts
@@ -1062,6 +1062,17 @@ export interface ConcurrencyLimitResponse {
* - `queued`: how many agents are waiting for a slot.
* - `paused`: whether the queue is paused due to a 429 backoff.
* - `pausedUntil`: when the pause expires (epoch-ms), present only when paused.
+ * - `cooldownMs`: the per-slot release cooldown (ms). A recycled slot is held
+ * this long before the next waiter is admitted — covers the upstream
+ * provider's accounting lag. Configurable + persisted per provider.
+ * - `autoReduced`: whether the limit was auto-reduced by 1 after a 429
+ * (adaptive headroom, one-way, persisted). The user restores the limit
+ * manually via `PUT /concurrency/limits/:providerId`, which clears the flag.
+ * When `true`, the frontend renders a visible notice/banner.
+ * - `autoReducedFrom`: the original limit before auto-reduction (present only
+ * when `autoReduced` is true).
+ * - `notice`: a human-readable notice string for the frontend to render as a
+ * banner when the limit was auto-reduced (present only when `autoReduced`).
*/
export interface ConcurrencyStatusEntry {
readonly providerId: string;
@@ -1070,6 +1081,10 @@ export interface ConcurrencyStatusEntry {
readonly queued: number;
readonly paused: boolean;
readonly pausedUntil?: number;
+ readonly cooldownMs: number;
+ readonly autoReduced: boolean;
+ readonly autoReducedFrom?: number;
+ readonly notice?: string;
}
/**
@@ -1079,3 +1094,26 @@ export interface ConcurrencyStatusEntry {
export interface ConcurrencyStatusResponse {
readonly providers: readonly ConcurrencyStatusEntry[];
}
+
+// ─── Provider concurrency cooldown ────────────────────────────────────────────
+
+/**
+ * Response of `GET /concurrency/cooldown/:providerId` — the per-slot release
+ * cooldown (ms) for a provider. A recycled slot is held this long before the
+ * next waiter is admitted, covering the upstream provider's accounting lag.
+ * With a limit but no explicit cooldown, the server default (350ms) is returned; with no configuration at all the endpoint responds 404.
+ */
+export interface ConcurrencyCooldownResponse {
+ readonly providerId: string;
+ readonly cooldownMs: number;
+}
+
+/**
+ * Body of `PUT /concurrency/cooldown/:providerId` — set the release cooldown
+ * (ms) for a provider. `cooldownMs` must be a non-negative integer (0 = no
+ * cooldown, instant re-admission). The value is persisted and applied to
+ * subsequently recycled slots.
+ */
+export interface SetConcurrencyCooldownRequest {
+ readonly cooldownMs: number;
+}
diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts
index 0fcc8f0..ee7b2de 100644
--- a/packages/transport-http/src/app.ts
+++ b/packages/transport-http/src/app.ts
@@ -8,6 +8,7 @@ import type {
ComputerListResponse,
ComputerResponse,
ComputerStatusResponse,
+ ConcurrencyCooldownResponse,
ConcurrencyLimitResponse,
ConcurrencyLimitsResponse,
ConcurrencyStatusResponse,
@@ -31,6 +32,7 @@ import type {
QueueResponse,
ReasoningEffortResponse,
SetCompactPercentRequest,
+ SetConcurrencyCooldownRequest,
SetConcurrencyLimitRequest,
SetConversationComputerRequest,
SetSystemPromptTemplateRequest,
@@ -687,6 +689,53 @@ export function createApp(opts: CreateServerOptions): Hono {
return c.json({ ok: true, providerId }, 200);
});
+ app.get("/concurrency/cooldown/:providerId", (c) => {
+ const providerId = c.req.param("providerId");
+ if (opts.concurrencyService === undefined) {
+ return c.json({ error: "Concurrency service not available" }, 503);
+ }
+ // A cooldown may be the default (when a limit is configured but no explicit
+ // cooldown was set) or explicitly set. getCooldown returns undefined only
+ // when the provider has NO state at all (no limit, no cooldown) — treat that
+ // as "not configured".
+ const cooldownMs = opts.concurrencyService.getCooldown(providerId);
+ if (cooldownMs === undefined) {
+ return c.json({ error: "No concurrency configuration for this provider" }, 404);
+ }
+ const body: ConcurrencyCooldownResponse = { providerId, cooldownMs };
+ return c.json(body, 200);
+ });
+
+ app.put("/concurrency/cooldown/:providerId", async (c) => {
+ const providerId = c.req.param("providerId");
+ if (opts.concurrencyService === undefined) {
+ return c.json({ error: "Concurrency service not available" }, 503);
+ }
+
+ let body: unknown;
+ try {
+ body = await c.req.json();
+ } catch {
+ log.warn("concurrency: invalid JSON body");
+ return c.json({ error: "Invalid JSON body" }, 400);
+ }
+
+ const parsed = body as SetConcurrencyCooldownRequest;
+ if (
+ parsed === null ||
+ typeof parsed !== "object" ||
+ typeof parsed.cooldownMs !== "number" ||
+ !Number.isInteger(parsed.cooldownMs) ||
+ parsed.cooldownMs < 0
+ ) {
+ return c.json({ error: "Body must be { cooldownMs: <non-negative integer> }" }, 400);
+ }
+
+ opts.concurrencyService.setCooldown(providerId, parsed.cooldownMs);
+ const responseBody: ConcurrencyCooldownResponse = { providerId, cooldownMs: parsed.cooldownMs };
+ return c.json(responseBody, 200);
+ });
+
app.get("/concurrency/status", (c) => {
if (opts.concurrencyService === undefined) {
const body: ConcurrencyStatusResponse = { providers: [] };