summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-27 18:49:50 +0900
committerAdam Malczewski <[email protected]>2026-06-27 18:49:50 +0900
commitd19f75caca6f6ad49e95b83021cb4cf00a39297f (patch)
treebc106b082cc492095d502be9702a65365acbbd2a
parent87e85e026e54b1dc25b0648af298ab0a8a715701 (diff)
downloaddispatch-d19f75caca6f6ad49e95b83021cb4cf00a39297f.tar.gz
dispatch-d19f75caca6f6ad49e95b83021cb4cf00a39297f.zip
feat(concurrency): add "queued" ConversationStatus — emit when request blocks on acquire, re-emit "active" when slot granted
-rw-r--r--packages/provider-concurrency/src/concurrency-manager.test.ts37
-rw-r--r--packages/provider-concurrency/src/concurrency-manager.ts21
-rw-r--r--packages/provider-concurrency/src/provider-wrapper.test.ts73
-rw-r--r--packages/provider-concurrency/src/provider-wrapper.ts11
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts20
-rw-r--r--packages/transport-http/src/logic.ts2
-rw-r--r--packages/wire/src/index.ts10
7 files changed, 168 insertions, 6 deletions
diff --git a/packages/provider-concurrency/src/concurrency-manager.test.ts b/packages/provider-concurrency/src/concurrency-manager.test.ts
index 6583a34..62f202f 100644
--- a/packages/provider-concurrency/src/concurrency-manager.test.ts
+++ b/packages/provider-concurrency/src/concurrency-manager.test.ts
@@ -447,4 +447,41 @@ describe("createConcurrencyManager", () => {
expect(() => manager.destroy()).not.toThrow();
});
});
+
+ it("onQueued is called when the request is enqueued (not granted immediately)", async () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 1);
+
+ // Hold the single slot.
+ const release1 = await manager.acquire("umans", "conv1", 0);
+
+ // Second request should trigger onQueued.
+ let queuedCalled = false;
+ const promise = manager.acquire("umans", "conv2", 100, () => {
+ queuedCalled = true;
+ });
+ await Promise.resolve();
+ await Promise.resolve();
+
+ expect(queuedCalled).toBe(true);
+ expect(manager.getStatus("umans")?.queued).toBe(1);
+
+ // Release the slot — the queued request should be granted.
+ release1();
+ const release2 = await promise;
+ release2();
+ });
+
+ it("onQueued is NOT called when the slot is granted immediately", async () => {
+ const { manager } = createManager();
+ manager.setLimit("umans", 2);
+
+ let queuedCalled = false;
+ const release = await manager.acquire("umans", "conv1", 0, () => {
+ queuedCalled = true;
+ });
+
+ expect(queuedCalled).toBe(false);
+ release();
+ });
});
diff --git a/packages/provider-concurrency/src/concurrency-manager.ts b/packages/provider-concurrency/src/concurrency-manager.ts
index 49575f7..d4516b2 100644
--- a/packages/provider-concurrency/src/concurrency-manager.ts
+++ b/packages/provider-concurrency/src/concurrency-manager.ts
@@ -45,12 +45,24 @@ export interface ConcurrencyLimiter {
* stream completes (in a `finally` block). For providers with no configured
* limit, resolves instantly with a no-op release.
*
+ * If `onQueued` is provided and the request cannot be granted immediately
+ * (at limit or paused), it is called synchronously BEFORE the Promise is
+ * created. This lets the caller emit a "queued" status signal. If the slot
+ * is granted immediately, `onQueued` is NOT called.
+ *
* @param providerId The provider to limit (e.g. "umans", "openai-compat").
* @param conversationId The agent requesting the slot.
* @param promptStartedAt When the agent's current prompt (turn) started
* (epoch-ms). Used for oldest-agent-first scheduling.
+ * @param onQueued Called synchronously when the request is enqueued
+ * (not granted immediately). Optional.
*/
- acquire(providerId: string, conversationId: string, promptStartedAt: number): Promise<() => void>;
+ acquire(
+ providerId: string,
+ conversationId: string,
+ promptStartedAt: number,
+ onQueued?: () => void,
+ ): Promise<() => void>;
/**
* Report a 429 from a provider. Pauses the queue for that provider for
@@ -222,7 +234,7 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
// ── Public API ─────────────────────────────────────────────────────────────
const manager: ConcurrencyService = {
- acquire(providerId, conversationId, promptStartedAt) {
+ acquire(providerId, conversationId, promptStartedAt, onQueued) {
const state = states.get(providerId);
if (state === undefined) {
// No limit configured → unlimited.
@@ -233,6 +245,11 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
return Promise.resolve(grantSlot(state, providerId, conversationId));
}
+ // Cannot grant immediately — the request will be queued.
+ // Notify the caller BEFORE creating the Promise so they can emit a
+ // "queued" status signal while we're still synchronous.
+ onQueued?.();
+
// Queue (oldest-agent-first by promptStartedAt).
return new Promise<() => void>((resolve) => {
state.queue.push({ conversationId, promptStartedAt, resolve });
diff --git a/packages/provider-concurrency/src/provider-wrapper.test.ts b/packages/provider-concurrency/src/provider-wrapper.test.ts
index e024d59..e59ab39 100644
--- a/packages/provider-concurrency/src/provider-wrapper.test.ts
+++ b/packages/provider-concurrency/src/provider-wrapper.test.ts
@@ -139,6 +139,79 @@ describe("wrapProviderWithConcurrency", () => {
expect(models).toEqual([{ id: "model-1" }]);
});
+ it("calls onQueued when the request blocks and onAcquired when the slot is granted", async () => {
+ let queuedCalled = false;
+ let acquiredCalled = false;
+
+ const blockingLimiter: ConcurrencyLimiter = {
+ acquire(_providerId, _convId, _promptAt, onQueued) {
+ // Simulate a queued request: call onQueued, then resolve on next tick.
+ onQueued?.();
+ return new Promise((resolve) => {
+ setTimeout(() => {
+ resolve(() => {});
+ }, 0);
+ });
+ },
+ reportRateLimit() {},
+ };
+
+ const provider = fakeProvider([{ type: "finish", reason: "stop" }]);
+ const wrapped = wrapProviderWithConcurrency(
+ provider,
+ blockingLimiter,
+ "conv1",
+ 0,
+ () => {
+ queuedCalled = true;
+ },
+ () => {
+ acquiredCalled = true;
+ },
+ );
+
+ for await (const _e of wrapped.stream([], [])) {
+ // consume
+ }
+
+ expect(queuedCalled).toBe(true);
+ expect(acquiredCalled).toBe(true);
+ });
+
+ it("does NOT call onQueued when the slot is granted immediately", async () => {
+ let queuedCalled = false;
+ let acquiredCalled = false;
+
+ const immediateLimiter: ConcurrencyLimiter = {
+ acquire(_providerId, _convId, _promptAt, _onQueued) {
+ // Grant immediately — do NOT call onQueued.
+ return Promise.resolve(() => {});
+ },
+ reportRateLimit() {},
+ };
+
+ const provider = fakeProvider([{ type: "finish", reason: "stop" }]);
+ const wrapped = wrapProviderWithConcurrency(
+ provider,
+ immediateLimiter,
+ "conv1",
+ 0,
+ () => {
+ queuedCalled = true;
+ },
+ () => {
+ acquiredCalled = true;
+ },
+ );
+
+ for await (const _e of wrapped.stream([], [])) {
+ // consume
+ }
+
+ expect(queuedCalled).toBe(false);
+ expect(acquiredCalled).toBe(true);
+ });
+
it("passes through messages, tools, and opts to the inner stream", async () => {
let receivedArgs:
| {
diff --git a/packages/provider-concurrency/src/provider-wrapper.ts b/packages/provider-concurrency/src/provider-wrapper.ts
index ee3ca85..aa08e5b 100644
--- a/packages/provider-concurrency/src/provider-wrapper.ts
+++ b/packages/provider-concurrency/src/provider-wrapper.ts
@@ -25,12 +25,20 @@ import type { ConcurrencyLimiter } from "./concurrency-manager.js";
* @param conversationId The agent requesting the stream (for slot attribution).
* @param promptStartedAt When the agent's current prompt (turn) started
* (epoch-ms, for oldest-agent-first scheduling).
+ * @param onQueued Called synchronously when `acquire()` decides to
+ * queue the request (cannot grant immediately).
+ * Lets the caller emit a "queued" status signal.
+ * @param onAcquired Called when `acquire()` resolves (slot granted,
+ * whether immediately or after queueing). Lets the
+ * caller emit an "active" status signal.
*/
export function wrapProviderWithConcurrency(
provider: ProviderContract,
limiter: ConcurrencyLimiter,
conversationId: string,
promptStartedAt: number,
+ onQueued?: () => void,
+ onAcquired?: () => void,
): ProviderContract {
const innerStream = provider.stream;
const providerId = provider.id;
@@ -42,7 +50,8 @@ export function wrapProviderWithConcurrency(
tools: readonly ToolContract[],
opts?: ProviderStreamOptions,
): AsyncIterable<ProviderEvent> {
- const release = await limiter.acquire(providerId, conversationId, promptStartedAt);
+ const release = await limiter.acquire(providerId, conversationId, promptStartedAt, onQueued);
+ onAcquired?.();
try {
for await (const event of innerStream(messages, tools, opts)) {
if (event.type === "error" && event.code === "429") {
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index b73647d..617c079 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -611,13 +611,33 @@ export function createSessionOrchestrator(
// when the stream completes (after all tokens are generated). The
// promptStartedAt (turn start time) is used for oldest-agent-first
// scheduling when multiple agents are queued.
+ //
+ // Status lifecycle with concurrency: "active" is emitted early (in
+ // payloadPromise.then, before this code runs). If acquire() blocks,
+ // onQueued emits "queued" (broadcast-only — persisted status stays
+ // "active"). When the slot is granted, onAcquired emits "active"
+ // again, transitioning "queued" → "active" so the FE switches from
+ // the loading ring back to dots. A request that gets a slot
+ // immediately never emits "queued" — onAcquired fires right after
+ // the early "active", which is a harmless no-op re-broadcast.
const limiter = deps.resolveConcurrencyLimiter?.();
if (limiter !== undefined) {
+ const emitStatus = (status: "queued" | "active"): void => {
+ void deps.conversationStore.getWorkspaceId(conversationId).then((workspaceId) => {
+ deps.emit?.(conversationStatusChanged, {
+ conversationId,
+ status,
+ workspaceId,
+ });
+ });
+ };
provider = wrapProviderWithConcurrency(
provider,
limiter,
conversationId,
promptStartedAt,
+ () => emitStatus("queued"),
+ () => emitStatus("active"),
);
}
diff --git a/packages/transport-http/src/logic.ts b/packages/transport-http/src/logic.ts
index 97ad426..d5f2dea 100644
--- a/packages/transport-http/src/logic.ts
+++ b/packages/transport-http/src/logic.ts
@@ -13,7 +13,7 @@ const VALID_REASONING_EFFORTS: readonly ReasoningEffort[] = [
"max",
];
-const VALID_STATUSES: readonly ConversationStatus[] = ["active", "idle", "closed"];
+const VALID_STATUSES: readonly ConversationStatus[] = ["active", "queued", "idle", "closed"];
/**
* Pure: parse a `?status=` query value into a list of valid ConversationStatus
diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts
index 16b7023..6d10e0f 100644
--- a/packages/wire/src/index.ts
+++ b/packages/wire/src/index.ts
@@ -527,12 +527,18 @@ export interface TurnSteeringEvent {
/**
* The lifecycle status of a conversation, used for tab persistence across
- * devices. `active` = an agent is currently generating; `idle` = exists but not
+ * devices. `active` = an agent is currently generating; `queued` = the
+ * request is waiting in the per-provider concurrency queue for a slot (not yet
+ * generating — the FE shows a loading ring, not dots); `idle` = exists but not
* generating; `closed` = user dismissed the tab (hidden from the tab bar, not
* deleted). New conversations start as `idle`; transitions to `active` on
* turn-start, back to `idle` on turn done/error, and to `closed` on user close.
+ * When the concurrency extension is loaded and a request blocks on
+ * `acquire()`, `queued` is broadcast (persisted status stays `active`); when
+ * the slot is granted, `active` is re-broadcast. A request that gets a slot
+ * immediately never emits `queued`.
*/
-export type ConversationStatus = "active" | "idle" | "closed";
+export type ConversationStatus = "active" | "queued" | "idle" | "closed";
/**
* Metadata for a conversation, returned by `GET /conversations` (the list