diff options
| author | Adam Malczewski <[email protected]> | 2026-06-27 18:49:50 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-27 18:49:50 +0900 |
| commit | d19f75caca6f6ad49e95b83021cb4cf00a39297f (patch) | |
| tree | bc106b082cc492095d502be9702a65365acbbd2a | |
| parent | 87e85e026e54b1dc25b0648af298ab0a8a715701 (diff) | |
| download | dispatch-d19f75caca6f6ad49e95b83021cb4cf00a39297f.tar.gz dispatch-d19f75caca6f6ad49e95b83021cb4cf00a39297f.zip | |
feat(concurrency): add "queued" ConversationStatus — emit when request blocks on acquire, re-emit "active" when slot granted
| -rw-r--r-- | packages/provider-concurrency/src/concurrency-manager.test.ts | 37 | ||||
| -rw-r--r-- | packages/provider-concurrency/src/concurrency-manager.ts | 21 | ||||
| -rw-r--r-- | packages/provider-concurrency/src/provider-wrapper.test.ts | 73 | ||||
| -rw-r--r-- | packages/provider-concurrency/src/provider-wrapper.ts | 11 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 20 | ||||
| -rw-r--r-- | packages/transport-http/src/logic.ts | 2 | ||||
| -rw-r--r-- | packages/wire/src/index.ts | 10 |
7 files changed, 168 insertions, 6 deletions
diff --git a/packages/provider-concurrency/src/concurrency-manager.test.ts b/packages/provider-concurrency/src/concurrency-manager.test.ts index 6583a34..62f202f 100644 --- a/packages/provider-concurrency/src/concurrency-manager.test.ts +++ b/packages/provider-concurrency/src/concurrency-manager.test.ts @@ -447,4 +447,41 @@ describe("createConcurrencyManager", () => { expect(() => manager.destroy()).not.toThrow(); }); }); + + it("onQueued is called when the request is enqueued (not granted immediately)", async () => { + const { manager } = createManager(); + manager.setLimit("umans", 1); + + // Hold the single slot. + const release1 = await manager.acquire("umans", "conv1", 0); + + // Second request should trigger onQueued. + let queuedCalled = false; + const promise = manager.acquire("umans", "conv2", 100, () => { + queuedCalled = true; + }); + await Promise.resolve(); + await Promise.resolve(); + + expect(queuedCalled).toBe(true); + expect(manager.getStatus("umans")?.queued).toBe(1); + + // Release the slot — the queued request should be granted. + release1(); + const release2 = await promise; + release2(); + }); + + it("onQueued is NOT called when the slot is granted immediately", async () => { + const { manager } = createManager(); + manager.setLimit("umans", 2); + + let queuedCalled = false; + const release = await manager.acquire("umans", "conv1", 0, () => { + queuedCalled = true; + }); + + expect(queuedCalled).toBe(false); + release(); + }); }); diff --git a/packages/provider-concurrency/src/concurrency-manager.ts b/packages/provider-concurrency/src/concurrency-manager.ts index 49575f7..d4516b2 100644 --- a/packages/provider-concurrency/src/concurrency-manager.ts +++ b/packages/provider-concurrency/src/concurrency-manager.ts @@ -45,12 +45,24 @@ export interface ConcurrencyLimiter { * stream completes (in a `finally` block). For providers with no configured * limit, resolves instantly with a no-op release. * + * If `onQueued` is provided and the request cannot be granted immediately + * (at limit or paused), it is called synchronously BEFORE the Promise is + * created. This lets the caller emit a "queued" status signal. If the slot + * is granted immediately, `onQueued` is NOT called. + * * @param providerId The provider to limit (e.g. "umans", "openai-compat"). * @param conversationId The agent requesting the slot. * @param promptStartedAt When the agent's current prompt (turn) started * (epoch-ms). Used for oldest-agent-first scheduling. + * @param onQueued Called synchronously when the request is enqueued + * (not granted immediately). Optional. */ - acquire(providerId: string, conversationId: string, promptStartedAt: number): Promise<() => void>; + acquire( + providerId: string, + conversationId: string, + promptStartedAt: number, + onQueued?: () => void, + ): Promise<() => void>; /** * Report a 429 from a provider. Pauses the queue for that provider for @@ -222,7 +234,7 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre // ── Public API ───────────────────────────────────────────────────────────── const manager: ConcurrencyService = { - acquire(providerId, conversationId, promptStartedAt) { + acquire(providerId, conversationId, promptStartedAt, onQueued) { const state = states.get(providerId); if (state === undefined) { // No limit configured → unlimited. @@ -233,6 +245,11 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre return Promise.resolve(grantSlot(state, providerId, conversationId)); } + // Cannot grant immediately — the request will be queued. + // Notify the caller BEFORE creating the Promise so they can emit a + // "queued" status signal while we're still synchronous. + onQueued?.(); + // Queue (oldest-agent-first by promptStartedAt). return new Promise<() => void>((resolve) => { state.queue.push({ conversationId, promptStartedAt, resolve }); diff --git a/packages/provider-concurrency/src/provider-wrapper.test.ts b/packages/provider-concurrency/src/provider-wrapper.test.ts index e024d59..e59ab39 100644 --- a/packages/provider-concurrency/src/provider-wrapper.test.ts +++ b/packages/provider-concurrency/src/provider-wrapper.test.ts @@ -139,6 +139,79 @@ describe("wrapProviderWithConcurrency", () => { expect(models).toEqual([{ id: "model-1" }]); }); + it("calls onQueued when the request blocks and onAcquired when the slot is granted", async () => { + let queuedCalled = false; + let acquiredCalled = false; + + const blockingLimiter: ConcurrencyLimiter = { + acquire(_providerId, _convId, _promptAt, onQueued) { + // Simulate a queued request: call onQueued, then resolve on next tick. + onQueued?.(); + return new Promise((resolve) => { + setTimeout(() => { + resolve(() => {}); + }, 0); + }); + }, + reportRateLimit() {}, + }; + + const provider = fakeProvider([{ type: "finish", reason: "stop" }]); + const wrapped = wrapProviderWithConcurrency( + provider, + blockingLimiter, + "conv1", + 0, + () => { + queuedCalled = true; + }, + () => { + acquiredCalled = true; + }, + ); + + for await (const _e of wrapped.stream([], [])) { + // consume + } + + expect(queuedCalled).toBe(true); + expect(acquiredCalled).toBe(true); + }); + + it("does NOT call onQueued when the slot is granted immediately", async () => { + let queuedCalled = false; + let acquiredCalled = false; + + const immediateLimiter: ConcurrencyLimiter = { + acquire(_providerId, _convId, _promptAt, _onQueued) { + // Grant immediately — do NOT call onQueued. + return Promise.resolve(() => {}); + }, + reportRateLimit() {}, + }; + + const provider = fakeProvider([{ type: "finish", reason: "stop" }]); + const wrapped = wrapProviderWithConcurrency( + provider, + immediateLimiter, + "conv1", + 0, + () => { + queuedCalled = true; + }, + () => { + acquiredCalled = true; + }, + ); + + for await (const _e of wrapped.stream([], [])) { + // consume + } + + expect(queuedCalled).toBe(false); + expect(acquiredCalled).toBe(true); + }); + it("passes through messages, tools, and opts to the inner stream", async () => { let receivedArgs: | { diff --git a/packages/provider-concurrency/src/provider-wrapper.ts b/packages/provider-concurrency/src/provider-wrapper.ts index ee3ca85..aa08e5b 100644 --- a/packages/provider-concurrency/src/provider-wrapper.ts +++ b/packages/provider-concurrency/src/provider-wrapper.ts @@ -25,12 +25,20 @@ import type { ConcurrencyLimiter } from "./concurrency-manager.js"; * @param conversationId The agent requesting the stream (for slot attribution). * @param promptStartedAt When the agent's current prompt (turn) started * (epoch-ms, for oldest-agent-first scheduling). + * @param onQueued Called synchronously when `acquire()` decides to + * queue the request (cannot grant immediately). + * Lets the caller emit a "queued" status signal. + * @param onAcquired Called when `acquire()` resolves (slot granted, + * whether immediately or after queueing). Lets the + * caller emit an "active" status signal. */ export function wrapProviderWithConcurrency( provider: ProviderContract, limiter: ConcurrencyLimiter, conversationId: string, promptStartedAt: number, + onQueued?: () => void, + onAcquired?: () => void, ): ProviderContract { const innerStream = provider.stream; const providerId = provider.id; @@ -42,7 +50,8 @@ export function wrapProviderWithConcurrency( tools: readonly ToolContract[], opts?: ProviderStreamOptions, ): AsyncIterable<ProviderEvent> { - const release = await limiter.acquire(providerId, conversationId, promptStartedAt); + const release = await limiter.acquire(providerId, conversationId, promptStartedAt, onQueued); + onAcquired?.(); try { for await (const event of innerStream(messages, tools, opts)) { if (event.type === "error" && event.code === "429") { diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index b73647d..617c079 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -611,13 +611,33 @@ export function createSessionOrchestrator( // when the stream completes (after all tokens are generated). The // promptStartedAt (turn start time) is used for oldest-agent-first // scheduling when multiple agents are queued. + // + // Status lifecycle with concurrency: "active" is emitted early (in + // payloadPromise.then, before this code runs). If acquire() blocks, + // onQueued emits "queued" (broadcast-only — persisted status stays + // "active"). When the slot is granted, onAcquired emits "active" + // again, transitioning "queued" → "active" so the FE switches from + // the loading ring back to dots. A request that gets a slot + // immediately never emits "queued" — onAcquired fires right after + // the early "active", which is a harmless no-op re-broadcast. const limiter = deps.resolveConcurrencyLimiter?.(); if (limiter !== undefined) { + const emitStatus = (status: "queued" | "active"): void => { + void deps.conversationStore.getWorkspaceId(conversationId).then((workspaceId) => { + deps.emit?.(conversationStatusChanged, { + conversationId, + status, + workspaceId, + }); + }); + }; provider = wrapProviderWithConcurrency( provider, limiter, conversationId, promptStartedAt, + () => emitStatus("queued"), + () => emitStatus("active"), ); } diff --git a/packages/transport-http/src/logic.ts b/packages/transport-http/src/logic.ts index 97ad426..d5f2dea 100644 --- a/packages/transport-http/src/logic.ts +++ b/packages/transport-http/src/logic.ts @@ -13,7 +13,7 @@ const VALID_REASONING_EFFORTS: readonly ReasoningEffort[] = [ "max", ]; -const VALID_STATUSES: readonly ConversationStatus[] = ["active", "idle", "closed"]; +const VALID_STATUSES: readonly ConversationStatus[] = ["active", "queued", "idle", "closed"]; /** * Pure: parse a `?status=` query value into a list of valid ConversationStatus diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts index 16b7023..6d10e0f 100644 --- a/packages/wire/src/index.ts +++ b/packages/wire/src/index.ts @@ -527,12 +527,18 @@ export interface TurnSteeringEvent { /** * The lifecycle status of a conversation, used for tab persistence across - * devices. `active` = an agent is currently generating; `idle` = exists but not + * devices. `active` = an agent is currently generating; `queued` = the + * request is waiting in the per-provider concurrency queue for a slot (not yet + * generating — the FE shows a loading ring, not dots); `idle` = exists but not * generating; `closed` = user dismissed the tab (hidden from the tab bar, not * deleted). New conversations start as `idle`; transitions to `active` on * turn-start, back to `idle` on turn done/error, and to `closed` on user close. + * When the concurrency extension is loaded and a request blocks on + * `acquire()`, `queued` is broadcast (persisted status stays `active`); when + * the slot is granted, `active` is re-broadcast. A request that gets a slot + * immediately never emits `queued`. */ -export type ConversationStatus = "active" | "idle" | "closed"; +export type ConversationStatus = "active" | "queued" | "idle" | "closed"; /** * Metadata for a conversation, returned by `GET /conversations` (the list |
