diff options
| author | Adam Malczewski <[email protected]> | 2026-06-27 03:03:53 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-27 03:03:53 +0900 |
| commit | a6b95188a110464b6ffa0334c8af58463f2a36f2 (patch) | |
| tree | eb6ef57909e164be4ae721ea1fb25585354d351e | |
| parent | ad9d135e583c99a0d93327115defa43187cde1c3 (diff) | |
| download | dispatch-a6b95188a110464b6ffa0334c8af58463f2a36f2.tar.gz dispatch-a6b95188a110464b6ffa0334c8af58463f2a36f2.zip | |
feat(provider-concurrency): implement per-provider in-memory concurrency limits with oldest-agent-first scheduling
24 files changed, 1293 insertions, 0 deletions
@@ -84,6 +84,7 @@ "@dispatch/lsp": "workspace:*", "@dispatch/mcp": "workspace:*", "@dispatch/message-queue": "workspace:*", + "@dispatch/provider-concurrency": "workspace:*", "@dispatch/provider-openai-compat": "workspace:*", "@dispatch/provider-umans": "workspace:*", "@dispatch/session-orchestrator": "workspace:*", @@ -161,6 +162,13 @@ "@dispatch/wire": "workspace:*", }, }, + "packages/provider-concurrency": { + "name": "@dispatch/provider-concurrency", + "version": "0.0.0", + "dependencies": { + "@dispatch/kernel": "workspace:*", + }, + }, "packages/provider-openai-compat": { "name": "@dispatch/provider-openai-compat", "version": "0.0.0", @@ -186,6 +194,7 @@ "@dispatch/credential-store": "workspace:*", "@dispatch/kernel": "workspace:*", "@dispatch/message-queue": "workspace:*", + "@dispatch/provider-concurrency": "workspace:*", "@dispatch/system-prompt": "workspace:*", }, }, @@ -338,6 +347,7 @@ "@dispatch/kernel": "workspace:*", "@dispatch/lsp": "workspace:*", "@dispatch/mcp": "workspace:*", + "@dispatch/provider-concurrency": "workspace:*", "@dispatch/session-orchestrator": "workspace:*", "@dispatch/system-prompt": "workspace:*", "@dispatch/throughput-store": "workspace:*", @@ -415,6 +425,8 @@ "@dispatch/openai-stream": ["@dispatch/openai-stream@workspace:packages/openai-stream"], + "@dispatch/provider-concurrency": ["@dispatch/provider-concurrency@workspace:packages/provider-concurrency"], + "@dispatch/provider-openai-compat": ["@dispatch/provider-openai-compat@workspace:packages/provider-openai-compat"], "@dispatch/provider-umans": ["@dispatch/provider-umans@workspace:packages/provider-umans"], diff --git a/packages/host-bin/package.json b/packages/host-bin/package.json index 65ea305..e68251b 100644 --- a/packages/host-bin/package.json +++ b/packages/host-bin/package.json @@ -13,6 +13,7 @@ "@dispatch/exec-backend": "workspace:*", "@dispatch/heartbeat": "workspace:*", "@dispatch/provider-openai-compat": "workspace:*", + "@dispatch/provider-concurrency": 
"workspace:*", "@dispatch/provider-umans": "workspace:*", "@dispatch/message-queue": "workspace:*", "@dispatch/mcp": "workspace:*", diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts index 8633052..2ab1118 100644 --- a/packages/host-bin/src/main.ts +++ b/packages/host-bin/src/main.ts @@ -24,6 +24,7 @@ import { import { extension as lspExt } from "@dispatch/lsp"; import { extension as mcpExt } from "@dispatch/mcp"; import { extension as messageQueueExt } from "@dispatch/message-queue"; +import { extension as providerConcurrencyExt } from "@dispatch/provider-concurrency"; import { extension as providerOpenaiCompatExt } from "@dispatch/provider-openai-compat"; import { extension as providerUmansExt } from "@dispatch/provider-umans"; import { extension as sessionOrchestratorExt } from "@dispatch/session-orchestrator"; @@ -79,6 +80,7 @@ const CORE_EXTENSIONS: readonly Extension[] = [ authApikeyExt, providerOpenaiCompatExt, providerUmansExt, + providerConcurrencyExt, // exec-backend must precede the tool extensions that // `dependsOn: ["exec-backend"]` (tool-edit-file/read/shell/write). 
It // provides the ExecBackendResolver the tools resolve through; placing it diff --git a/packages/host-bin/tsconfig.json b/packages/host-bin/tsconfig.json index 2b1edf5..cb85915 100644 --- a/packages/host-bin/tsconfig.json +++ b/packages/host-bin/tsconfig.json @@ -23,6 +23,9 @@ "path": "../message-queue" }, { + "path": "../provider-concurrency" + }, + { "path": "../skills" }, { diff --git a/packages/provider-concurrency/package.json b/packages/provider-concurrency/package.json new file mode 100644 index 0000000..10c522a --- /dev/null +++ b/packages/provider-concurrency/package.json @@ -0,0 +1,11 @@ +{ + "name": "@dispatch/provider-concurrency", + "version": "0.0.0", + "type": "module", + "private": true, + "main": "dist/index.js", + "types": "dist/index.d.ts", + "dependencies": { + "@dispatch/kernel": "workspace:*" + } +} diff --git a/packages/provider-concurrency/src/concurrency-manager.test.ts b/packages/provider-concurrency/src/concurrency-manager.test.ts new file mode 100644 index 0000000..bfefb4a --- /dev/null +++ b/packages/provider-concurrency/src/concurrency-manager.test.ts @@ -0,0 +1,386 @@ +import { describe, expect, it } from "vitest"; +import { type ConcurrencyService, createConcurrencyManager } from "./concurrency-manager.js"; + +// ─── Fake timers ────────────────────────────────────────────────────────────── + +interface FakeTimer { + fire: () => void; + cleared: boolean; +} + +function createFakeTimers() { + let currentTime = 0; + const intervals: FakeTimer[] = []; + const timeouts: { time: number; fire: () => void; cleared: boolean }[] = []; + + const setInterval = ((_fn: () => void, _ms: number) => { + const timer: FakeTimer = { fire: () => _fn(), cleared: false }; + intervals.push(timer); + return timer as unknown as ReturnType<typeof setInterval>; + }) as typeof setInterval; + + const clearInterval = ((timer: ReturnType<typeof setInterval>) => { + const t = timer as unknown as FakeTimer; + t.cleared = true; + }) as typeof clearInterval; + + 
const setTimeout = ((_fn: () => void, ms: number) => { + const entry = { time: currentTime + ms, fire: () => _fn(), cleared: false }; + timeouts.push(entry); + return entry as unknown as ReturnType<typeof setTimeout>; + }) as typeof setTimeout; + + const clearTimeout = ((timer: ReturnType<typeof setTimeout>) => { + const t = timer as unknown as { cleared: boolean }; + t.cleared = true; + }) as typeof clearTimeout; + + return { + now: () => currentTime, + advance(ms: number) { + currentTime += ms; + // Fire any due timeouts. + for (const entry of timeouts) { + if (!entry.cleared && entry.time <= currentTime) { + entry.cleared = true; + entry.fire(); + } + } + }, + fireIntervals() { + for (const timer of intervals) { + if (!timer.cleared) timer.fire(); + } + }, + setInterval, + clearInterval, + setTimeout, + clearTimeout, + }; +} + +function createManager(): { + manager: ConcurrencyService; + timers: ReturnType<typeof createFakeTimers>; +} { + const timers = createFakeTimers(); + const manager = createConcurrencyManager({ + now: timers.now, + slotTimeoutMs: 5000, + watchdogIntervalMs: 1000, + defaultPauseMs: 30000, + setTimeout: timers.setTimeout, + clearTimeout: timers.clearTimeout, + setInterval: timers.setInterval, + clearInterval: timers.clearInterval, + }); + return { manager, timers }; +} + +describe("createConcurrencyManager", () => { + it("returns no-op release for providers with no configured limit", async () => { + const { manager } = createManager(); + const release = await manager.acquire("unknown", "conv1", 0); + expect(typeof release).toBe("function"); + // No state → release is a no-op, no error. 
+ release(); + expect(manager.getStatus("unknown")).toBeUndefined(); + }); + + it("grants immediately when under the limit", async () => { + const { manager } = createManager(); + manager.setLimit("umans", 4); + + const release1 = await manager.acquire("umans", "conv1", 0); + const status = manager.getStatus("umans"); + expect(status).toEqual({ + providerId: "umans", + limit: 4, + inFlight: 1, + queued: 0, + paused: false, + }); + release1(); + expect(manager.getStatus("umans")?.inFlight).toBe(0); + }); + + it("queues when at the limit and grants on release (FIFO when same priority)", async () => { + const { manager } = createManager(); + manager.setLimit("umans", 1); + + const release1 = await manager.acquire("umans", "conv1", 100); + + // Second request should block (at limit). + let resolved = false; + const promise2 = manager.acquire("umans", "conv2", 200).then((r) => { + resolved = true; + return r; + }); + + // Let microtasks settle. + await Promise.resolve(); + await Promise.resolve(); + expect(resolved).toBe(false); + expect(manager.getStatus("umans")?.queued).toBe(1); + + // Release the first slot. + release1(); + + const release2 = await promise2; + expect(resolved).toBe(true); + expect(manager.getStatus("umans")?.inFlight).toBe(1); + expect(manager.getStatus("umans")?.queued).toBe(0); + release2(); + }); + + it("grants to the oldest agent first (priority queue by promptStartedAt)", async () => { + const { manager } = createManager(); + manager.setLimit("umans", 1); + + // Hold the single slot. + const release0 = await manager.acquire("umans", "holder", 0); + + // Three agents queue with different prompt start times. + // Agent C started latest (t=300), Agent A started earliest (t=100). + const results: string[] = []; + const acquireAndRecord = (conv: string, promptAt: number) => + manager.acquire("umans", conv, promptAt).then((r) => { + results.push(conv); + return r; + }); + + // Queue in non-sorted order: B (t=200), A (t=100), C (t=300). 
+ const pB = acquireAndRecord("convB", 200); + const pA = acquireAndRecord("convA", 100); + const pC = acquireAndRecord("convC", 300); + + await Promise.resolve(); + await Promise.resolve(); + expect(results).toEqual([]); // none resolved yet. + + // Release the holder. The oldest agent (A, t=100) should get the slot first. + release0(); + + const rA = await pA; + expect(results).toEqual(["convA"]); + + rA.release ? rA.release() : rA(); + + // Now B (t=200) should be next. + const rB = await pB; + expect(results).toEqual(["convA", "convB"]); + rB.release ? rB.release() : rB(); + + // Then C (t=300). + const rC = await pC; + expect(results).toEqual(["convA", "convB", "convC"]); + rC.release ? rC.release() : rC(); + }); + + it("does not grant slots while paused (429 backoff)", async () => { + const { manager, timers } = createManager(); + manager.setLimit("umans", 1); + + const release1 = await manager.acquire("umans", "conv1", 0); + release1(); + + // Simulate a 429 → queue pauses. + manager.reportRateLimit("umans"); + const status = manager.getStatus("umans"); + expect(status?.paused).toBe(true); + expect(status?.pausedUntil).toBe(30000); + + // A new acquire should block (paused, even though under limit). + let resolved = false; + const promise = manager.acquire("umans", "conv2", 0).then((r) => { + resolved = true; + return r; + }); + await Promise.resolve(); + await Promise.resolve(); + expect(resolved).toBe(false); + + // Advance past the pause duration. 
+ timers.advance(30000); + + const release2 = await promise; + expect(resolved).toBe(true); + expect(manager.getStatus("umans")?.paused).toBe(false); + release2(); + }); + + it("respects retryAfterMs for 429 backoff", () => { + const { manager } = createManager(); + manager.setLimit("umans", 2); + + manager.reportRateLimit("umans", 5000); + expect(manager.getStatus("umans")?.pausedUntil).toBe(5000); + }); + + it("watchdog reclaims slots held beyond the timeout", async () => { + const { manager, timers } = createManager(); + manager.setLimit("umans", 1); + + const release = await manager.acquire("umans", "conv1", 0); + expect(manager.getStatus("umans")?.inFlight).toBe(1); + + // Advance past the slot timeout (5000ms) and fire the watchdog. + timers.advance(5001); + timers.fireIntervals(); + + // The watchdog should have force-released the slot. + expect(manager.getStatus("umans")?.inFlight).toBe(0); + + // Calling release again (from the holder) should be a no-op (idempotent). + release(); + expect(manager.getStatus("umans")?.inFlight).toBe(0); + }); + + it("watchdog grants the next waiter after reclaiming a stale slot", async () => { + const { manager, timers } = createManager(); + manager.setLimit("umans", 1); + + // Hold the slot. + await manager.acquire("umans", "holder", 0); + + // Queue a waiter. + let resolved = false; + const promise = manager.acquire("umans", "waiter", 10).then((r) => { + resolved = true; + return r; + }); + await Promise.resolve(); + await Promise.resolve(); + expect(resolved).toBe(false); + + // Watchdog reclaims the held slot. + timers.advance(5001); + timers.fireIntervals(); + + // The waiter should now be granted. 
+ const release = await promise; + expect(resolved).toBe(true); + expect(manager.getStatus("umans")?.inFlight).toBe(1); + release(); + }); + + it("setLimit grants queued requests when the limit increases", async () => { + const { manager } = createManager(); + manager.setLimit("umans", 1); + + const release1 = await manager.acquire("umans", "conv1", 0); + + // Queue a waiter. + let resolved = false; + const promise = manager.acquire("umans", "conv2", 100).then((r) => { + resolved = true; + return r; + }); + await Promise.resolve(); + await Promise.resolve(); + expect(resolved).toBe(false); + + // Increase the limit → the queued request should be granted. + manager.setLimit("umans", 2); + + const release2 = await promise; + expect(resolved).toBe(true); + expect(manager.getStatus("umans")?.inFlight).toBe(2); + + release2(); + release1(); + }); + + it("removeLimit grants all queued requests and removes the state", async () => { + const { manager } = createManager(); + manager.setLimit("umans", 1); + + const release1 = await manager.acquire("umans", "conv1", 0); + + // Queue two waiters. + const p2 = manager.acquire("umans", "conv2", 100); + const p3 = manager.acquire("umans", "conv3", 200); + await Promise.resolve(); + await Promise.resolve(); + + // Remove the limit → all queued requests should be granted. + manager.removeLimit("umans"); + + const r2 = await p2; + const r3 = await p3; + expect(manager.getStatus("umans")).toBeUndefined(); + + // Releases work (no error after state removal). 
+ r2(); + r3(); + release1(); + }); + + it("getLimits returns all configured limits", () => { + const { manager } = createManager(); + manager.setLimit("umans", 4); + manager.setLimit("openai-compat", 5); + + const limits = manager.getLimits(); + expect(limits).toHaveLength(2); + expect(limits).toContainEqual({ providerId: "umans", limit: 4 }); + expect(limits).toContainEqual({ providerId: "openai-compat", limit: 5 }); + }); + + it("getStatusAll returns status for all configured providers", () => { + const { manager } = createManager(); + manager.setLimit("umans", 4); + manager.setLimit("anthropic", 3); + + const statuses = manager.getStatusAll(); + expect(statuses).toHaveLength(2); + const umans = statuses.find((s) => s.providerId === "umans"); + expect(umans).toEqual({ + providerId: "umans", + limit: 4, + inFlight: 0, + queued: 0, + paused: false, + }); + }); + + it("destroy clears timers without error", () => { + const { manager } = createManager(); + manager.setLimit("umans", 4); + manager.reportRateLimit("umans", 5000); + expect(() => manager.destroy()).not.toThrow(); + }); + + it("release is idempotent (double-release does not overshoot)", async () => { + const { manager } = createManager(); + manager.setLimit("umans", 2); + + const release = await manager.acquire("umans", "conv1", 0); + expect(manager.getStatus("umans")?.inFlight).toBe(1); + + release(); + expect(manager.getStatus("umans")?.inFlight).toBe(0); + + // Double-release should not decrement below 0. 
/**
 * In-memory per-provider concurrency limiter.
 *
 * Tracks and limits how many concurrent API requests (token-generating
 * requests) are in flight per provider. When the limit is reached, additional
 * requests queue and are granted slots based on oldest-agent-first priority
 * (the agent whose current prompt started the longest ago wins the next slot).
 *
 * A watchdog reclaims slots held beyond a timeout (deadlock / stuck-agent
 * recovery). 429 backoff pauses a provider's queue for a configurable duration.
 *
 * This module is the PURE decision logic. It takes an injected clock (`now`)
 * and injected timers (`setTimeout`/`clearTimeout`/`setInterval`/`clearInterval`)
 * so it is fully testable with deterministic fake time. The extension layer
 * wires real timers.
 */

// ─── Types ───────────────────────────────────────────────────────────────────

/** Status snapshot for a single provider's concurrency state. */
export interface ProviderConcurrencyStatus {
  readonly providerId: string;
  /** Configured concurrency limit. Always present (status is only returned for providers with a limit). */
  readonly limit: number;
  /** Currently in-flight (held) slots. */
  readonly inFlight: number;
  /** Agents waiting in the queue for a slot. */
  readonly queued: number;
  /** Whether the queue is paused (429 backoff). */
  readonly paused: boolean;
  /** When the pause expires (epoch-ms). Present only when paused. */
  readonly pausedUntil?: number;
}

/**
 * The limiter surface a consumer (session-orchestrator) needs: acquire a
 * slot before a provider stream starts, release it when the stream completes,
 * and report rate-limit (429) events so the manager can back off.
 */
export interface ConcurrencyLimiter {
  /**
   * Acquire a concurrency slot for `providerId`. Resolves immediately when a
   * slot is available; otherwise blocks (queued by oldest-agent-first) until
   * one frees up. The returned function MUST be called when the response
   * stream completes (in a `finally` block). For providers with no configured
   * limit, resolves instantly with a no-op release.
   *
   * @param providerId The provider to limit (e.g. "umans", "openai-compat").
   * @param conversationId The agent requesting the slot.
   * @param promptStartedAt When the agent's current prompt (turn) started
   *   (epoch-ms). Used for oldest-agent-first scheduling.
   */
  acquire(providerId: string, conversationId: string, promptStartedAt: number): Promise<() => void>;

  /**
   * Report a 429 from a provider. Pauses the queue for that provider for
   * `retryAfterMs` (or a default duration when omitted or not a finite,
   * non-negative number). Slots that are already in flight are unaffected;
   * waiters already in the queue and new `acquire` calls are both held until
   * the pause expires. A later report replaces the current pause deadline.
   */
  reportRateLimit(providerId: string, retryAfterMs?: number): void;
}

/**
 * The full service surface (limiter + config + status) for HTTP routes.
 */
export interface ConcurrencyService extends ConcurrencyLimiter {
  /**
   * Set the concurrency limit for a provider. Creates the state if new.
   * The value is stored as given; a slot is granted only while
   * `inFlight < limit`, so a limit of 0 queues every request.
   */
  setLimit(providerId: string, limit: number): void;
  /** Get the configured limit, or `undefined` when none. */
  getLimit(providerId: string): number | undefined;
  /** Remove the limit for a provider (makes it unlimited). Queued requests are granted immediately. */
  removeLimit(providerId: string): void;
  /** All configured limits as `{ providerId, limit }` entries. */
  getLimits(): readonly { providerId: string; limit: number }[];
  /** Status for one provider, or `undefined` when no limit is configured. */
  getStatus(providerId: string): ProviderConcurrencyStatus | undefined;
  /** Status for every provider with a configured limit. */
  getStatusAll(): readonly ProviderConcurrencyStatus[];
  /**
   * Stop the watchdog, clear all timers, and drop every configured limit.
   * Requests still waiting in a queue are resolved with a no-op release so
   * their callers do not hang.
   */
  destroy(): void;
}

// ─── Internal state ───────────────────────────────────────────────────────────

/** One held slot, tracked so the watchdog can find and reclaim it. */
interface Slot {
  readonly conversationId: string;
  /** Value of `now()` at the moment the slot was granted. */
  readonly acquiredAt: number;
  /** Idempotent release — safe to call from the holder or the watchdog. */
  readonly releaseFn: () => void;
}

/** A pending `acquire` call: who is waiting, its priority key, and how to wake it. */
interface QueuedWaiter {
  readonly conversationId: string;
  readonly promptStartedAt: number;
  readonly resolve: (release: () => void) => void;
}

/** Mutable bookkeeping for one provider that has a configured limit. */
interface ProviderState {
  limit: number;
  inFlight: number;
  /** Held slots keyed by a manager-wide slot id. */
  slots: Map<number, Slot>;
  /** Waiters sorted ascending by `promptStartedAt` (oldest first). */
  queue: QueuedWaiter[];
  paused: boolean;
  pausedUntil: number | undefined;
  pauseTimer: ReturnType<typeof setTimeout> | undefined;
}

export interface ConcurrencyManagerOpts {
  /** Monotonic-ish clock (epoch-ms). */
  readonly now: () => number;
  /** Max time a slot may be held before the watchdog reclaims it (ms). */
  readonly slotTimeoutMs: number;
  /** How often the watchdog sweeps (ms). */
  readonly watchdogIntervalMs: number;
  /** Default pause duration when a 429 arrives without Retry-After (ms). */
  readonly defaultPauseMs: number;
  /** Injected timers (default: global). Override in tests for deterministic time. */
  readonly setTimeout?: typeof setTimeout;
  readonly clearTimeout?: typeof clearTimeout;
  readonly setInterval?: typeof setInterval;
  readonly clearInterval?: typeof clearInterval;
  /** Optional logger for watchdog + pause events. */
  readonly onWatchdogReclaim?: (providerId: string, conversationId: string, heldMs: number) => void;
  readonly onPause?: (providerId: string, durationMs: number) => void;
}

/** Release function handed out when no slot is being tracked. */
function noopRelease(): void {
  // No limit configured → nothing to release.
}

/**
 * Build a concurrency manager.
 *
 * The watchdog interval is started immediately, so callers should invoke
 * `destroy()` when the manager is no longer needed.
 *
 * @param opts Clock, tuning constants, optional injected timers and optional
 *   event callbacks. Timers default to the `globalThis` implementations.
 * @returns The full service surface (limiter + configuration + status).
 */
export function createConcurrencyManager(opts: ConcurrencyManagerOpts): ConcurrencyService {
  const now = opts.now;
  const slotTimeoutMs = opts.slotTimeoutMs;
  const defaultPauseMs = opts.defaultPauseMs;
  const setTimeout = opts.setTimeout ?? globalThis.setTimeout.bind(globalThis);
  const clearTimeout = opts.clearTimeout ?? globalThis.clearTimeout.bind(globalThis);
  const setInterval = opts.setInterval ?? globalThis.setInterval.bind(globalThis);
  const clearInterval = opts.clearInterval ?? globalThis.clearInterval.bind(globalThis);

  const states = new Map<string, ProviderState>();
  let slotIdCounter = 0;

  // ── Slot granting ──────────────────────────────────────────────────────────

  /** Record a new held slot on `state` and return its idempotent release function. */
  function grantSlot(state: ProviderState, providerId: string, conversationId: string): () => void {
    const id = slotIdCounter++;
    let released = false;
    const releaseFn = () => {
      // Holder and watchdog may both call this; only the first call counts.
      if (released) return;
      released = true;
      state.slots.delete(id);
      state.inFlight--;
      // Looks the provider up again by id: if its limit was removed in the
      // meantime there is no state to serve and this is a no-op.
      tryGrantNext(providerId);
    };
    state.slots.set(id, {
      conversationId,
      acquiredAt: now(),
      releaseFn,
    });
    state.inFlight++;
    return releaseFn;
  }

  /** Pop the head of the queue (the oldest prompt) and hand it a slot. */
  function grantHeadWaiter(state: ProviderState, providerId: string): void {
    const waiter = state.queue.shift();
    if (waiter === undefined) return;
    waiter.resolve(grantSlot(state, providerId, waiter.conversationId));
  }

  /** Grant queued waiters while the provider is not paused and has spare capacity. */
  function tryGrantNext(providerId: string): void {
    const state = states.get(providerId);
    if (state === undefined) return;
    if (state.paused) return;
    while (state.queue.length > 0 && state.inFlight < state.limit) {
      grantHeadWaiter(state, providerId);
    }
  }

  /** Cancel a provider's pause timer (if any) and mark it as not paused. */
  function clearPause(state: ProviderState): void {
    state.paused = false;
    state.pausedUntil = undefined;
    if (state.pauseTimer !== undefined) {
      clearTimeout(state.pauseTimer);
      state.pauseTimer = undefined;
    }
  }

  // ── Watchdog ───────────────────────────────────────────────────────────────

  /** Force-release every slot that has been held longer than `slotTimeoutMs`. */
  function sweep(): void {
    const currentNow = now();
    for (const [providerId, state] of states) {
      // Releasing deletes from `slots` and may add freshly granted slots while
      // we iterate; Map iteration tolerates both, and a fresh slot has
      // `acquiredAt === now()` so it is never stale within the same sweep.
      for (const [, slot] of state.slots) {
        const heldMs = currentNow - slot.acquiredAt;
        if (heldMs > slotTimeoutMs) {
          opts.onWatchdogReclaim?.(providerId, slot.conversationId, heldMs);
          slot.releaseFn();
        }
      }
    }
  }

  const watchdogTimer = setInterval(sweep, opts.watchdogIntervalMs);

  // ── Public API ─────────────────────────────────────────────────────────────

  const manager: ConcurrencyService = {
    acquire(providerId, conversationId, promptStartedAt) {
      const state = states.get(providerId);
      if (state === undefined) {
        // No limit configured → unlimited.
        return Promise.resolve(noopRelease);
      }

      if (!state.paused && state.inFlight < state.limit) {
        return Promise.resolve(grantSlot(state, providerId, conversationId));
      }

      // Queue (oldest-agent-first by promptStartedAt).
      return new Promise<() => void>((resolve) => {
        state.queue.push({ conversationId, promptStartedAt, resolve });
        // Keep sorted ascending by promptStartedAt (oldest first). The queue is
        // typically tiny, so re-sorting on insert is fine. Array#sort is
        // stable, so waiters with equal timestamps stay in arrival order.
        state.queue.sort((a, b) => a.promptStartedAt - b.promptStartedAt);
      });
    },

    reportRateLimit(providerId, retryAfterMs) {
      const state = states.get(providerId);
      if (state === undefined) return;

      // A hint that is missing, NaN, infinite or negative (e.g. an unparsable
      // Retry-After header) would produce a meaningless deadline and timer
      // delay, so fall back to the default in those cases.
      const pauseDuration =
        retryAfterMs !== undefined && Number.isFinite(retryAfterMs) && retryAfterMs >= 0
          ? retryAfterMs
          : defaultPauseMs;
      state.paused = true;
      state.pausedUntil = now() + pauseDuration;

      // Only one pause timer per provider: the newest report wins.
      if (state.pauseTimer !== undefined) {
        clearTimeout(state.pauseTimer);
      }
      opts.onPause?.(providerId, pauseDuration);
      state.pauseTimer = setTimeout(() => {
        state.paused = false;
        state.pausedUntil = undefined;
        state.pauseTimer = undefined;
        tryGrantNext(providerId);
      }, pauseDuration);
    },

    setLimit(providerId, limit) {
      let state = states.get(providerId);
      if (state === undefined) {
        state = {
          limit,
          inFlight: 0,
          slots: new Map(),
          queue: [],
          paused: false,
          pausedUntil: undefined,
          pauseTimer: undefined,
        };
        states.set(providerId, state);
      } else {
        state.limit = limit;
      }
      // A higher limit may let queued requests through.
      tryGrantNext(providerId);
    },

    getLimit(providerId) {
      return states.get(providerId)?.limit;
    },

    removeLimit(providerId) {
      const state = states.get(providerId);
      if (state === undefined) return;

      clearPause(state);

      // Grant all queued requests (they become unlimited now).
      while (state.queue.length > 0) {
        grantHeadWaiter(state, providerId);
      }

      // Remove the state. In-flight slots' release functions still work —
      // they close over `state` and call `tryGrantNext` which finds no state
      // and returns early. The watchdog won't sweep removed states.
      states.delete(providerId);
    },

    getLimits() {
      return [...states.entries()].map(([providerId, s]) => ({
        providerId,
        limit: s.limit,
      }));
    },

    getStatus(providerId) {
      const state = states.get(providerId);
      if (state === undefined) return undefined;
      return {
        providerId,
        limit: state.limit,
        inFlight: state.inFlight,
        queued: state.queue.length,
        paused: state.paused,
        // Omit the key entirely (rather than setting it to undefined) when not paused.
        ...(state.pausedUntil !== undefined ? { pausedUntil: state.pausedUntil } : {}),
      };
    },

    getStatusAll() {
      return [...states.keys()]
        .map((providerId) => manager.getStatus(providerId))
        .filter((s): s is ProviderConcurrencyStatus => s !== undefined);
    },

    destroy() {
      clearInterval(watchdogTimer);
      for (const state of states.values()) {
        clearPause(state);
        // Nothing will ever grant these waiters once the state is dropped, so
        // settle them now; otherwise every pending `acquire` hangs forever.
        // No slot is tracked for them, hence the no-op release.
        for (const waiter of state.queue.splice(0)) {
          waiter.resolve(noopRelease);
        }
      }
      states.clear();
    },
  };

  return manager;
}
contributes: { services: ["provider-concurrency/service"] }, +}; + +/** + * Default tuning constants. + * + * - `SLOT_TIMEOUT_MS` (5 min): a slot held longer than this is force-reclaimed + * by the watchdog (deadlock / stuck-agent recovery). Generation streams + * rarely exceed 2–3 minutes; 5 min is a generous safety margin. + * - `WATCHDOG_INTERVAL_MS` (30s): how often the watchdog sweeps for stale slots. + * - `DEFAULT_PAUSE_MS` (30s): default 429 backoff when no Retry-After is given. + * Umans docs note each concurrency 429 deprioritizes the account for ~30 min, + * but a 30s queue pause prevents immediate re-overshoot while still allowing + * recovery. + */ +const SLOT_TIMEOUT_MS = 5 * 60 * 1000; +const WATCHDOG_INTERVAL_MS = 30 * 1000; +const DEFAULT_PAUSE_MS = 30 * 1000; + +export function activate(host: HostAPI): void { + const logger = host.logger; + + const manager: ConcurrencyService = createConcurrencyManager({ + now: () => Date.now(), + slotTimeoutMs: SLOT_TIMEOUT_MS, + watchdogIntervalMs: WATCHDOG_INTERVAL_MS, + defaultPauseMs: DEFAULT_PAUSE_MS, + onWatchdogReclaim: (providerId, conversationId, heldMs) => { + logger.warn("provider-concurrency: watchdog reclaimed stale slot", { + providerId, + conversationId, + heldMs, + }); + }, + onPause: (providerId, durationMs) => { + logger.warn("provider-concurrency: 429 backoff — pausing queue", { + providerId, + durationMs, + }); + }, + }); + + host.provideService(concurrencyServiceHandle, manager); + logger.info("provider-concurrency: registered"); +} + +export const extension: Extension = { + manifest, + activate, +}; diff --git a/packages/provider-concurrency/src/index.ts b/packages/provider-concurrency/src/index.ts new file mode 100644 index 0000000..f35c070 --- /dev/null +++ b/packages/provider-concurrency/src/index.ts @@ -0,0 +1,10 @@ +export { + type ConcurrencyLimiter, + type ConcurrencyManagerOpts, + type ConcurrencyService, + createConcurrencyManager, + type ProviderConcurrencyStatus, +} from 
"./concurrency-manager.js"; +export { extension, manifest } from "./extension.js"; +export { wrapProviderWithConcurrency } from "./provider-wrapper.js"; +export { concurrencyServiceHandle } from "./service.js"; diff --git a/packages/provider-concurrency/src/provider-wrapper.test.ts b/packages/provider-concurrency/src/provider-wrapper.test.ts new file mode 100644 index 0000000..e024d59 --- /dev/null +++ b/packages/provider-concurrency/src/provider-wrapper.test.ts @@ -0,0 +1,173 @@ +import type { ProviderContract, ProviderEvent } from "@dispatch/kernel"; +import { describe, expect, it } from "vitest"; +import type { ConcurrencyLimiter } from "./concurrency-manager.js"; +import { wrapProviderWithConcurrency } from "./provider-wrapper.js"; + +/** Build a fake provider that yields a sequence of events. */ +function fakeProvider(events: ProviderEvent[]): ProviderContract { + return { + id: "test-provider", + stream: async function* (): AsyncIterable<ProviderEvent> { + for (const e of events) { + yield e; + } + }, + }; +} + +/** A fake limiter that records acquire/release calls. 
*/ +function recordingLimiter(): ConcurrencyLimiter & { + acquireCalls: { providerId: string; conversationId: string; promptStartedAt: number }[]; + releaseCalls: number; + rateLimitReports: string[]; +} { + const acquireCalls: { providerId: string; conversationId: string; promptStartedAt: number }[] = + []; + const releaseCalls: { count: number } = { count: 0 }; + const rateLimitReports: string[] = []; + + return { + acquireCalls, + get releaseCalls() { + return releaseCalls.count; + }, + rateLimitReports, + acquire(providerId, conversationId, promptStartedAt) { + acquireCalls.push({ providerId, conversationId, promptStartedAt }); + return Promise.resolve(() => { + releaseCalls.count++; + }); + }, + reportRateLimit(providerId) { + rateLimitReports.push(providerId); + }, + }; +} + +describe("wrapProviderWithConcurrency", () => { + it("acquires a slot before streaming and releases after the stream completes", async () => { + const provider = fakeProvider([ + { type: "text-delta", delta: "hello" }, + { type: "finish", reason: "stop" }, + ]); + const limiter = recordingLimiter(); + + const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 12345); + + const events: ProviderEvent[] = []; + for await (const e of wrapped.stream([], [])) { + events.push(e); + } + + // Slot acquired before stream, released after. 
+ expect(limiter.acquireCalls).toEqual([ + { providerId: "test-provider", conversationId: "conv1", promptStartedAt: 12345 }, + ]); + expect(limiter.releaseCalls).toBe(1); + expect(events).toEqual([ + { type: "text-delta", delta: "hello" }, + { type: "finish", reason: "stop" }, + ]); + }); + + it("releases the slot even when the stream throws", async () => { + const provider: ProviderContract = { + id: "err-provider", + stream: async function* (): AsyncIterable<ProviderEvent> { + yield { type: "text-delta", delta: "partial" }; + throw new Error("stream exploded"); + }, + }; + const limiter = recordingLimiter(); + const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0); + + await expect(async () => { + for await (const _e of wrapped.stream([], [])) { + // consume + } + }).rejects.toThrow("stream exploded"); + + expect(limiter.releaseCalls).toBe(1); + }); + + it("reports 429 errors to the limiter", async () => { + const provider = fakeProvider([ + { type: "error", message: "Too many requests", code: "429", retryable: true }, + ]); + const limiter = recordingLimiter(); + const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0); + + const events: ProviderEvent[] = []; + for await (const e of wrapped.stream([], [])) { + events.push(e); + } + + expect(limiter.rateLimitReports).toEqual(["test-provider"]); + // The 429 error event is still yielded to the consumer (kernel handles retry). 
+ expect(events).toHaveLength(1); + expect(events[0]?.type).toBe("error"); + }); + + it("does not report non-429 errors", async () => { + const provider = fakeProvider([ + { type: "error", message: "Internal error", code: "500", retryable: true }, + ]); + const limiter = recordingLimiter(); + const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0); + + for await (const _e of wrapped.stream([], [])) { + // consume + } + + expect(limiter.rateLimitReports).toEqual([]); + }); + + it("preserves the provider id and listModels", async () => { + const provider: ProviderContract = { + id: "my-provider", + stream: async function* (): AsyncIterable<ProviderEvent> { + yield { type: "finish", reason: "stop" }; + }, + listModels: async () => [{ id: "model-1" }], + }; + const limiter = recordingLimiter(); + const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0); + + expect(wrapped.id).toBe("my-provider"); + expect(wrapped.listModels).toBeDefined(); + const models = await wrapped.listModels?.(); + expect(models).toEqual([{ id: "model-1" }]); + }); + + it("passes through messages, tools, and opts to the inner stream", async () => { + let receivedArgs: + | { + messages: unknown; + tools: unknown; + opts: unknown; + } + | undefined; + + const provider: ProviderContract = { + id: "passthrough", + stream: async function* (messages, tools, opts): AsyncIterable<ProviderEvent> { + receivedArgs = { messages, tools, opts }; + yield { type: "finish", reason: "stop" }; + }, + }; + const limiter = recordingLimiter(); + const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0); + + const messages = [{ role: "user" as const, chunks: [{ type: "text" as const, text: "hi" }] }]; + const tools = [{ name: "test_tool", description: "test", parameters: {} }]; + const opts = { model: "gpt-4" }; + + for await (const _e of wrapped.stream(messages, tools, opts)) { + // consume + } + + expect(receivedArgs?.messages).toBe(messages); + 
expect(receivedArgs?.tools).toBe(tools); + expect(receivedArgs?.opts).toBe(opts); + }); +}); diff --git a/packages/provider-concurrency/src/provider-wrapper.ts b/packages/provider-concurrency/src/provider-wrapper.ts new file mode 100644 index 0000000..ee3ca85 --- /dev/null +++ b/packages/provider-concurrency/src/provider-wrapper.ts @@ -0,0 +1,59 @@ +import type { + ChatMessage, + ProviderContract, + ProviderEvent, + ProviderStreamOptions, + ToolContract, +} from "@dispatch/kernel"; +import type { ConcurrencyLimiter } from "./concurrency-manager.js"; + +/** + * Wrap a provider's `stream` method with concurrency limiting. + * + * A slot is acquired BEFORE the first event is yielded (before the HTTP + * request is sent — the `await limiter.acquire()` runs before the generator + * body starts iterating the inner stream). The slot is released in a `finally` + * block AFTER the inner stream completes (the full response stream, not just + * HTTP headers — matching the Umans concurrency model where a slot is held + * only while tokens are actually generating). + * + * 429 detection: if the provider yields an `error` event with `code: "429"`, + * the limiter is notified so it can pause the queue for that provider. + * + * @param provider The underlying provider to wrap. + * @param limiter The concurrency limiter (acquire/release/reportRateLimit). + * @param conversationId The agent requesting the stream (for slot attribution). + * @param promptStartedAt When the agent's current prompt (turn) started + * (epoch-ms, for oldest-agent-first scheduling). 
+ */ +export function wrapProviderWithConcurrency( + provider: ProviderContract, + limiter: ConcurrencyLimiter, + conversationId: string, + promptStartedAt: number, +): ProviderContract { + const innerStream = provider.stream; + const providerId = provider.id; + + return { + id: provider.id, + stream: async function* ( + messages: readonly ChatMessage[], + tools: readonly ToolContract[], + opts?: ProviderStreamOptions, + ): AsyncIterable<ProviderEvent> { + const release = await limiter.acquire(providerId, conversationId, promptStartedAt); + try { + for await (const event of innerStream(messages, tools, opts)) { + if (event.type === "error" && event.code === "429") { + limiter.reportRateLimit(providerId); + } + yield event; + } + } finally { + release(); + } + }, + ...(provider.listModels !== undefined ? { listModels: provider.listModels } : {}), + }; +} diff --git a/packages/provider-concurrency/src/service.ts b/packages/provider-concurrency/src/service.ts new file mode 100644 index 0000000..aa578e8 --- /dev/null +++ b/packages/provider-concurrency/src/service.ts @@ -0,0 +1,11 @@ +import { defineService } from "@dispatch/kernel"; +import type { ConcurrencyService } from "./concurrency-manager.js"; + +/** + * Typed service handle for the provider-concurrency service. The + * `provider-concurrency` extension provides the implementation; the + * session-orchestrator + transport-http consume it. 
+ */ +export const concurrencyServiceHandle = defineService<ConcurrencyService>( + "provider-concurrency/service", +); diff --git a/packages/provider-concurrency/tsconfig.json b/packages/provider-concurrency/tsconfig.json new file mode 100644 index 0000000..44ed916 --- /dev/null +++ b/packages/provider-concurrency/tsconfig.json @@ -0,0 +1,6 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true }, + "include": ["src/**/*.ts"], + "references": [{ "path": "../kernel" }] +} diff --git a/packages/session-orchestrator/package.json b/packages/session-orchestrator/package.json index ba34c4d..b9f3d22 100644 --- a/packages/session-orchestrator/package.json +++ b/packages/session-orchestrator/package.json @@ -10,6 +10,7 @@ "@dispatch/conversation-store": "workspace:*", "@dispatch/credential-store": "workspace:*", "@dispatch/message-queue": "workspace:*", + "@dispatch/provider-concurrency": "workspace:*", "@dispatch/system-prompt": "workspace:*" } } diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts index 5afffd8..0cd83ef 100644 --- a/packages/session-orchestrator/src/extension.ts +++ b/packages/session-orchestrator/src/extension.ts @@ -3,6 +3,7 @@ import { credentialStoreHandle } from "@dispatch/credential-store"; import type { Extension, HostAPI, Manifest } from "@dispatch/kernel"; import { runTurn } from "@dispatch/kernel"; import { messageQueueHandle } from "@dispatch/message-queue"; +import { concurrencyServiceHandle } from "@dispatch/provider-concurrency"; import { systemPromptHandle } from "@dispatch/system-prompt"; import { cacheWarmHandle, @@ -93,6 +94,19 @@ export function activate(host: HostAPI): void { return undefined; } }, + resolveConcurrencyLimiter: () => { + // Lazily resolve the concurrency limiter. Returns undefined when the + // provider-concurrency extension isn't loaded (no concurrency limiting — + // feature degrades off). 
Lazy so activation order with + // provider-concurrency doesn't matter; called per-turn, not at activate. + const loaded = host.getExtensions().some((m) => m.id === "provider-concurrency"); + if (!loaded) return undefined; + try { + return host.getService(concurrencyServiceHandle); + } catch { + return undefined; + } + }, }); host.provideService(sessionOrchestratorHandle, orchestrator); diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index 96cd3a3..b73647d 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -20,6 +20,8 @@ import type { } from "@dispatch/kernel"; import { defineEventHook, defineService, type ServiceHandle } from "@dispatch/kernel"; import type { MessageQueueService, QueuedMessage } from "@dispatch/message-queue"; +import type { ConcurrencyLimiter } from "@dispatch/provider-concurrency"; +import { wrapProviderWithConcurrency } from "@dispatch/provider-concurrency"; import type { SystemPromptService } from "@dispatch/system-prompt"; import { createMetricsAccumulator } from "./metrics.js"; import { @@ -335,6 +337,14 @@ export interface SessionOrchestratorDeps { * order doesn't matter. */ readonly resolveSystemPrompt?: () => SystemPromptService | undefined; + /** + * Lazily resolves the concurrency limiter, or `undefined` when the + * provider-concurrency extension isn't loaded (no concurrency limiting — + * feature degrades off). When present, each resolved provider is wrapped so + * that a concurrency slot is acquired before the stream starts and released + * when the stream completes. Lazy so activation order doesn't matter. + */ + readonly resolveConcurrencyLimiter?: () => ConcurrencyLimiter | undefined; /** Apply the per-turn tools filter chain. Injected for testability. 
*/ readonly applyToolsFilter: (assembly: ToolAssembly) => Promise<ToolAssembly>; /** Base logger (auto-scoped to this extension); childed per turn for span capture. */ @@ -439,6 +449,7 @@ export function createSessionOrchestrator( systemPromptOverride: string | undefined, ): void { const turnId = generateTurnId(); + const promptStartedAt = deps.now?.() ?? Date.now(); const controller = new AbortController(); activeTurns.set(conversationId, { buffer: [], turnId, controller }); activeConversations.add(conversationId); @@ -594,6 +605,22 @@ export function createSessionOrchestrator( provider = deps.resolveProvider(); } + // Wrap the resolved provider with concurrency limiting when the + // provider-concurrency extension is loaded. The slot is acquired + // before the stream starts (before the HTTP request) and released + // when the stream completes (after all tokens are generated). The + // promptStartedAt (turn start time) is used for oldest-agent-first + // scheduling when multiple agents are queued. + const limiter = deps.resolveConcurrencyLimiter?.(); + if (limiter !== undefined) { + provider = wrapProviderWithConcurrency( + provider, + limiter, + conversationId, + promptStartedAt, + ); + } + const baseTools = deps.resolveTools(); const assembled = await deps.applyToolsFilter({ tools: baseTools, @@ -990,6 +1017,17 @@ export function createWarmService( provider = deps.resolveProvider(); } + // Wrap with concurrency limiting (same as the main turn path). + const warmLimiter = deps.resolveConcurrencyLimiter?.(); + if (warmLimiter !== undefined) { + provider = wrapProviderWithConcurrency( + provider, + warmLimiter, + conversationId, + deps.now?.() ?? Date.now(), + ); + } + const baseTools = deps.resolveTools(); // Resolve cwd the SAME way handleMessage does — pass opts.cwd as the overrideCwd // The tools filter is cwd-sensitive (e.g. 
skill discovery rewrites the @@ -1136,6 +1174,17 @@ export function createCompactionService( provider = deps.resolveProvider(); } + // Wrap with concurrency limiting (same as the main turn path). + const compactionLimiter = deps.resolveConcurrencyLimiter?.(); + if (compactionLimiter !== undefined) { + provider = wrapProviderWithConcurrency( + provider, + compactionLimiter, + conversationId, + deps.now?.() ?? Date.now(), + ); + } + // Build the summarization request: system prompt + conversation text + instruction const conversationText = formatMessagesForSummary(toSummarize); const summaryRequest: ChatMessage = { diff --git a/packages/session-orchestrator/tsconfig.json b/packages/session-orchestrator/tsconfig.json index bc729fc..f316387 100644 --- a/packages/session-orchestrator/tsconfig.json +++ b/packages/session-orchestrator/tsconfig.json @@ -7,6 +7,7 @@ { "path": "../conversation-store" }, { "path": "../credential-store" }, { "path": "../message-queue" }, + { "path": "../provider-concurrency" }, { "path": "../system-prompt" } ] } diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts index 6a9a29f..400d9d5 100644 --- a/packages/transport-contract/src/index.ts +++ b/packages/transport-contract/src/index.ts @@ -985,3 +985,56 @@ export interface HeartbeatRunsResponse { export interface StopHeartbeatRunResponse { readonly ok: true; } + +// ─── Provider concurrency limits ────────────────────────────────────────────── + +/** + * Response of `GET /concurrency/limits` — all providers with configured + * concurrency limits. Each entry pairs a provider id (e.g. "umans", + * "openai-compat") with its maximum concurrent in-flight requests. Providers + * not listed here have no limit (unlimited). 
+ */ +export interface ConcurrencyLimitsResponse { + readonly limits: readonly { readonly providerId: string; readonly limit: number }[]; +} + +/** + * Body of `PUT /concurrency/limits/:providerId` — set or update the concurrency + * limit for a provider. `limit` must be a positive integer. When a limit is + * set, requests beyond the limit queue (oldest-agent-first) rather than being + * sent immediately. + */ +export interface SetConcurrencyLimitRequest { + readonly limit: number; +} + +/** Response of `GET/PUT /concurrency/limits/:providerId` — the configured limit. */ +export interface ConcurrencyLimitResponse { + readonly providerId: string; + readonly limit: number; +} + +/** + * One provider's live concurrency status. + * + * - `inFlight`: how many slots are currently held (tokens being generated). + * - `queued`: how many agents are waiting for a slot. + * - `paused`: whether the queue is paused due to a 429 backoff. + * - `pausedUntil`: when the pause expires (epoch-ms), present only when paused. + */ +export interface ConcurrencyStatusEntry { + readonly providerId: string; + readonly limit: number; + readonly inFlight: number; + readonly queued: number; + readonly paused: boolean; + readonly pausedUntil?: number; +} + +/** + * Response of `GET /concurrency/status` — live status for every provider with a + * configured limit. Providers without a limit are absent (they are unlimited). 
+ */ +export interface ConcurrencyStatusResponse { + readonly providers: readonly ConcurrencyStatusEntry[]; +} diff --git a/packages/transport-http/package.json b/packages/transport-http/package.json index 3c722e2..71da855 100644 --- a/packages/transport-http/package.json +++ b/packages/transport-http/package.json @@ -12,6 +12,7 @@ "@dispatch/kernel": "workspace:*", "@dispatch/lsp": "workspace:*", "@dispatch/mcp": "workspace:*", + "@dispatch/provider-concurrency": "workspace:*", "@dispatch/session-orchestrator": "workspace:*", "@dispatch/throughput-store": "workspace:*", "@dispatch/transport-contract": "workspace:*", diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts index 4fb295e..23f8dde 100644 --- a/packages/transport-http/src/app.ts +++ b/packages/transport-http/src/app.ts @@ -8,6 +8,9 @@ import type { ComputerListResponse, ComputerResponse, ComputerStatusResponse, + ConcurrencyLimitResponse, + ConcurrencyLimitsResponse, + ConcurrencyStatusResponse, ConversationComputerResponse, ConversationHistoryResponse, ConversationListResponse, @@ -28,6 +31,7 @@ import type { QueueResponse, ReasoningEffortResponse, SetCompactPercentRequest, + SetConcurrencyLimitRequest, SetConversationComputerRequest, SetSystemPromptTemplateRequest, SetWorkspaceDefaultComputerRequest, @@ -67,6 +71,7 @@ import { import { type CompactionService, type ComputerService, + type ConcurrencyService, type ConversationStore, type CredentialStore, conversationOpened, @@ -110,6 +115,13 @@ export interface CreateServerOptions { readonly computerService?: ComputerService; /** Optional — defaults to a no-op store (recording disabled, empty reports). */ readonly throughputStore?: ThroughputStore; + /** + * Optional — provider concurrency limiter service (provided by the + * `provider-concurrency` extension). When absent (extension not loaded), + * the `/concurrency/*` routes degrade: limits returns empty, status returns + * empty, PUT returns 503. 
+ */ + readonly concurrencyService?: ConcurrencyService; readonly logger?: Logger; readonly generateId?: () => string; /** Injectable clock for sample timestamps (default Date.now). */ @@ -562,6 +574,84 @@ export function createApp(opts: CreateServerOptions): Hono { } }); + // ─── Provider concurrency limits ──────────────────────────────────────────── + + app.get("/concurrency/limits", (c) => { + if (opts.concurrencyService === undefined) { + const body: ConcurrencyLimitsResponse = { limits: [] }; + return c.json(body, 200); + } + const limits = opts.concurrencyService.getLimits(); + const body: ConcurrencyLimitsResponse = { limits }; + return c.json(body, 200); + }); + + app.get("/concurrency/limits/:providerId", (c) => { + const providerId = c.req.param("providerId"); + if (opts.concurrencyService === undefined) { + return c.json({ error: "Concurrency service not available" }, 503); + } + const limit = opts.concurrencyService.getLimit(providerId); + if (limit === undefined) { + return c.json({ error: "No concurrency limit configured for this provider" }, 404); + } + const body: ConcurrencyLimitResponse = { providerId, limit }; + return c.json(body, 200); + }); + + app.put("/concurrency/limits/:providerId", async (c) => { + const providerId = c.req.param("providerId"); + if (opts.concurrencyService === undefined) { + return c.json({ error: "Concurrency service not available" }, 503); + } + + let body: unknown; + try { + body = await c.req.json(); + } catch { + log.warn("concurrency: invalid JSON body"); + return c.json({ error: "Invalid JSON body" }, 400); + } + + const parsed = body as SetConcurrencyLimitRequest; + if ( + parsed === null || + typeof parsed !== "object" || + typeof parsed.limit !== "number" || + !Number.isInteger(parsed.limit) || + parsed.limit <= 0 + ) { + return c.json({ error: "Body must be { limit: <positive integer> }" }, 400); + } + + opts.concurrencyService.setLimit(providerId, parsed.limit); + const responseBody: ConcurrencyLimitResponse 
= { providerId, limit: parsed.limit }; + return c.json(responseBody, 200); + }); + + app.delete("/concurrency/limits/:providerId", (c) => { + const providerId = c.req.param("providerId"); + if (opts.concurrencyService === undefined) { + return c.json({ error: "Concurrency service not available" }, 503); + } + const existing = opts.concurrencyService.getLimit(providerId); + if (existing === undefined) { + return c.json({ error: "No concurrency limit configured for this provider" }, 404); + } + opts.concurrencyService.removeLimit(providerId); + return c.json({ ok: true, providerId }, 200); + }); + + app.get("/concurrency/status", (c) => { + if (opts.concurrencyService === undefined) { + const body: ConcurrencyStatusResponse = { providers: [] }; + return c.json(body, 200); + } + const statuses = opts.concurrencyService.getStatusAll(); + const body: ConcurrencyStatusResponse = { providers: statuses }; + return c.json(body, 200); + }); + app.post("/conversations/:id/close", (c) => { const conversationId = c.req.param("id"); const { abortedTurn } = opts.orchestrator.closeConversation(conversationId); diff --git a/packages/transport-http/src/extension.ts b/packages/transport-http/src/extension.ts index 76d58b6..f46fca5 100644 --- a/packages/transport-http/src/extension.ts +++ b/packages/transport-http/src/extension.ts @@ -2,9 +2,11 @@ import type { Extension, HostAPI, Manifest } from "@dispatch/kernel"; import { createApp } from "./app.js"; import { type ComputerService, + type ConcurrencyService, cacheWarmHandle, compactionHandle, computerServiceHandle, + concurrencyServiceHandle, conversationStoreHandle, credentialStoreHandle, heartbeatServiceHandle, @@ -39,6 +41,9 @@ export const manifest: Manifest = { "/computers/:alias", "/computers/:alias/status", "/computers/:alias/test", + "/concurrency/limits", + "/concurrency/limits/:providerId", + "/concurrency/status", "/conversations", "/conversations/:id", "/conversations/:id/close", @@ -105,6 +110,15 @@ export function 
createTransportHttpExtension(): Extension & { } catch { computerService = undefined; } + // Optional: the `provider-concurrency` extension provides the + // concurrency limiter service. NOT in dependsOn (may be absent), so + // resolve defensively — when absent the /concurrency/* routes degrade. + let concurrencyService: ConcurrencyService | undefined; + try { + concurrencyService = host.getService(concurrencyServiceHandle); + } catch { + concurrencyService = undefined; + } const logger = host.logger; const app = createApp({ @@ -119,6 +133,7 @@ export function createTransportHttpExtension(): Extension & { systemPromptService, heartbeatService, ...(computerService !== undefined ? { computerService } : {}), + ...(concurrencyService !== undefined ? { concurrencyService } : {}), logger, emit: host.emit.bind(host), ...(process.env.DISPATCH_WEB_DIR !== undefined diff --git a/packages/transport-http/src/seam.ts b/packages/transport-http/src/seam.ts index c60edf0..dcb3f80 100644 --- a/packages/transport-http/src/seam.ts +++ b/packages/transport-http/src/seam.ts @@ -12,6 +12,8 @@ export type { LspServerStatus, LspService } from "@dispatch/lsp"; export { lspServiceHandle } from "@dispatch/lsp"; export type { McpServerStatus, McpService } from "@dispatch/mcp"; export { mcpServiceHandle } from "@dispatch/mcp"; +export type { ConcurrencyService } from "@dispatch/provider-concurrency"; +export { concurrencyServiceHandle } from "@dispatch/provider-concurrency"; export type { CompactionService, SessionOrchestrator, diff --git a/packages/transport-http/tsconfig.json b/packages/transport-http/tsconfig.json index 4a76434..7759bce 100644 --- a/packages/transport-http/tsconfig.json +++ b/packages/transport-http/tsconfig.json @@ -8,6 +8,8 @@ { "path": "../heartbeat" }, { "path": "../kernel" }, { "path": "../lsp" }, + { "path": "../mcp" }, + { "path": "../provider-concurrency" }, { "path": "../session-orchestrator" }, { "path": "../system-prompt" }, { "path": "../throughput-store" }, 
diff --git a/tsconfig.json b/tsconfig.json index e4e833d..d31b44a 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -98,6 +98,9 @@ "path": "./packages/heartbeat" }, { + "path": "./packages/provider-concurrency" + }, + { "path": "./packages/system-prompt" }, { |
