summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-27 03:03:53 +0900
committerAdam Malczewski <[email protected]>2026-06-27 03:03:53 +0900
commita6b95188a110464b6ffa0334c8af58463f2a36f2 (patch)
treeeb6ef57909e164be4ae721ea1fb25585354d351e
parentad9d135e583c99a0d93327115defa43187cde1c3 (diff)
downloaddispatch-a6b95188a110464b6ffa0334c8af58463f2a36f2.tar.gz
dispatch-a6b95188a110464b6ffa0334c8af58463f2a36f2.zip
feat(provider-concurrency): implement per-provider in-memory concurrency limits with oldest-agent-first scheduling
-rw-r--r--bun.lock12
-rw-r--r--packages/host-bin/package.json1
-rw-r--r--packages/host-bin/src/main.ts2
-rw-r--r--packages/host-bin/tsconfig.json3
-rw-r--r--packages/provider-concurrency/package.json11
-rw-r--r--packages/provider-concurrency/src/concurrency-manager.test.ts386
-rw-r--r--packages/provider-concurrency/src/concurrency-manager.ts327
-rw-r--r--packages/provider-concurrency/src/extension.ts61
-rw-r--r--packages/provider-concurrency/src/index.ts10
-rw-r--r--packages/provider-concurrency/src/provider-wrapper.test.ts173
-rw-r--r--packages/provider-concurrency/src/provider-wrapper.ts59
-rw-r--r--packages/provider-concurrency/src/service.ts11
-rw-r--r--packages/provider-concurrency/tsconfig.json6
-rw-r--r--packages/session-orchestrator/package.json1
-rw-r--r--packages/session-orchestrator/src/extension.ts14
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts49
-rw-r--r--packages/session-orchestrator/tsconfig.json1
-rw-r--r--packages/transport-contract/src/index.ts53
-rw-r--r--packages/transport-http/package.json1
-rw-r--r--packages/transport-http/src/app.ts90
-rw-r--r--packages/transport-http/src/extension.ts15
-rw-r--r--packages/transport-http/src/seam.ts2
-rw-r--r--packages/transport-http/tsconfig.json2
-rw-r--r--tsconfig.json3
24 files changed, 1293 insertions, 0 deletions
diff --git a/bun.lock b/bun.lock
index 2261ba8..602e42a 100644
--- a/bun.lock
+++ b/bun.lock
@@ -84,6 +84,7 @@
"@dispatch/lsp": "workspace:*",
"@dispatch/mcp": "workspace:*",
"@dispatch/message-queue": "workspace:*",
+ "@dispatch/provider-concurrency": "workspace:*",
"@dispatch/provider-openai-compat": "workspace:*",
"@dispatch/provider-umans": "workspace:*",
"@dispatch/session-orchestrator": "workspace:*",
@@ -161,6 +162,13 @@
"@dispatch/wire": "workspace:*",
},
},
+ "packages/provider-concurrency": {
+ "name": "@dispatch/provider-concurrency",
+ "version": "0.0.0",
+ "dependencies": {
+ "@dispatch/kernel": "workspace:*",
+ },
+ },
"packages/provider-openai-compat": {
"name": "@dispatch/provider-openai-compat",
"version": "0.0.0",
@@ -186,6 +194,7 @@
"@dispatch/credential-store": "workspace:*",
"@dispatch/kernel": "workspace:*",
"@dispatch/message-queue": "workspace:*",
+ "@dispatch/provider-concurrency": "workspace:*",
"@dispatch/system-prompt": "workspace:*",
},
},
@@ -338,6 +347,7 @@
"@dispatch/kernel": "workspace:*",
"@dispatch/lsp": "workspace:*",
"@dispatch/mcp": "workspace:*",
+ "@dispatch/provider-concurrency": "workspace:*",
"@dispatch/session-orchestrator": "workspace:*",
"@dispatch/system-prompt": "workspace:*",
"@dispatch/throughput-store": "workspace:*",
@@ -415,6 +425,8 @@
"@dispatch/openai-stream": ["@dispatch/openai-stream@workspace:packages/openai-stream"],
+ "@dispatch/provider-concurrency": ["@dispatch/provider-concurrency@workspace:packages/provider-concurrency"],
+
"@dispatch/provider-openai-compat": ["@dispatch/provider-openai-compat@workspace:packages/provider-openai-compat"],
"@dispatch/provider-umans": ["@dispatch/provider-umans@workspace:packages/provider-umans"],
diff --git a/packages/host-bin/package.json b/packages/host-bin/package.json
index 65ea305..e68251b 100644
--- a/packages/host-bin/package.json
+++ b/packages/host-bin/package.json
@@ -13,6 +13,7 @@
"@dispatch/exec-backend": "workspace:*",
"@dispatch/heartbeat": "workspace:*",
"@dispatch/provider-openai-compat": "workspace:*",
+ "@dispatch/provider-concurrency": "workspace:*",
"@dispatch/provider-umans": "workspace:*",
"@dispatch/message-queue": "workspace:*",
"@dispatch/mcp": "workspace:*",
diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts
index 8633052..2ab1118 100644
--- a/packages/host-bin/src/main.ts
+++ b/packages/host-bin/src/main.ts
@@ -24,6 +24,7 @@ import {
import { extension as lspExt } from "@dispatch/lsp";
import { extension as mcpExt } from "@dispatch/mcp";
import { extension as messageQueueExt } from "@dispatch/message-queue";
+import { extension as providerConcurrencyExt } from "@dispatch/provider-concurrency";
import { extension as providerOpenaiCompatExt } from "@dispatch/provider-openai-compat";
import { extension as providerUmansExt } from "@dispatch/provider-umans";
import { extension as sessionOrchestratorExt } from "@dispatch/session-orchestrator";
@@ -79,6 +80,7 @@ const CORE_EXTENSIONS: readonly Extension[] = [
authApikeyExt,
providerOpenaiCompatExt,
providerUmansExt,
+ providerConcurrencyExt,
// exec-backend must precede the tool extensions that
// `dependsOn: ["exec-backend"]` (tool-edit-file/read/shell/write). It
// provides the ExecBackendResolver the tools resolve through; placing it
diff --git a/packages/host-bin/tsconfig.json b/packages/host-bin/tsconfig.json
index 2b1edf5..cb85915 100644
--- a/packages/host-bin/tsconfig.json
+++ b/packages/host-bin/tsconfig.json
@@ -23,6 +23,9 @@
"path": "../message-queue"
},
{
+ "path": "../provider-concurrency"
+ },
+ {
"path": "../skills"
},
{
diff --git a/packages/provider-concurrency/package.json b/packages/provider-concurrency/package.json
new file mode 100644
index 0000000..10c522a
--- /dev/null
+++ b/packages/provider-concurrency/package.json
@@ -0,0 +1,11 @@
+{
+ "name": "@dispatch/provider-concurrency",
+ "version": "0.0.0",
+ "type": "module",
+ "private": true,
+ "main": "dist/index.js",
+ "types": "dist/index.d.ts",
+ "dependencies": {
+ "@dispatch/kernel": "workspace:*"
+ }
+}
diff --git a/packages/provider-concurrency/src/concurrency-manager.test.ts b/packages/provider-concurrency/src/concurrency-manager.test.ts
new file mode 100644
index 0000000..bfefb4a
--- /dev/null
+++ b/packages/provider-concurrency/src/concurrency-manager.test.ts
@@ -0,0 +1,386 @@
+import { describe, expect, it } from "vitest";
+import { type ConcurrencyService, createConcurrencyManager } from "./concurrency-manager.js";
+
+// ─── Fake timers ──────────────────────────────────────────────────────────────
+
+interface FakeTimer {
+ fire: () => void;
+ cleared: boolean;
+}
+
+function createFakeTimers() {
+ let currentTime = 0;
+ const intervals: FakeTimer[] = [];
+ const timeouts: { time: number; fire: () => void; cleared: boolean }[] = [];
+
+ const setInterval = ((_fn: () => void, _ms: number) => {
+ const timer: FakeTimer = { fire: () => _fn(), cleared: false };
+ intervals.push(timer);
+ return timer as unknown as ReturnType<typeof setInterval>;
+ }) as typeof setInterval;
+
+ const clearInterval = ((timer: ReturnType<typeof setInterval>) => {
+ const t = timer as unknown as FakeTimer;
+ t.cleared = true;
+ }) as typeof clearInterval;
+
+ const setTimeout = ((_fn: () => void, ms: number) => {
+ const entry = { time: currentTime + ms, fire: () => _fn(), cleared: false };
+ timeouts.push(entry);
+ return entry as unknown as ReturnType<typeof setTimeout>;
+ }) as typeof setTimeout;
+
+ const clearTimeout = ((timer: ReturnType<typeof setTimeout>) => {
+ const t = timer as unknown as { cleared: boolean };
+ t.cleared = true;
+ }) as typeof clearTimeout;
+
+ return {
+ now: () => currentTime,
+ advance(ms: number) {
+ currentTime += ms;
+ // Fire any due timeouts.
+ for (const entry of timeouts) {
+ if (!entry.cleared && entry.time <= currentTime) {
+ entry.cleared = true;
+ entry.fire();
+ }
+ }
+ },
+ fireIntervals() {
+ for (const timer of intervals) {
+ if (!timer.cleared) timer.fire();
+ }
+ },
+ setInterval,
+ clearInterval,
+ setTimeout,
+ clearTimeout,
+ };
+}
+
+function createManager(): {
+ manager: ConcurrencyService;
+ timers: ReturnType<typeof createFakeTimers>;
+} {
+ const timers = createFakeTimers();
+ const manager = createConcurrencyManager({
+ now: timers.now,
+ slotTimeoutMs: 5000,
+ watchdogIntervalMs: 1000,
+ defaultPauseMs: 30000,
+ setTimeout: timers.setTimeout,
+ clearTimeout: timers.clearTimeout,
+ setInterval: timers.setInterval,
+ clearInterval: timers.clearInterval,
+ });
+ return { manager, timers };
+}
+
+describe("createConcurrencyManager", () => {
+ it("returns no-op release for providers with no configured limit", async () => {
+ const { manager } = createManager();
+ const release = await manager.acquire("unknown", "conv1", 0);
+ expect(typeof release).toBe("function");
+ // No state → release is a no-op, no error.
+ release();
+ expect(manager.getStatus("unknown")).toBeUndefined();
+ });
+
+ it("grants immediately when under the limit", async () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 4);
+
+ const release1 = await manager.acquire("umans", "conv1", 0);
+ const status = manager.getStatus("umans");
+ expect(status).toEqual({
+ providerId: "umans",
+ limit: 4,
+ inFlight: 1,
+ queued: 0,
+ paused: false,
+ });
+ release1();
+ expect(manager.getStatus("umans")?.inFlight).toBe(0);
+ });
+
+ it("queues when at the limit and grants on release (FIFO when same priority)", async () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 1);
+
+ const release1 = await manager.acquire("umans", "conv1", 100);
+
+ // Second request should block (at limit).
+ let resolved = false;
+ const promise2 = manager.acquire("umans", "conv2", 200).then((r) => {
+ resolved = true;
+ return r;
+ });
+
+ // Let microtasks settle.
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(resolved).toBe(false);
+ expect(manager.getStatus("umans")?.queued).toBe(1);
+
+ // Release the first slot.
+ release1();
+
+ const release2 = await promise2;
+ expect(resolved).toBe(true);
+ expect(manager.getStatus("umans")?.inFlight).toBe(1);
+ expect(manager.getStatus("umans")?.queued).toBe(0);
+ release2();
+ });
+
+ it("grants to the oldest agent first (priority queue by promptStartedAt)", async () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 1);
+
+ // Hold the single slot.
+ const release0 = await manager.acquire("umans", "holder", 0);
+
+ // Three agents queue with different prompt start times.
+ // Agent C started latest (t=300), Agent A started earliest (t=100).
+ const results: string[] = [];
+ const acquireAndRecord = (conv: string, promptAt: number) =>
+ manager.acquire("umans", conv, promptAt).then((r) => {
+ results.push(conv);
+ return r;
+ });
+
+ // Queue in non-sorted order: B (t=200), A (t=100), C (t=300).
+ const pB = acquireAndRecord("convB", 200);
+ const pA = acquireAndRecord("convA", 100);
+ const pC = acquireAndRecord("convC", 300);
+
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(results).toEqual([]); // none resolved yet.
+
+ // Release the holder. The oldest agent (A, t=100) should get the slot first.
+ release0();
+
+ const rA = await pA;
+ expect(results).toEqual(["convA"]);
+
+ rA.release ? rA.release() : rA();
+
+ // Now B (t=200) should be next.
+ const rB = await pB;
+ expect(results).toEqual(["convA", "convB"]);
+ rB.release ? rB.release() : rB();
+
+ // Then C (t=300).
+ const rC = await pC;
+ expect(results).toEqual(["convA", "convB", "convC"]);
+ rC.release ? rC.release() : rC();
+ });
+
+ it("does not grant slots while paused (429 backoff)", async () => {
+ const { manager, timers } = createManager();
+ manager.setLimit("umans", 1);
+
+ const release1 = await manager.acquire("umans", "conv1", 0);
+ release1();
+
+ // Simulate a 429 → queue pauses.
+ manager.reportRateLimit("umans");
+ const status = manager.getStatus("umans");
+ expect(status?.paused).toBe(true);
+ expect(status?.pausedUntil).toBe(30000);
+
+ // A new acquire should block (paused, even though under limit).
+ let resolved = false;
+ const promise = manager.acquire("umans", "conv2", 0).then((r) => {
+ resolved = true;
+ return r;
+ });
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(resolved).toBe(false);
+
+ // Advance past the pause duration.
+ timers.advance(30000);
+
+ const release2 = await promise;
+ expect(resolved).toBe(true);
+ expect(manager.getStatus("umans")?.paused).toBe(false);
+ release2();
+ });
+
+ it("respects retryAfterMs for 429 backoff", () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 2);
+
+ manager.reportRateLimit("umans", 5000);
+ expect(manager.getStatus("umans")?.pausedUntil).toBe(5000);
+ });
+
+ it("watchdog reclaims slots held beyond the timeout", async () => {
+ const { manager, timers } = createManager();
+ manager.setLimit("umans", 1);
+
+ const release = await manager.acquire("umans", "conv1", 0);
+ expect(manager.getStatus("umans")?.inFlight).toBe(1);
+
+ // Advance past the slot timeout (5000ms) and fire the watchdog.
+ timers.advance(5001);
+ timers.fireIntervals();
+
+ // The watchdog should have force-released the slot.
+ expect(manager.getStatus("umans")?.inFlight).toBe(0);
+
+ // Calling release again (from the holder) should be a no-op (idempotent).
+ release();
+ expect(manager.getStatus("umans")?.inFlight).toBe(0);
+ });
+
+ it("watchdog grants the next waiter after reclaiming a stale slot", async () => {
+ const { manager, timers } = createManager();
+ manager.setLimit("umans", 1);
+
+ // Hold the slot.
+ await manager.acquire("umans", "holder", 0);
+
+ // Queue a waiter.
+ let resolved = false;
+ const promise = manager.acquire("umans", "waiter", 10).then((r) => {
+ resolved = true;
+ return r;
+ });
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(resolved).toBe(false);
+
+ // Watchdog reclaims the held slot.
+ timers.advance(5001);
+ timers.fireIntervals();
+
+ // The waiter should now be granted.
+ const release = await promise;
+ expect(resolved).toBe(true);
+ expect(manager.getStatus("umans")?.inFlight).toBe(1);
+ release();
+ });
+
+ it("setLimit grants queued requests when the limit increases", async () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 1);
+
+ const release1 = await manager.acquire("umans", "conv1", 0);
+
+ // Queue a waiter.
+ let resolved = false;
+ const promise = manager.acquire("umans", "conv2", 100).then((r) => {
+ resolved = true;
+ return r;
+ });
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(resolved).toBe(false);
+
+ // Increase the limit → the queued request should be granted.
+ manager.setLimit("umans", 2);
+
+ const release2 = await promise;
+ expect(resolved).toBe(true);
+ expect(manager.getStatus("umans")?.inFlight).toBe(2);
+
+ release2();
+ release1();
+ });
+
+ it("removeLimit grants all queued requests and removes the state", async () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 1);
+
+ const release1 = await manager.acquire("umans", "conv1", 0);
+
+ // Queue two waiters.
+ const p2 = manager.acquire("umans", "conv2", 100);
+ const p3 = manager.acquire("umans", "conv3", 200);
+ await Promise.resolve();
+ await Promise.resolve();
+
+ // Remove the limit → all queued requests should be granted.
+ manager.removeLimit("umans");
+
+ const r2 = await p2;
+ const r3 = await p3;
+ expect(manager.getStatus("umans")).toBeUndefined();
+
+ // Releases work (no error after state removal).
+ r2();
+ r3();
+ release1();
+ });
+
+ it("getLimits returns all configured limits", () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 4);
+ manager.setLimit("openai-compat", 5);
+
+ const limits = manager.getLimits();
+ expect(limits).toHaveLength(2);
+ expect(limits).toContainEqual({ providerId: "umans", limit: 4 });
+ expect(limits).toContainEqual({ providerId: "openai-compat", limit: 5 });
+ });
+
+ it("getStatusAll returns status for all configured providers", () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 4);
+ manager.setLimit("anthropic", 3);
+
+ const statuses = manager.getStatusAll();
+ expect(statuses).toHaveLength(2);
+ const umans = statuses.find((s) => s.providerId === "umans");
+ expect(umans).toEqual({
+ providerId: "umans",
+ limit: 4,
+ inFlight: 0,
+ queued: 0,
+ paused: false,
+ });
+ });
+
+ it("destroy clears timers without error", () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 4);
+ manager.reportRateLimit("umans", 5000);
+ expect(() => manager.destroy()).not.toThrow();
+ });
+
+ it("release is idempotent (double-release does not overshoot)", async () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 2);
+
+ const release = await manager.acquire("umans", "conv1", 0);
+ expect(manager.getStatus("umans")?.inFlight).toBe(1);
+
+ release();
+ expect(manager.getStatus("umans")?.inFlight).toBe(0);
+
+ // Double-release should not decrement below 0.
+ release();
+ expect(manager.getStatus("umans")?.inFlight).toBe(0);
+ });
+
+ it("multiple concurrent acquires up to the limit all resolve immediately", async () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 3);
+
+ const releases = await Promise.all([
+ manager.acquire("umans", "conv1", 0),
+ manager.acquire("umans", "conv2", 0),
+ manager.acquire("umans", "conv3", 0),
+ ]);
+
+ expect(manager.getStatus("umans")?.inFlight).toBe(3);
+
+ for (const release of releases) {
+ release();
+ }
+ expect(manager.getStatus("umans")?.inFlight).toBe(0);
+ });
+});
diff --git a/packages/provider-concurrency/src/concurrency-manager.ts b/packages/provider-concurrency/src/concurrency-manager.ts
new file mode 100644
index 0000000..e7535cf
--- /dev/null
+++ b/packages/provider-concurrency/src/concurrency-manager.ts
@@ -0,0 +1,327 @@
+/**
+ * In-memory per-provider concurrency limiter.
+ *
+ * Tracks and limits how many concurrent API requests (token-generating
+ * requests) are in flight per provider. When the limit is reached, additional
+ * requests queue and are granted slots based on oldest-agent-first priority
+ * (the agent whose current prompt started the longest ago wins the next slot).
+ *
+ * A watchdog reclaims slots held beyond a timeout (deadlock / stuck-agent
+ * recovery). 429 backoff pauses a provider's queue for a configurable duration.
+ *
+ * This module is the PURE decision logic. It takes an injected clock (`now`)
+ * and injected timers (`setTimeout`/`clearTimeout`/`setInterval`/`clearInterval`)
+ * so it is fully testable with deterministic fake time. The extension layer
+ * wires real timers.
+ */
+
+// ─── Types ───────────────────────────────────────────────────────────────────
+
+/** Status snapshot for a single provider's concurrency state. */
+export interface ProviderConcurrencyStatus {
+ readonly providerId: string;
+ /** Configured concurrency limit. Always present (status is only returned for providers with a limit). */
+ readonly limit: number;
+ /** Currently in-flight (held) slots. */
+ readonly inFlight: number;
+ /** Agents waiting in the queue for a slot. */
+ readonly queued: number;
+ /** Whether the queue is paused (429 backoff). */
+ readonly paused: boolean;
+ /** When the pause expires (epoch-ms). Present only when paused. */
+ readonly pausedUntil?: number;
+}
+
+/**
+ * The limiter surface a consumer (session-orchestrator) needs: acquire a
+ * slot before a provider stream starts, release it when the stream completes,
+ * and report rate-limit (429) events so the manager can back off.
+ */
+export interface ConcurrencyLimiter {
+ /**
+ * Acquire a concurrency slot for `providerId`. Resolves immediately when a
+ * slot is available; otherwise blocks (queued by oldest-agent-first) until
+ * one frees up. The returned function MUST be called when the response
+ * stream completes (in a `finally` block). For providers with no configured
+ * limit, resolves instantly with a no-op release.
+ *
+ * @param providerId The provider to limit (e.g. "umans", "openai-compat").
+ * @param conversationId The agent requesting the slot.
+ * @param promptStartedAt When the agent's current prompt (turn) started
+ * (epoch-ms). Used for oldest-agent-first scheduling.
+ */
+ acquire(providerId: string, conversationId: string, promptStartedAt: number): Promise<() => void>;
+
+ /**
+ * Report a 429 from a provider. Pauses the queue for that provider for
+ * `retryAfterMs` (or a default duration when omitted). Queued and in-flight
+ * requests are unaffected; new `acquire` calls block until the pause expires.
+ */
+ reportRateLimit(providerId: string, retryAfterMs?: number): void;
+}
+
+/**
+ * The full service surface (limiter + config + status) for HTTP routes.
+ */
+export interface ConcurrencyService extends ConcurrencyLimiter {
+ /** Set the concurrency limit for a provider. Creates the state if new. */
+ setLimit(providerId: string, limit: number): void;
+ /** Get the configured limit, or `undefined` when none. */
+ getLimit(providerId: string): number | undefined;
+ /** Remove the limit for a provider (makes it unlimited). */
+ removeLimit(providerId: string): void;
+ /** All configured limits as `{ providerId, limit }` entries. */
+ getLimits(): readonly { providerId: string; limit: number }[];
+ /** Status for one provider, or `undefined` when no limit is configured. */
+ getStatus(providerId: string): ProviderConcurrencyStatus | undefined;
+ /** Status for every provider with a configured limit. */
+ getStatusAll(): readonly ProviderConcurrencyStatus[];
+ /** Stop the watchdog + clear all timers. */
+ destroy(): void;
+}
+
+// ─── Internal state ───────────────────────────────────────────────────────────
+
+interface Slot {
+ readonly conversationId: string;
+ readonly acquiredAt: number;
+ /** Idempotent release — safe to call from the holder or the watchdog. */
+ readonly releaseFn: () => void;
+}
+
+interface QueuedWaiter {
+ readonly conversationId: string;
+ readonly promptStartedAt: number;
+ readonly resolve: (release: () => void) => void;
+}
+
+interface ProviderState {
+ limit: number;
+ inFlight: number;
+ slots: Map<number, Slot>;
+ queue: QueuedWaiter[];
+ paused: boolean;
+ pausedUntil: number | undefined;
+ pauseTimer: ReturnType<typeof setTimeout> | undefined;
+}
+
+export interface ConcurrencyManagerOpts {
+ /** Monotonic-ish clock (epoch-ms). */
+ readonly now: () => number;
+ /** Max time a slot may be held before the watchdog reclaims it (ms). */
+ readonly slotTimeoutMs: number;
+ /** How often the watchdog sweeps (ms). */
+ readonly watchdogIntervalMs: number;
+ /** Default pause duration when a 429 arrives without Retry-After (ms). */
+ readonly defaultPauseMs: number;
+ /** Injected timers (default: global). Override in tests for deterministic time. */
+ readonly setTimeout?: typeof setTimeout;
+ readonly clearTimeout?: typeof clearTimeout;
+ readonly setInterval?: typeof setInterval;
+ readonly clearInterval?: typeof clearInterval;
+ /** Optional logger for watchdog + pause events. */
+ readonly onWatchdogReclaim?: (providerId: string, conversationId: string, heldMs: number) => void;
+ readonly onPause?: (providerId: string, durationMs: number) => void;
+}
+
+function noopRelease(): void {
+ // No limit configured → nothing to release.
+}
+
+export function createConcurrencyManager(opts: ConcurrencyManagerOpts): ConcurrencyService {
+ const now = opts.now;
+ const slotTimeoutMs = opts.slotTimeoutMs;
+ const defaultPauseMs = opts.defaultPauseMs;
+ const setTimeout = opts.setTimeout ?? globalThis.setTimeout.bind(globalThis);
+ const clearTimeout = opts.clearTimeout ?? globalThis.clearTimeout.bind(globalThis);
+ const setInterval = opts.setInterval ?? globalThis.setInterval.bind(globalThis);
+ const clearInterval = opts.clearInterval ?? globalThis.clearInterval.bind(globalThis);
+
+ const states = new Map<string, ProviderState>();
+ let slotIdCounter = 0;
+
+ // ── Slot granting ──────────────────────────────────────────────────────────
+
+ function grantSlot(state: ProviderState, providerId: string, conversationId: string): () => void {
+ const id = slotIdCounter++;
+ let released = false;
+ const releaseFn = () => {
+ if (released) return;
+ released = true;
+ state.slots.delete(id);
+ state.inFlight--;
+ tryGrantNext(providerId);
+ };
+ state.slots.set(id, {
+ conversationId,
+ acquiredAt: now(),
+ releaseFn,
+ });
+ state.inFlight++;
+ return releaseFn;
+ }
+
+ function tryGrantNext(providerId: string): void {
+ const state = states.get(providerId);
+ if (state === undefined) return;
+ if (state.paused) return;
+ while (state.queue.length > 0 && state.inFlight < state.limit) {
+ const waiter = state.queue[0];
+ if (waiter === undefined) break;
+ state.queue.shift();
+ const releaseFn = grantSlot(state, providerId, waiter.conversationId);
+ waiter.resolve(releaseFn);
+ }
+ }
+
+ // ── Watchdog ──────────────────────────────────────────────────────────────────
+
+ function sweep(): void {
+ const currentNow = now();
+ for (const [providerId, state] of states) {
+ for (const [, slot] of state.slots) {
+ const heldMs = currentNow - slot.acquiredAt;
+ if (heldMs > slotTimeoutMs) {
+ opts.onWatchdogReclaim?.(providerId, slot.conversationId, heldMs);
+ slot.releaseFn();
+ }
+ }
+ }
+ }
+
+ const watchdogTimer = setInterval(sweep, opts.watchdogIntervalMs);
+
+ // ── Public API ─────────────────────────────────────────────────────────────
+
+ const manager: ConcurrencyService = {
+ acquire(providerId, conversationId, promptStartedAt) {
+ const state = states.get(providerId);
+ if (state === undefined) {
+ // No limit configured → unlimited.
+ return Promise.resolve(noopRelease);
+ }
+
+ if (!state.paused && state.inFlight < state.limit) {
+ return Promise.resolve(grantSlot(state, providerId, conversationId));
+ }
+
+ // Queue (oldest-agent-first by promptStartedAt).
+ return new Promise<() => void>((resolve) => {
+ state.queue.push({ conversationId, promptStartedAt, resolve });
+ // Keep sorted ascending by promptStartedAt (oldest first).
+ // Insertion sort would be O(n), but the queue is typically tiny (<20),
+ // so a simple sort is fine and keeps the code simple.
+ state.queue.sort((a, b) => a.promptStartedAt - b.promptStartedAt);
+ });
+ },
+
+ reportRateLimit(providerId, retryAfterMs) {
+ const state = states.get(providerId);
+ if (state === undefined) return;
+
+ const pauseDuration = retryAfterMs ?? defaultPauseMs;
+ state.paused = true;
+ state.pausedUntil = now() + pauseDuration;
+
+ if (state.pauseTimer !== undefined) {
+ clearTimeout(state.pauseTimer);
+ }
+ opts.onPause?.(providerId, pauseDuration);
+ state.pauseTimer = setTimeout(() => {
+ state.paused = false;
+ state.pausedUntil = undefined;
+ state.pauseTimer = undefined;
+ tryGrantNext(providerId);
+ }, pauseDuration);
+ },
+
+ setLimit(providerId, limit) {
+ let state = states.get(providerId);
+ if (state === undefined) {
+ state = {
+ limit,
+ inFlight: 0,
+ slots: new Map(),
+ queue: [],
+ paused: false,
+ pausedUntil: undefined,
+ pauseTimer: undefined,
+ };
+ states.set(providerId, state);
+ } else {
+ state.limit = limit;
+ }
+ // A higher limit may let queued requests through.
+ tryGrantNext(providerId);
+ },
+
+ getLimit(providerId) {
+ return states.get(providerId)?.limit;
+ },
+
+ removeLimit(providerId) {
+ const state = states.get(providerId);
+ if (state === undefined) return;
+
+ // Clear pause.
+ state.paused = false;
+ state.pausedUntil = undefined;
+ if (state.pauseTimer !== undefined) {
+ clearTimeout(state.pauseTimer);
+ state.pauseTimer = undefined;
+ }
+
+ // Grant all queued requests (they become unlimited now).
+ while (state.queue.length > 0) {
+ const waiter = state.queue[0];
+ if (waiter === undefined) break;
+ state.queue.shift();
+ const releaseFn = grantSlot(state, providerId, waiter.conversationId);
+ waiter.resolve(releaseFn);
+ }
+
+ // Remove the state. In-flight slots' release functions still work —
+ // they close over `state` and call `tryGrantNext` which finds no state
+ // and returns early. The watchdog won't sweep removed states.
+ states.delete(providerId);
+ },
+
+ getLimits() {
+ return [...states.entries()].map(([providerId, s]) => ({
+ providerId,
+ limit: s.limit,
+ }));
+ },
+
+ getStatus(providerId) {
+ const state = states.get(providerId);
+ if (state === undefined) return undefined;
+ return {
+ providerId,
+ limit: state.limit,
+ inFlight: state.inFlight,
+ queued: state.queue.length,
+ paused: state.paused,
+ ...(state.pausedUntil !== undefined ? { pausedUntil: state.pausedUntil } : {}),
+ };
+ },
+
+ getStatusAll() {
+ return [...states.keys()]
+ .map((providerId) => manager.getStatus(providerId))
+ .filter((s): s is ProviderConcurrencyStatus => s !== undefined);
+ },
+
+ destroy() {
+ clearInterval(watchdogTimer);
+ for (const state of states.values()) {
+ if (state.pauseTimer !== undefined) {
+ clearTimeout(state.pauseTimer);
+ }
+ }
+ states.clear();
+ },
+ };
+
+ return manager;
+}
diff --git a/packages/provider-concurrency/src/extension.ts b/packages/provider-concurrency/src/extension.ts
new file mode 100644
index 0000000..c741173
--- /dev/null
+++ b/packages/provider-concurrency/src/extension.ts
@@ -0,0 +1,61 @@
+import type { Extension, HostAPI, Manifest } from "@dispatch/kernel";
+import { type ConcurrencyService, createConcurrencyManager } from "./concurrency-manager.js";
+import { concurrencyServiceHandle } from "./service.js";
+
+export const manifest: Manifest = {
+ id: "provider-concurrency",
+ name: "Provider Concurrency Limits",
+ version: "0.0.0",
+ apiVersion: "^0.1.0",
+ trust: "bundled",
+ activation: "eager",
+ contributes: { services: ["provider-concurrency/service"] },
+};
+
+/**
+ * Default tuning constants.
+ *
+ * - `SLOT_TIMEOUT_MS` (5 min): a slot held longer than this is force-reclaimed
+ * by the watchdog (deadlock / stuck-agent recovery). Generation streams
+ * rarely exceed 2–3 minutes; 5 min is a generous safety margin.
+ * - `WATCHDOG_INTERVAL_MS` (30s): how often the watchdog sweeps for stale slots.
+ * - `DEFAULT_PAUSE_MS` (30s): default 429 backoff when no Retry-After is given.
+ * Umans docs note each concurrency 429 deprioritizes the account for ~30 min,
+ * but a 30s queue pause prevents immediate re-overshoot while still allowing
+ * recovery.
+ */
+const SLOT_TIMEOUT_MS = 5 * 60 * 1000;
+const WATCHDOG_INTERVAL_MS = 30 * 1000;
+const DEFAULT_PAUSE_MS = 30 * 1000;
+
+export function activate(host: HostAPI): void {
+ const logger = host.logger;
+
+ const manager: ConcurrencyService = createConcurrencyManager({
+ now: () => Date.now(),
+ slotTimeoutMs: SLOT_TIMEOUT_MS,
+ watchdogIntervalMs: WATCHDOG_INTERVAL_MS,
+ defaultPauseMs: DEFAULT_PAUSE_MS,
+ onWatchdogReclaim: (providerId, conversationId, heldMs) => {
+ logger.warn("provider-concurrency: watchdog reclaimed stale slot", {
+ providerId,
+ conversationId,
+ heldMs,
+ });
+ },
+ onPause: (providerId, durationMs) => {
+ logger.warn("provider-concurrency: 429 backoff — pausing queue", {
+ providerId,
+ durationMs,
+ });
+ },
+ });
+
+ host.provideService(concurrencyServiceHandle, manager);
+ logger.info("provider-concurrency: registered");
+}
+
+export const extension: Extension = {
+ manifest,
+ activate,
+};
diff --git a/packages/provider-concurrency/src/index.ts b/packages/provider-concurrency/src/index.ts
new file mode 100644
index 0000000..f35c070
--- /dev/null
+++ b/packages/provider-concurrency/src/index.ts
@@ -0,0 +1,10 @@
+export {
+ type ConcurrencyLimiter,
+ type ConcurrencyManagerOpts,
+ type ConcurrencyService,
+ createConcurrencyManager,
+ type ProviderConcurrencyStatus,
+} from "./concurrency-manager.js";
+export { extension, manifest } from "./extension.js";
+export { wrapProviderWithConcurrency } from "./provider-wrapper.js";
+export { concurrencyServiceHandle } from "./service.js";
diff --git a/packages/provider-concurrency/src/provider-wrapper.test.ts b/packages/provider-concurrency/src/provider-wrapper.test.ts
new file mode 100644
index 0000000..e024d59
--- /dev/null
+++ b/packages/provider-concurrency/src/provider-wrapper.test.ts
@@ -0,0 +1,173 @@
+import type { ProviderContract, ProviderEvent } from "@dispatch/kernel";
+import { describe, expect, it } from "vitest";
+import type { ConcurrencyLimiter } from "./concurrency-manager.js";
+import { wrapProviderWithConcurrency } from "./provider-wrapper.js";
+
+/** Build a fake provider that yields a sequence of events. */
+function fakeProvider(events: ProviderEvent[]): ProviderContract {
+ return {
+ id: "test-provider",
+ stream: async function* (): AsyncIterable<ProviderEvent> {
+ for (const e of events) {
+ yield e;
+ }
+ },
+ };
+}
+
+/** A fake limiter that records acquire/release calls. */
+function recordingLimiter(): ConcurrencyLimiter & {
+ acquireCalls: { providerId: string; conversationId: string; promptStartedAt: number }[];
+ releaseCalls: number;
+ rateLimitReports: string[];
+} {
+ const acquireCalls: { providerId: string; conversationId: string; promptStartedAt: number }[] =
+ [];
+ const releaseCalls: { count: number } = { count: 0 };
+ const rateLimitReports: string[] = [];
+
+ return {
+ acquireCalls,
+ get releaseCalls() {
+ return releaseCalls.count;
+ },
+ rateLimitReports,
+ acquire(providerId, conversationId, promptStartedAt) {
+ acquireCalls.push({ providerId, conversationId, promptStartedAt });
+ return Promise.resolve(() => {
+ releaseCalls.count++;
+ });
+ },
+ reportRateLimit(providerId) {
+ rateLimitReports.push(providerId);
+ },
+ };
+}
+
+describe("wrapProviderWithConcurrency", () => {
+ it("acquires a slot before streaming and releases after the stream completes", async () => {
+ const provider = fakeProvider([
+ { type: "text-delta", delta: "hello" },
+ { type: "finish", reason: "stop" },
+ ]);
+ const limiter = recordingLimiter();
+
+ const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 12345);
+
+ const events: ProviderEvent[] = [];
+ for await (const e of wrapped.stream([], [])) {
+ events.push(e);
+ }
+
+ // Slot acquired before stream, released after.
+ expect(limiter.acquireCalls).toEqual([
+ { providerId: "test-provider", conversationId: "conv1", promptStartedAt: 12345 },
+ ]);
+ expect(limiter.releaseCalls).toBe(1);
+ expect(events).toEqual([
+ { type: "text-delta", delta: "hello" },
+ { type: "finish", reason: "stop" },
+ ]);
+ });
+
+ it("releases the slot even when the stream throws", async () => {
+ const provider: ProviderContract = {
+ id: "err-provider",
+ stream: async function* (): AsyncIterable<ProviderEvent> {
+ yield { type: "text-delta", delta: "partial" };
+ throw new Error("stream exploded");
+ },
+ };
+ const limiter = recordingLimiter();
+ const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0);
+
+ await expect(async () => {
+ for await (const _e of wrapped.stream([], [])) {
+ // consume
+ }
+ }).rejects.toThrow("stream exploded");
+
+ expect(limiter.releaseCalls).toBe(1);
+ });
+
+ it("reports 429 errors to the limiter", async () => {
+ const provider = fakeProvider([
+ { type: "error", message: "Too many requests", code: "429", retryable: true },
+ ]);
+ const limiter = recordingLimiter();
+ const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0);
+
+ const events: ProviderEvent[] = [];
+ for await (const e of wrapped.stream([], [])) {
+ events.push(e);
+ }
+
+ expect(limiter.rateLimitReports).toEqual(["test-provider"]);
+ // The 429 error event is still yielded to the consumer (kernel handles retry).
+ expect(events).toHaveLength(1);
+ expect(events[0]?.type).toBe("error");
+ });
+
+ it("does not report non-429 errors", async () => {
+ const provider = fakeProvider([
+ { type: "error", message: "Internal error", code: "500", retryable: true },
+ ]);
+ const limiter = recordingLimiter();
+ const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0);
+
+ for await (const _e of wrapped.stream([], [])) {
+ // consume
+ }
+
+ expect(limiter.rateLimitReports).toEqual([]);
+ });
+
+ it("preserves the provider id and listModels", async () => {
+ const provider: ProviderContract = {
+ id: "my-provider",
+ stream: async function* (): AsyncIterable<ProviderEvent> {
+ yield { type: "finish", reason: "stop" };
+ },
+ listModels: async () => [{ id: "model-1" }],
+ };
+ const limiter = recordingLimiter();
+ const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0);
+
+ expect(wrapped.id).toBe("my-provider");
+ expect(wrapped.listModels).toBeDefined();
+ const models = await wrapped.listModels?.();
+ expect(models).toEqual([{ id: "model-1" }]);
+ });
+
+ it("passes through messages, tools, and opts to the inner stream", async () => {
+ let receivedArgs:
+ | {
+ messages: unknown;
+ tools: unknown;
+ opts: unknown;
+ }
+ | undefined;
+
+ const provider: ProviderContract = {
+ id: "passthrough",
+ stream: async function* (messages, tools, opts): AsyncIterable<ProviderEvent> {
+ receivedArgs = { messages, tools, opts };
+ yield { type: "finish", reason: "stop" };
+ },
+ };
+ const limiter = recordingLimiter();
+ const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0);
+
+ const messages = [{ role: "user" as const, chunks: [{ type: "text" as const, text: "hi" }] }];
+ const tools = [{ name: "test_tool", description: "test", parameters: {} }];
+ const opts = { model: "gpt-4" };
+
+ for await (const _e of wrapped.stream(messages, tools, opts)) {
+ // consume
+ }
+
+ expect(receivedArgs?.messages).toBe(messages);
+ expect(receivedArgs?.tools).toBe(tools);
+ expect(receivedArgs?.opts).toBe(opts);
+ });
+});
diff --git a/packages/provider-concurrency/src/provider-wrapper.ts b/packages/provider-concurrency/src/provider-wrapper.ts
new file mode 100644
index 0000000..ee3ca85
--- /dev/null
+++ b/packages/provider-concurrency/src/provider-wrapper.ts
@@ -0,0 +1,59 @@
+import type {
+ ChatMessage,
+ ProviderContract,
+ ProviderEvent,
+ ProviderStreamOptions,
+ ToolContract,
+} from "@dispatch/kernel";
+import type { ConcurrencyLimiter } from "./concurrency-manager.js";
+
+/**
+ * Wrap a provider's `stream` method with concurrency limiting.
+ *
+ * A slot is acquired BEFORE the first event is yielded (before the HTTP
+ * request is sent — the `await limiter.acquire()` runs before the generator
+ * body starts iterating the inner stream). The slot is released in a `finally`
+ * block AFTER the inner stream completes (the full response stream, not just
+ * HTTP headers — matching the Umans concurrency model where a slot is held
+ * only while tokens are actually generating).
+ *
+ * 429 detection: if the provider yields an `error` event with `code: "429"`,
+ * the limiter is notified so it can pause the queue for that provider.
+ *
+ * @param provider The underlying provider to wrap.
+ * @param limiter The concurrency limiter (acquire/release/reportRateLimit).
+ * @param conversationId The agent requesting the stream (for slot attribution).
+ * @param promptStartedAt When the agent's current prompt (turn) started
+ * (epoch-ms, for oldest-agent-first scheduling).
+ */
+export function wrapProviderWithConcurrency(
+ provider: ProviderContract,
+ limiter: ConcurrencyLimiter,
+ conversationId: string,
+ promptStartedAt: number,
+): ProviderContract {
+ const innerStream = provider.stream;
+ const providerId = provider.id;
+
+ return {
+ id: provider.id,
+ stream: async function* (
+ messages: readonly ChatMessage[],
+ tools: readonly ToolContract[],
+ opts?: ProviderStreamOptions,
+ ): AsyncIterable<ProviderEvent> {
+ const release = await limiter.acquire(providerId, conversationId, promptStartedAt);
+ try {
+ for await (const event of innerStream(messages, tools, opts)) {
+ if (event.type === "error" && event.code === "429") {
+ limiter.reportRateLimit(providerId);
+ }
+ yield event;
+ }
+ } finally {
+ release();
+ }
+ },
+ ...(provider.listModels !== undefined ? { listModels: provider.listModels } : {}),
+ };
+}
diff --git a/packages/provider-concurrency/src/service.ts b/packages/provider-concurrency/src/service.ts
new file mode 100644
index 0000000..aa578e8
--- /dev/null
+++ b/packages/provider-concurrency/src/service.ts
@@ -0,0 +1,11 @@
+import { defineService } from "@dispatch/kernel";
+import type { ConcurrencyService } from "./concurrency-manager.js";
+
+/**
+ * Typed service handle for the provider-concurrency service. The
+ * `provider-concurrency` extension provides the implementation; the
+ * session-orchestrator + transport-http consume it.
+ */
+export const concurrencyServiceHandle = defineService<ConcurrencyService>(
+ "provider-concurrency/service",
+);
diff --git a/packages/provider-concurrency/tsconfig.json b/packages/provider-concurrency/tsconfig.json
new file mode 100644
index 0000000..44ed916
--- /dev/null
+++ b/packages/provider-concurrency/tsconfig.json
@@ -0,0 +1,6 @@
+{
+ "extends": "../../tsconfig.base.json",
+ "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true },
+ "include": ["src/**/*.ts"],
+ "references": [{ "path": "../kernel" }]
+}
diff --git a/packages/session-orchestrator/package.json b/packages/session-orchestrator/package.json
index ba34c4d..b9f3d22 100644
--- a/packages/session-orchestrator/package.json
+++ b/packages/session-orchestrator/package.json
@@ -10,6 +10,7 @@
"@dispatch/conversation-store": "workspace:*",
"@dispatch/credential-store": "workspace:*",
"@dispatch/message-queue": "workspace:*",
+ "@dispatch/provider-concurrency": "workspace:*",
"@dispatch/system-prompt": "workspace:*"
}
}
diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts
index 5afffd8..0cd83ef 100644
--- a/packages/session-orchestrator/src/extension.ts
+++ b/packages/session-orchestrator/src/extension.ts
@@ -3,6 +3,7 @@ import { credentialStoreHandle } from "@dispatch/credential-store";
import type { Extension, HostAPI, Manifest } from "@dispatch/kernel";
import { runTurn } from "@dispatch/kernel";
import { messageQueueHandle } from "@dispatch/message-queue";
+import { concurrencyServiceHandle } from "@dispatch/provider-concurrency";
import { systemPromptHandle } from "@dispatch/system-prompt";
import {
cacheWarmHandle,
@@ -93,6 +94,19 @@ export function activate(host: HostAPI): void {
return undefined;
}
},
+ resolveConcurrencyLimiter: () => {
+ // Lazily resolve the concurrency limiter. Returns undefined when the
+ // provider-concurrency extension isn't loaded (no concurrency limiting —
+ // feature degrades off). Lazy so activation order with
+ // provider-concurrency doesn't matter; called per-turn, not at activate.
+ const loaded = host.getExtensions().some((m) => m.id === "provider-concurrency");
+ if (!loaded) return undefined;
+ try {
+ return host.getService(concurrencyServiceHandle);
+ } catch {
+ return undefined;
+ }
+ },
});
host.provideService(sessionOrchestratorHandle, orchestrator);
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index 96cd3a3..b73647d 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -20,6 +20,8 @@ import type {
} from "@dispatch/kernel";
import { defineEventHook, defineService, type ServiceHandle } from "@dispatch/kernel";
import type { MessageQueueService, QueuedMessage } from "@dispatch/message-queue";
+import type { ConcurrencyLimiter } from "@dispatch/provider-concurrency";
+import { wrapProviderWithConcurrency } from "@dispatch/provider-concurrency";
import type { SystemPromptService } from "@dispatch/system-prompt";
import { createMetricsAccumulator } from "./metrics.js";
import {
@@ -335,6 +337,14 @@ export interface SessionOrchestratorDeps {
* order doesn't matter.
*/
readonly resolveSystemPrompt?: () => SystemPromptService | undefined;
+ /**
+ * Lazily resolves the concurrency limiter, or `undefined` when the
+ * provider-concurrency extension isn't loaded (no concurrency limiting —
+ * feature degrades off). When present, each resolved provider is wrapped so
+ * that a concurrency slot is acquired before the stream starts and released
+ * when the stream completes. Lazy so activation order doesn't matter.
+ */
+ readonly resolveConcurrencyLimiter?: () => ConcurrencyLimiter | undefined;
/** Apply the per-turn tools filter chain. Injected for testability. */
readonly applyToolsFilter: (assembly: ToolAssembly) => Promise<ToolAssembly>;
/** Base logger (auto-scoped to this extension); childed per turn for span capture. */
@@ -439,6 +449,7 @@ export function createSessionOrchestrator(
systemPromptOverride: string | undefined,
): void {
const turnId = generateTurnId();
+ const promptStartedAt = deps.now?.() ?? Date.now();
const controller = new AbortController();
activeTurns.set(conversationId, { buffer: [], turnId, controller });
activeConversations.add(conversationId);
@@ -594,6 +605,22 @@ export function createSessionOrchestrator(
provider = deps.resolveProvider();
}
+ // Wrap the resolved provider with concurrency limiting when the
+ // provider-concurrency extension is loaded. The slot is acquired
+ // before the stream starts (before the HTTP request) and released
+ // when the stream completes (after all tokens are generated). The
+ // promptStartedAt (turn start time) is used for oldest-agent-first
+ // scheduling when multiple agents are queued.
+ const limiter = deps.resolveConcurrencyLimiter?.();
+ if (limiter !== undefined) {
+ provider = wrapProviderWithConcurrency(
+ provider,
+ limiter,
+ conversationId,
+ promptStartedAt,
+ );
+ }
+
const baseTools = deps.resolveTools();
const assembled = await deps.applyToolsFilter({
tools: baseTools,
@@ -990,6 +1017,17 @@ export function createWarmService(
provider = deps.resolveProvider();
}
+ // Wrap with concurrency limiting (same as the main turn path).
+ const warmLimiter = deps.resolveConcurrencyLimiter?.();
+ if (warmLimiter !== undefined) {
+ provider = wrapProviderWithConcurrency(
+ provider,
+ warmLimiter,
+ conversationId,
+ deps.now?.() ?? Date.now(),
+ );
+ }
+
const baseTools = deps.resolveTools();
// Resolve cwd the SAME way handleMessage does — pass opts.cwd as the overrideCwd
// The tools filter is cwd-sensitive (e.g. skill discovery rewrites the
@@ -1136,6 +1174,17 @@ export function createCompactionService(
provider = deps.resolveProvider();
}
+ // Wrap with concurrency limiting (same as the main turn path).
+ const compactionLimiter = deps.resolveConcurrencyLimiter?.();
+ if (compactionLimiter !== undefined) {
+ provider = wrapProviderWithConcurrency(
+ provider,
+ compactionLimiter,
+ conversationId,
+ deps.now?.() ?? Date.now(),
+ );
+ }
+
// Build the summarization request: system prompt + conversation text + instruction
const conversationText = formatMessagesForSummary(toSummarize);
const summaryRequest: ChatMessage = {
diff --git a/packages/session-orchestrator/tsconfig.json b/packages/session-orchestrator/tsconfig.json
index bc729fc..f316387 100644
--- a/packages/session-orchestrator/tsconfig.json
+++ b/packages/session-orchestrator/tsconfig.json
@@ -7,6 +7,7 @@
{ "path": "../conversation-store" },
{ "path": "../credential-store" },
{ "path": "../message-queue" },
+ { "path": "../provider-concurrency" },
{ "path": "../system-prompt" }
]
}
diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts
index 6a9a29f..400d9d5 100644
--- a/packages/transport-contract/src/index.ts
+++ b/packages/transport-contract/src/index.ts
@@ -985,3 +985,56 @@ export interface HeartbeatRunsResponse {
export interface StopHeartbeatRunResponse {
readonly ok: true;
}
+
+// ─── Provider concurrency limits ──────────────────────────────────────────────
+
+/**
+ * Response of `GET /concurrency/limits` — all providers with configured
+ * concurrency limits. Each entry pairs a provider id (e.g. "umans",
+ * "openai-compat") with its maximum concurrent in-flight requests. Providers
+ * not listed here have no limit (unlimited).
+ */
+export interface ConcurrencyLimitsResponse {
+ readonly limits: readonly { readonly providerId: string; readonly limit: number }[];
+}
+
+/**
+ * Body of `PUT /concurrency/limits/:providerId` — set or update the concurrency
+ * limit for a provider. `limit` must be a positive integer. When a limit is
+ * set, requests beyond the limit queue (oldest-agent-first) rather than being
+ * sent immediately.
+ */
+export interface SetConcurrencyLimitRequest {
+ readonly limit: number;
+}
+
+/** Response of `GET/PUT /concurrency/limits/:providerId` — the configured limit. */
+export interface ConcurrencyLimitResponse {
+ readonly providerId: string;
+ readonly limit: number;
+}
+
+/**
+ * One provider's live concurrency status.
+ *
+ * - `inFlight`: how many slots are currently held (tokens being generated).
+ * - `queued`: how many agents are waiting for a slot.
+ * - `paused`: whether the queue is paused due to a 429 backoff.
+ * - `pausedUntil`: when the pause expires (epoch-ms), present only when paused.
+ */
+export interface ConcurrencyStatusEntry {
+ readonly providerId: string;
+ readonly limit: number;
+ readonly inFlight: number;
+ readonly queued: number;
+ readonly paused: boolean;
+ readonly pausedUntil?: number;
+}
+
+/**
+ * Response of `GET /concurrency/status` — live status for every provider with a
+ * configured limit. Providers without a limit are absent (they are unlimited).
+ */
+export interface ConcurrencyStatusResponse {
+ readonly providers: readonly ConcurrencyStatusEntry[];
+}
diff --git a/packages/transport-http/package.json b/packages/transport-http/package.json
index 3c722e2..71da855 100644
--- a/packages/transport-http/package.json
+++ b/packages/transport-http/package.json
@@ -12,6 +12,7 @@
"@dispatch/kernel": "workspace:*",
"@dispatch/lsp": "workspace:*",
"@dispatch/mcp": "workspace:*",
+ "@dispatch/provider-concurrency": "workspace:*",
"@dispatch/session-orchestrator": "workspace:*",
"@dispatch/throughput-store": "workspace:*",
"@dispatch/transport-contract": "workspace:*",
diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts
index 4fb295e..23f8dde 100644
--- a/packages/transport-http/src/app.ts
+++ b/packages/transport-http/src/app.ts
@@ -8,6 +8,9 @@ import type {
ComputerListResponse,
ComputerResponse,
ComputerStatusResponse,
+ ConcurrencyLimitResponse,
+ ConcurrencyLimitsResponse,
+ ConcurrencyStatusResponse,
ConversationComputerResponse,
ConversationHistoryResponse,
ConversationListResponse,
@@ -28,6 +31,7 @@ import type {
QueueResponse,
ReasoningEffortResponse,
SetCompactPercentRequest,
+ SetConcurrencyLimitRequest,
SetConversationComputerRequest,
SetSystemPromptTemplateRequest,
SetWorkspaceDefaultComputerRequest,
@@ -67,6 +71,7 @@ import {
import {
type CompactionService,
type ComputerService,
+ type ConcurrencyService,
type ConversationStore,
type CredentialStore,
conversationOpened,
@@ -110,6 +115,13 @@ export interface CreateServerOptions {
readonly computerService?: ComputerService;
/** Optional — defaults to a no-op store (recording disabled, empty reports). */
readonly throughputStore?: ThroughputStore;
+ /**
+ * Optional — provider concurrency limiter service (provided by the
+ * `provider-concurrency` extension). When absent (extension not loaded),
+ * the `/concurrency/*` routes degrade: limits returns empty, status returns
+ * empty, PUT returns 503.
+ */
+ readonly concurrencyService?: ConcurrencyService;
readonly logger?: Logger;
readonly generateId?: () => string;
/** Injectable clock for sample timestamps (default Date.now). */
@@ -562,6 +574,84 @@ export function createApp(opts: CreateServerOptions): Hono {
}
});
+ // ─── Provider concurrency limits ────────────────────────────────────────────
+
+ app.get("/concurrency/limits", (c) => {
+ if (opts.concurrencyService === undefined) {
+ const body: ConcurrencyLimitsResponse = { limits: [] };
+ return c.json(body, 200);
+ }
+ const limits = opts.concurrencyService.getLimits();
+ const body: ConcurrencyLimitsResponse = { limits };
+ return c.json(body, 200);
+ });
+
+ app.get("/concurrency/limits/:providerId", (c) => {
+ const providerId = c.req.param("providerId");
+ if (opts.concurrencyService === undefined) {
+ return c.json({ error: "Concurrency service not available" }, 503);
+ }
+ const limit = opts.concurrencyService.getLimit(providerId);
+ if (limit === undefined) {
+ return c.json({ error: "No concurrency limit configured for this provider" }, 404);
+ }
+ const body: ConcurrencyLimitResponse = { providerId, limit };
+ return c.json(body, 200);
+ });
+
+ app.put("/concurrency/limits/:providerId", async (c) => {
+ const providerId = c.req.param("providerId");
+ if (opts.concurrencyService === undefined) {
+ return c.json({ error: "Concurrency service not available" }, 503);
+ }
+
+ let body: unknown;
+ try {
+ body = await c.req.json();
+ } catch {
+ log.warn("concurrency: invalid JSON body");
+ return c.json({ error: "Invalid JSON body" }, 400);
+ }
+
+ const parsed = body as SetConcurrencyLimitRequest;
+ if (
+ parsed === null ||
+ typeof parsed !== "object" ||
+ typeof parsed.limit !== "number" ||
+ !Number.isInteger(parsed.limit) ||
+ parsed.limit <= 0
+ ) {
+ return c.json({ error: "Body must be { limit: <positive integer> }" }, 400);
+ }
+
+ opts.concurrencyService.setLimit(providerId, parsed.limit);
+ const responseBody: ConcurrencyLimitResponse = { providerId, limit: parsed.limit };
+ return c.json(responseBody, 200);
+ });
+
+ app.delete("/concurrency/limits/:providerId", (c) => {
+ const providerId = c.req.param("providerId");
+ if (opts.concurrencyService === undefined) {
+ return c.json({ error: "Concurrency service not available" }, 503);
+ }
+ const existing = opts.concurrencyService.getLimit(providerId);
+ if (existing === undefined) {
+ return c.json({ error: "No concurrency limit configured for this provider" }, 404);
+ }
+ opts.concurrencyService.removeLimit(providerId);
+ return c.json({ ok: true, providerId }, 200);
+ });
+
+ app.get("/concurrency/status", (c) => {
+ if (opts.concurrencyService === undefined) {
+ const body: ConcurrencyStatusResponse = { providers: [] };
+ return c.json(body, 200);
+ }
+ const statuses = opts.concurrencyService.getStatusAll();
+ const body: ConcurrencyStatusResponse = { providers: statuses };
+ return c.json(body, 200);
+ });
+
app.post("/conversations/:id/close", (c) => {
const conversationId = c.req.param("id");
const { abortedTurn } = opts.orchestrator.closeConversation(conversationId);
diff --git a/packages/transport-http/src/extension.ts b/packages/transport-http/src/extension.ts
index 76d58b6..f46fca5 100644
--- a/packages/transport-http/src/extension.ts
+++ b/packages/transport-http/src/extension.ts
@@ -2,9 +2,11 @@ import type { Extension, HostAPI, Manifest } from "@dispatch/kernel";
import { createApp } from "./app.js";
import {
type ComputerService,
+ type ConcurrencyService,
cacheWarmHandle,
compactionHandle,
computerServiceHandle,
+ concurrencyServiceHandle,
conversationStoreHandle,
credentialStoreHandle,
heartbeatServiceHandle,
@@ -39,6 +41,9 @@ export const manifest: Manifest = {
"/computers/:alias",
"/computers/:alias/status",
"/computers/:alias/test",
+ "/concurrency/limits",
+ "/concurrency/limits/:providerId",
+ "/concurrency/status",
"/conversations",
"/conversations/:id",
"/conversations/:id/close",
@@ -105,6 +110,15 @@ export function createTransportHttpExtension(): Extension & {
} catch {
computerService = undefined;
}
+ // Optional: the `provider-concurrency` extension provides the
+ // concurrency limiter service. NOT in dependsOn (may be absent), so
+ // resolve defensively — when absent the /concurrency/* routes degrade.
+ let concurrencyService: ConcurrencyService | undefined;
+ try {
+ concurrencyService = host.getService(concurrencyServiceHandle);
+ } catch {
+ concurrencyService = undefined;
+ }
const logger = host.logger;
const app = createApp({
@@ -119,6 +133,7 @@ export function createTransportHttpExtension(): Extension & {
systemPromptService,
heartbeatService,
...(computerService !== undefined ? { computerService } : {}),
+ ...(concurrencyService !== undefined ? { concurrencyService } : {}),
logger,
emit: host.emit.bind(host),
...(process.env.DISPATCH_WEB_DIR !== undefined
diff --git a/packages/transport-http/src/seam.ts b/packages/transport-http/src/seam.ts
index c60edf0..dcb3f80 100644
--- a/packages/transport-http/src/seam.ts
+++ b/packages/transport-http/src/seam.ts
@@ -12,6 +12,8 @@ export type { LspServerStatus, LspService } from "@dispatch/lsp";
export { lspServiceHandle } from "@dispatch/lsp";
export type { McpServerStatus, McpService } from "@dispatch/mcp";
export { mcpServiceHandle } from "@dispatch/mcp";
+export type { ConcurrencyService } from "@dispatch/provider-concurrency";
+export { concurrencyServiceHandle } from "@dispatch/provider-concurrency";
export type {
CompactionService,
SessionOrchestrator,
diff --git a/packages/transport-http/tsconfig.json b/packages/transport-http/tsconfig.json
index 4a76434..7759bce 100644
--- a/packages/transport-http/tsconfig.json
+++ b/packages/transport-http/tsconfig.json
@@ -8,6 +8,8 @@
{ "path": "../heartbeat" },
{ "path": "../kernel" },
{ "path": "../lsp" },
+ { "path": "../mcp" },
+ { "path": "../provider-concurrency" },
{ "path": "../session-orchestrator" },
{ "path": "../system-prompt" },
{ "path": "../throughput-store" },
diff --git a/tsconfig.json b/tsconfig.json
index e4e833d..d31b44a 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -98,6 +98,9 @@
"path": "./packages/heartbeat"
},
{
+ "path": "./packages/provider-concurrency"
+ },
+ {
"path": "./packages/system-prompt"
},
{