summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-27 16:36:20 +0900
committerAdam Malczewski <[email protected]>2026-06-27 16:36:20 +0900
commit87e85e026e54b1dc25b0648af298ab0a8a715701 (patch)
tree81f31e3261f245e71c285254b2c64e030a3190e1
parentbfe2a26963ea9b50da4ae8985280ceca9d880adf (diff)
downloaddispatch-87e85e026e54b1dc25b0648af298ab0a8a715701.tar.gz
dispatch-87e85e026e54b1dc25b0648af298ab0a8a715701.zip
feat(provider-concurrency): add release cooldown (200ms) to prevent N+1 overshoot from provider accounting lag
-rw-r--r--notes/concurrency-library-investigation.md228
-rw-r--r--packages/provider-concurrency/src/concurrency-manager.test.ts66
-rw-r--r--packages/provider-concurrency/src/concurrency-manager.ts36
-rw-r--r--packages/provider-concurrency/src/extension.ts8
4 files changed, 335 insertions, 3 deletions
diff --git a/notes/concurrency-library-investigation.md b/notes/concurrency-library-investigation.md
new file mode 100644
index 0000000..abf7dab
--- /dev/null
+++ b/notes/concurrency-library-investigation.md
@@ -0,0 +1,228 @@
+# Concurrency Library Investigation
+
+> Research-only. No code changes. Investigated 2026-06-26.
+
+## 1. ai-concurrency-shaper (joeycumines)
+
+**Repository:** https://github.com/joeycumines/ai-concurrency-shaper
+
+### What it is
+A **standalone reverse proxy** written in **Go**. It sits between your
+application and an upstream AI/LLM API (e.g. Anthropic, OpenAI), limiting
+concurrent requests to configured HTTP routes. Requests exceeding the limit
+block until a slot opens. Non-matching requests pass through unmodified.
+
+### How it works
+- Uses a **token-bucket channel** (Go's `chan struct{}`) as a semaphore. Each
+ limited request acquires a token; the token is returned when the request
+ completes (full response body streamed).
+- Supports **per-route** concurrency limits via `-limit "POST /v1/chat/completions:4"`
+ and **global** concurrency via `-global-concurrency 10`.
+- Supports **route grouping** — multiple routes can share one limiter via
+ `@group` syntax.
+- Has a **circuit breaker** (trips after N failures in a window, exponential
+ backoff with phantom concurrency holds).
+- Has **concurrency protection flags**: release cooldown, cancel cooldown,
+ failure hold, adaptive headroom (reduce effective limit by 1 after a 429).
+- **429 handling**: skips retrying 429s by default (`-retry-skip-429=true`).
+- Optional **TUI dashboard** (Bubble Tea) with live metrics.
+- **In-memory only** — no persistence, no external state (single process).
+
+### Maturity
+- **1 star, 1 fork, 7 commits**, created June 10, 2026 (16 days ago).
+- **Single contributor** (Joseph Cumines).
+- **GPL-3.0 license** (copyleft — incompatible with closed-source distribution).
+- Written in **Go**, not JavaScript/TypeScript.
+
+### Critical finding: language mismatch
+This is a **Go binary**, not an npm library. It cannot be imported into a
+Bun/TypeScript application. To use it, we would need to:
+1. Run it as a **separate process** (a sidecar proxy).
+2. Point our provider's `baseURL` at the proxy's listen address.
+3. The proxy forwards to the real upstream.
+
+This fundamentally changes our architecture from "in-process concurrency
+management" to "external proxy-based concurrency management."
+
+### Feature comparison vs our implementation
+
+| Feature | Our impl | ai-concurrency-shaper |
+|---|---|---|
+| Per-provider configurable limits | ✅ (runtime API + in-memory) | ✅ (per-route CLI flags, not runtime) |
+| Oldest-agent-first scheduling | ✅ (priority queue by promptStartedAt) | ❌ (FIFO semaphore, no priority) |
+| Slot held only during token generation | ✅ (acquire/release around stream) | ✅ (token held until response body completes) |
+| 429 backoff | ✅ (pause queue, Retry-After aware) | ✅ (circuit breaker + adaptive headroom) |
+| Watchdog/deadlock recovery | ✅ (timeout-based slot reclaim) | ❌ (no watchdog; relies on queue-timeout) |
+| Runtime-configurable limits | ✅ (HTTP API, no restart) | ❌ (CLI flags only, requires restart) |
+| In-memory / no external state | ✅ | ✅ |
+| Language | TypeScript (Bun) | Go |
+| Integration model | In-process (wraps ProviderContract) | External proxy (separate process) |
+| Status reporting API | ✅ (HTTP endpoints) | TUI only (no HTTP API) |
+| License | Our code (MIT-style) | GPL-3.0 |
+
+### What would change to use it
+1. Deploy the Go binary as a sidecar process.
+2. Change each provider's `baseURL` to point at the proxy.
+3. Lose runtime-configurable limits (CLI flags only).
+4. Lose oldest-agent-first scheduling (FIFO only).
+5. Lose the in-process status API (TUI only, no HTTP).
+6. Add a process management dependency (start/stop/monitor the proxy).
+7. Accept GPL-3.0 license implications.
+
+## 2. Drawbacks of using ai-concurrency-shaper
+
+### Fatal drawbacks
+1. **No oldest-agent-first scheduling.** Our implementation prioritizes the
+ agent whose prompt started longest ago. The proxy uses a simple FIFO
+ semaphore — no priority queue. This is a core requirement.
+2. **No runtime-configurable limits.** Limits are set via CLI flags at startup.
+ Our frontend has a settings UI to add/remove/update limits at runtime.
+3. **Language mismatch.** It's a Go binary, not an npm package. We'd need to
+ run it as a separate process, adding operational complexity.
+4. **No HTTP status API.** The proxy exposes status only via TUI. Our frontend
+ polls `GET /concurrency/status` for live in-flight/queued/paused state.
+5. **GPL-3.0 license.** Copyleft — may be incompatible with our distribution
+ model.
+
+### Secondary drawbacks
+6. **No watchdog.** If a slot holder dies (e.g. the proxy's connection to the
+ client drops but the upstream request is still in flight), there's no
+ timeout-based reclaim. It relies on `-queue-timeout` for queue wait, not
+ for held slots.
+7. **Immature.** 1 star, 7 commits, 16 days old, single contributor. No
+ community, no battle-testing.
+8. **No provider-awareness.** It limits by HTTP route pattern, not by
+ "provider." Our implementation is keyed on `providerId` (e.g. "umans",
+ "openai-compat"), which maps naturally to our `ProviderContract.id`.
+9. **Operational overhead.** Running a separate Go process alongside the Bun
+ app adds deployment complexity, monitoring burden, and a failure mode
+ (proxy down = all requests fail).
+
+## 3. Other libraries/tools for AI API concurrency
+
+### p-queue (sindresorhus)
+- **npm**, TypeScript, MIT, 4.2k stars, 727k dependents.
+- General-purpose promise queue with concurrency control.
+- Supports **priority** (`{priority: number}` per task), **rate limiting**
+ (`intervalCap` + `interval`), **pause/resume**, **timeouts**, **custom
+ queue class** (for custom scheduling), AbortSignal cancellation.
+- **No AI-specific logic** — no 429 detection, no provider concept, no
+ streaming-awareness.
+- Could be used as a **building block** to replace our queue implementation,
+ but we'd still need the provider wrapper, 429 detection, watchdog, and
+ status API on top.
+- **Feature complete** (maintainer says no further development planned, but
+ accepts PRs).
+
+### p-limit (sindresorhus)
+- **npm**, TypeScript, MIT, 5.4k dependents.
+- Simpler than p-queue — just limits concurrent executions. No queue, no
+ priority, no pause. Not suitable for our use case (we need queuing).
+
+### ai-sdk-rate-limiter (piyushgupta344)
+- **npm**, TypeScript, zero dependencies.
+- Designed for the Vercel AI SDK (`@ai-sdk/*`). Wraps model objects.
+- Has **priority queuing** (high/normal/low lanes), **429 backoff with
+ Retry-After**, **cost tracking & budget enforcement**, **multi-tenant
+ scopes**, **Redis for multi-instance**, **observability** (Prometheus,
+ OpenTelemetry).
+- Built-in limits for OpenAI, Anthropic, Google, Groq, Mistral, Cohere.
+- **Closest to our use case** of any npm library found.
+- **Maturity unknown** — couldn't verify star count or maintenance activity
+ from the docs site. Appears to be a solo project.
+- **Concerns**: It's designed for the Vercel AI SDK's model-wrapping pattern.
+ Our provider architecture uses `ProviderContract.stream()` returning an
+ `AsyncIterable<ProviderEvent>`, not the Vercel AI SDK's model interface.
+ We'd need an adapter. It also doesn't expose a status API for the frontend.
+
+### LiteLLM (BerriAI)
+- **Python**, MIT, 51.7k GitHub stars, very mature (39k+ commits).
+- Full **LLM gateway/proxy** — not a library. Runs as a separate server.
+- Has `enforce_model_rate_limits` for RPM/TPM hard limits.
+- Supports **least-busy routing**, **latency-based routing**, **cost-based
+ routing**, **fallbacks**, **deployment priority**.
+- **No concurrency limiting** — it limits requests-per-minute (RPM) and
+ tokens-per-minute (TPM), not concurrent in-flight requests. RPM is a rate
+ limit, not a concurrency limit. An agent that sends 4 requests in 1 second
+ and then waits would be blocked by RPM=60 but not by concurrency=4.
+- Would require running a Python proxy server alongside our Bun app.
+- Overkill for our use case — it's a full LLM gateway with virtual keys, cost
+ tracking, guardrails, etc.
+
+### Portkey AI Gateway
+- **Hosted/self-hosted**, open-source (Apache-2.0).
+- Full AI gateway with routing, fallbacks, caching, observability.
+- Rate limiting is **Enterprise-only** and per-team/per-key, not per-provider
+ concurrency.
+- Same architectural model as LiteLLM (external proxy).
+
+### Kong AI Gateway
+- Built on Kong's API gateway. Enterprise-focused.
+- Rate limiting is plugin-based (RPM/TPM), not concurrency-based.
+- Same external-proxy model.
+
+## 4. Recommendation
+
+### **Keep the custom implementation. Do not switch to a library.**
+
+### Rationale
+
+1. **No library matches our requirements.** The core differentiators of our
+ implementation — **oldest-agent-first scheduling**, **runtime-configurable
+ per-provider limits**, **in-process status API**, and **stream-aware slot
+ lifecycle** — are not found in any library or proxy we investigated:
+ - `ai-concurrency-shaper` is a Go proxy with FIFO, no priority, no runtime
+ config, no HTTP status API, and GPL-3.0.
+ - `p-queue` has priority but no AI-specific logic, no 429 detection, no
+ streaming-awareness, no status API.
+ - `ai-sdk-rate-limiter` is close but designed for the Vercel AI SDK's model
+ interface, not our `ProviderContract.stream()` pattern.
+ - LiteLLM/Portkey/Kong are full gateway proxies with RPM/TPM rate limiting,
+ not concurrency limiting, and require running a separate server.
+
+2. **Our implementation is small and well-tested.** The core
+ `concurrency-manager.ts` is ~280 lines of pure logic with 15 tests and
+ injected timers. The `provider-wrapper.ts` is ~70 lines with 6 tests. The
+ total surface is tiny — there's no maintenance burden to justify
+ offloading to a library.
+
+3. **The architecture is a perfect fit.** Our `ProviderContract` wrapping
+ pattern (acquire before stream, release after stream) is the cleanest
+ possible integration point. An external proxy would add a network hop,
+ a process to manage, and break the direct relationship between the
+ orchestrator and the provider.
+
+4. **Oldest-agent-first scheduling is a hard requirement.** No library or
+ proxy we found supports priority queuing by turn-start time. This is the
+ core value proposition of our implementation — older agents complete
+ sooner, reducing overall wait time.
+
+5. **Runtime configurability is a hard requirement.** The frontend has a
+ settings UI for adding/removing/updating limits without restart. No
+ library supports this — they all require static configuration.
+
+### What we would lose by switching
+- Oldest-agent-first scheduling (no library supports it).
+- Runtime-configurable limits (no library supports it).
+- In-process status API (no library supports it).
+- Direct integration with `ProviderContract` (would need adapter or proxy).
+- ~280 lines of well-tested code that we own and control.
+
+### What we would gain by switching
+- Nothing we don't already have. The features libraries offer (circuit
+ breakers, adaptive headroom, TUI dashboards) are either already in our
+ implementation (429 backoff, watchdog) or not needed (TUI — we have a web
+ frontend).
+
+### When to reconsider
+- If we need **multi-instance** concurrency coordination (multiple app
+ processes sharing limits), we would need Redis-backed state. At that point,
+ `ai-sdk-rate-limiter` (which has a Redis store) or a custom Redis-backed
+ extension of our current implementation would be worth evaluating.
+- If we need **RPM/TPM rate limiting** (not just concurrency), LiteLLM or
+ Portkey could complement our concurrency limits. But these are orthogonal
+ concerns — RPM limits total requests per minute, while concurrency limits
+ in-flight requests at any moment.
+- If `ai-sdk-rate-limiter` matures and adds a status API + custom model
+ adapters, it could be worth re-evaluating as a replacement for the queue
+ layer (while keeping our provider wrapper + status API).
diff --git a/packages/provider-concurrency/src/concurrency-manager.test.ts b/packages/provider-concurrency/src/concurrency-manager.test.ts
index bfefb4a..6583a34 100644
--- a/packages/provider-concurrency/src/concurrency-manager.test.ts
+++ b/packages/provider-concurrency/src/concurrency-manager.test.ts
@@ -59,7 +59,7 @@ function createFakeTimers() {
};
}
-function createManager(): {
+function createManager(opts?: { releaseCooldownMs?: number }): {
manager: ConcurrencyService;
timers: ReturnType<typeof createFakeTimers>;
} {
@@ -69,6 +69,7 @@ function createManager(): {
slotTimeoutMs: 5000,
watchdogIntervalMs: 1000,
defaultPauseMs: 30000,
+ ...(opts?.releaseCooldownMs !== undefined ? { releaseCooldownMs: opts.releaseCooldownMs } : {}),
setTimeout: timers.setTimeout,
clearTimeout: timers.clearTimeout,
setInterval: timers.setInterval,
@@ -383,4 +384,67 @@ describe("createConcurrencyManager", () => {
}
expect(manager.getStatus("umans")?.inFlight).toBe(0);
});
+
+ it("release cooldown delays slot recycling (inFlight stays incremented during cooldown)", async () => {
+ const { manager, timers } = createManager({ releaseCooldownMs: 200 });
+ manager.setLimit("umans", 1);
+
+ const release1 = await manager.acquire("umans", "conv1", 0);
+ expect(manager.getStatus("umans")?.inFlight).toBe(1);
+
+ // Queue a waiter.
+ let resolved = false;
+ const promise2 = manager.acquire("umans", "conv2", 100).then((r) => {
+ resolved = true;
+ return r;
+ });
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(resolved).toBe(false);
+ expect(manager.getStatus("umans")?.queued).toBe(1);
+
+ // Release the slot — inFlight should stay 1 (cooldown active).
+ release1();
+ expect(manager.getStatus("umans")?.inFlight).toBe(1);
+ expect(resolved).toBe(false); // waiter NOT granted yet
+
+ // Advance past the cooldown.
+ timers.advance(200);
+
+ // Now the slot is recycled and the waiter is granted.
+ const release2 = await promise2;
+ expect(resolved).toBe(true);
+ expect(manager.getStatus("umans")?.inFlight).toBe(1);
+ expect(manager.getStatus("umans")?.queued).toBe(0);
+ release2();
+ });
+
+ it("release cooldown is idempotent (double-release only schedules one cooldown)", async () => {
+ const { manager, timers } = createManager({ releaseCooldownMs: 200 });
+ manager.setLimit("umans", 2);
+
+ const release = await manager.acquire("umans", "conv1", 0);
+ expect(manager.getStatus("umans")?.inFlight).toBe(1);
+
+ release();
+ expect(manager.getStatus("umans")?.inFlight).toBe(1); // still 1 (cooldown)
+
+ // Double-release should not schedule a second cooldown.
+ release();
+
+ // After cooldown, inFlight should drop by exactly 1 (to 0), not 2.
+ timers.advance(200);
+ expect(manager.getStatus("umans")?.inFlight).toBe(0);
+ });
+
+ it("destroy clears cooldown timers without error", () => {
+ const { manager } = createManager({ releaseCooldownMs: 200 });
+ manager.setLimit("umans", 1);
+ // Acquire + release to schedule a cooldown timer.
+ manager.acquire("umans", "conv1", 0).then((release) => {
+ release();
+ // Now there's a pending cooldown timer — destroy should clean it up.
+ expect(() => manager.destroy()).not.toThrow();
+ });
+ });
});
diff --git a/packages/provider-concurrency/src/concurrency-manager.ts b/packages/provider-concurrency/src/concurrency-manager.ts
index e7535cf..49575f7 100644
--- a/packages/provider-concurrency/src/concurrency-manager.ts
+++ b/packages/provider-concurrency/src/concurrency-manager.ts
@@ -114,6 +114,15 @@ export interface ConcurrencyManagerOpts {
readonly watchdogIntervalMs: number;
/** Default pause duration when a 429 arrives without Retry-After (ms). */
readonly defaultPauseMs: number;
+ /**
+ * Delay after a slot is released before the slot is recycled (ms). During
+ * this window `inFlight` stays incremented — a new `acquire` sees the slot
+ * as still held and queues. This covers the upstream provider's accounting
+ * lag: the provider's `concurrent_sessions` counter may not decrement the
+ * instant our stream completes, so re-admitting immediately risks an N+1
+ * overshoot. 0 = instant re-admission (no cooldown). Default: 0.
+ */
+ readonly releaseCooldownMs?: number;
/** Injected timers (default: global). Override in tests for deterministic time. */
readonly setTimeout?: typeof setTimeout;
readonly clearTimeout?: typeof clearTimeout;
@@ -132,12 +141,14 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
const now = opts.now;
const slotTimeoutMs = opts.slotTimeoutMs;
const defaultPauseMs = opts.defaultPauseMs;
+ const releaseCooldownMs = opts.releaseCooldownMs ?? 0;
const setTimeout = opts.setTimeout ?? globalThis.setTimeout.bind(globalThis);
const clearTimeout = opts.clearTimeout ?? globalThis.clearTimeout.bind(globalThis);
const setInterval = opts.setInterval ?? globalThis.setInterval.bind(globalThis);
const clearInterval = opts.clearInterval ?? globalThis.clearInterval.bind(globalThis);
const states = new Map<string, ProviderState>();
+ const cooldownTimers = new Set<ReturnType<typeof setTimeout>>();
let slotIdCounter = 0;
// ── Slot granting ──────────────────────────────────────────────────────────
@@ -149,8 +160,25 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
if (released) return;
released = true;
state.slots.delete(id);
- state.inFlight--;
- tryGrantNext(providerId);
+
+ // Recycle the slot: decrement inFlight + grant the next waiter.
+ // With a release cooldown > 0, defer this by the cooldown duration so
+ // the upstream provider has time to decrement its concurrent_sessions
+ // counter — preventing an N+1 overshoot from accounting lag. During the
+ // cooldown, inFlight stays incremented, so new acquires queue.
+ const recycle = () => {
+ state.inFlight--;
+ tryGrantNext(providerId);
+ };
+ if (releaseCooldownMs > 0) {
+ const timer = setTimeout(() => {
+ cooldownTimers.delete(timer);
+ recycle();
+ }, releaseCooldownMs);
+ cooldownTimers.add(timer);
+ } else {
+ recycle();
+ }
};
state.slots.set(id, {
conversationId,
@@ -314,6 +342,10 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
destroy() {
clearInterval(watchdogTimer);
+ for (const timer of cooldownTimers) {
+ clearTimeout(timer);
+ }
+ cooldownTimers.clear();
for (const state of states.values()) {
if (state.pauseTimer !== undefined) {
clearTimeout(state.pauseTimer);
diff --git a/packages/provider-concurrency/src/extension.ts b/packages/provider-concurrency/src/extension.ts
index c741173..fc2d578 100644
--- a/packages/provider-concurrency/src/extension.ts
+++ b/packages/provider-concurrency/src/extension.ts
@@ -23,10 +23,17 @@ export const manifest: Manifest = {
* Umans docs note each concurrency 429 deprioritizes the account for ~30 min,
* but a 30s queue pause prevents immediate re-overshoot while still allowing
* recovery.
+ * - `RELEASE_COOLDOWN_MS` (200ms): after a slot is released, hold it for this
+ * duration before recycling it to the next waiter. Covers the upstream
+ * provider's accounting lag — the provider's concurrent_sessions counter
+ * may not decrement the instant our stream completes, so re-admitting
+ * immediately risks an N+1 overshoot that triggers a 429. 200ms is the
+ * default most concurrency proxies use for AI/LLM APIs.
*/
const SLOT_TIMEOUT_MS = 5 * 60 * 1000;
const WATCHDOG_INTERVAL_MS = 30 * 1000;
const DEFAULT_PAUSE_MS = 30 * 1000;
+const RELEASE_COOLDOWN_MS = 200;
export function activate(host: HostAPI): void {
const logger = host.logger;
@@ -36,6 +43,7 @@ export function activate(host: HostAPI): void {
slotTimeoutMs: SLOT_TIMEOUT_MS,
watchdogIntervalMs: WATCHDOG_INTERVAL_MS,
defaultPauseMs: DEFAULT_PAUSE_MS,
+ releaseCooldownMs: RELEASE_COOLDOWN_MS,
onWatchdogReclaim: (providerId, conversationId, heldMs) => {
logger.warn("provider-concurrency: watchdog reclaimed stale slot", {
providerId,