summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-28 15:31:49 +0900
committerAdam Malczewski <[email protected]>2026-06-28 15:31:49 +0900
commitb60586285863f8bb82242a9df49c4d47e1235755 (patch)
treedd38669dbd8092987bc50d16dcf523c68d43c460
parentfb4a9217b55dd3ba11670104ac23536416d36940 (diff)
parent076edf7d1dfc4dc818f173f751dcb1e57b5baaeb (diff)
downloaddispatch-b60586285863f8bb82242a9df49c4d47e1235755.tar.gz
dispatch-b60586285863f8bb82242a9df49c4d47e1235755.zip
Merge branch 'feature/workspace-star' into predev
# Conflicts: # packages/provider-concurrency/src/concurrency-manager.ts # packages/provider-concurrency/src/extension.ts
-rw-r--r--bun.lock1
-rw-r--r--notes/review-bugs.md164
-rw-r--r--packages/conversation-store/src/store-workspace.test.ts129
-rw-r--r--packages/conversation-store/src/store.ts58
-rw-r--r--packages/provider-concurrency/package.json1
-rw-r--r--packages/provider-concurrency/src/concurrency-manager.test.ts346
-rw-r--r--packages/provider-concurrency/src/concurrency-manager.ts92
-rw-r--r--packages/provider-concurrency/src/extension.ts34
-rw-r--r--packages/provider-concurrency/src/provider-wrapper.test.ts44
-rw-r--r--packages/provider-concurrency/src/provider-wrapper.ts11
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts5
-rw-r--r--packages/transport-http/src/app.test.ts149
-rw-r--r--packages/transport-http/src/app.ts79
-rw-r--r--packages/transport-http/src/extension.ts1
-rw-r--r--packages/wire/src/index.test.ts7
-rw-r--r--packages/wire/src/index.ts7
16 files changed, 1077 insertions, 51 deletions
diff --git a/bun.lock b/bun.lock
index 493da15..7c450ee 100644
--- a/bun.lock
+++ b/bun.lock
@@ -167,6 +167,7 @@
"name": "@dispatch/provider-concurrency",
"version": "0.0.0",
"dependencies": {
+ "@dispatch/conversation-store": "workspace:*",
"@dispatch/kernel": "workspace:*",
},
},
diff --git a/notes/review-bugs.md b/notes/review-bugs.md
new file mode 100644
index 0000000..85cf625
--- /dev/null
+++ b/notes/review-bugs.md
@@ -0,0 +1,164 @@
+# Code Review: workspace-star (backend)
+
+**Branch:** `feature/workspace-star`
+**Commit reviewed:** `71c635f feat(workspace-star): starred workspace priority for concurrency limiting`
+**Compared to:** `dev` (`414080e`)
+**Reviewer:** `umans/umans-kimi-k2.7`
+**Date:** 2026-06-28
+**Scope:** Review only — no code changes committed.
+
+---
+
+## Executive Summary
+
+The workspace-star feature itself (persisted workspace "star" toggle + priority scheduling in the provider concurrency manager) is conceptually sound and mostly correctly threaded through the monorepo:
+
+- `@dispatch/wire` adds `starred` to `Workspace`.
+- `conversation-store` persists and migrates legacy workspace rows.
+- `provider-concurrency` queues starred-workspace waiters ahead of non-starred ones and re-sorts on star/unstar.
+- `transport-http` exposes `PUT /workspaces/:id/star` and `DELETE /workspaces/:id/star`.
+
+However, this branch also **reverts a large amount of unrelated crash-investigation work** that lives on `dev`, including production memory telemetry, an `uncaughtException`/`unhandledRejection` guard, and the LSP-disable precaution. That reversion is the single biggest issue and looks unintended given the commit title. There is also a failing test in `provider-wrapper.test.ts` and several smaller edge cases in the star-priority logic.
+
+**Do not merge this branch as-is.**
+
+---
+
+## Verification Run
+
+| Command | Result |
+|---|---|
+| `bun run typecheck` | ✅PASS |
+| `bun run check` | ✅PASS (12 pre-existing Biome warnings, 0 errors) |
+| `bun test packages/provider-concurrency/src/concurrency-manager.test.ts packages/conversation-store/src/store-workspace.test.ts packages/wire/src/index.test.ts` | ⚠️1/78 failed (see §1) |
+
+The failing test is covered in detail below.
+
+---
+
+## Detailed Findings
+
+### 1. BLOCKING — Test suite regression in `provider-wrapper.test.ts`
+
+**File:** `packages/provider-concurrency/src/provider-wrapper.test.ts`
+**Test:** `wrapProviderWithConcurrency > releases the slot even when the stream throws`
+
+```
+error:
+Expected promise
+Received: [AsyncFunction]
+ at packages/provider-concurrency/src/provider-wrapper.test.ts:102:16
+(fail) wrapProviderWithConcurrency > releases the slot even when the stream throws
+```
+
+- The test asserts that `await expect(async () => { for await... }).rejects.toThrow("stream exploded")` throws when the wrapped async generator throws mid-stream.
+- The failure message suggests the async function passed to `expect()` is not being recognized/produced correctly.
+- This may be a Vitest/Bun async-generator interop issue, but it makes the test suite red on this branch.
+- **Recommendation:** fix or replace the failing assertion before merge. The underlying `try/finally` release logic in `provider-wrapper.ts` looks correct, so this is likely an assertion-level problem.
+
+### 2. MAJOR — Branch reverts unrelated crash telemetry & production mitigations
+
+This branch removes or reverts important work that is present on `dev`. Because the commit title is `feat(workspace-star): starred workspace priority for concurrency limiting`, these changes appear to be accidental scope creep or a botched rebase.
+
+- **Files deleted:**
+ - `crash-review-report.md`
+ - `notes/crash-investigation-findings.md`
+ - `notes/memory-leak-investigation-handoff.md`
+ - `bin/apply-memory-limits.sh`
+ - `packages/host-bin/src/mem-telemetry.ts`
+ - `packages/host-bin/src/mem-telemetry.test.ts`
+- **Behavioral reversions:**
+ - `packages/host-bin/src/main.ts`: removes `process.on("uncaughtException")` and `process.on("unhandledRejection")` guards; removes periodic memory telemetry; re-enables `import { extension as lspExt } from "@dispatch/lsp"`.
+ - `packages/session-orchestrator/src/orchestrator.ts`: removes `getActiveConversationCount()`, `SessionOrchestratorDeps.sampleMemory`, and per-turn memory telemetry.
+ - `packages/transport-http/src/extension.ts`: adds `"lsp"` to `dependsOn`, making the LSP extension required again.
+- **Risk:** if merged, production loses: the cgroup memory-limit helper script; observability used to diagnose recent crashes; the unhandled-exception guard that prevents a single SSH error from killing the whole process; and the LSP-disable precaution that was added as a stability measure.
+- **Recommendation:** strip these reversions out of `feature/workspace-star`. If removing crash telemetry is intentional, do it in a separate, explicitly titled commit/PR with rationale and operational sign-off.
+
+### 3. MEDIUM — In-memory starred cache leaks IDs for deleted workspaces
+
+**File:** `packages/provider-concurrency/src/concurrency-manager.ts`
+
+- `starredWorkspaces` is a `Set<string>` populated by `notifyWorkspaceStarred(true)`.
+- `setWorkspaceStarred(false)` deletes the ID from the set.
+- However, `deleteWorkspace()` in the conversation store does **not** publish any event/hook to the concurrency manager, so a starred workspace that is deleted remains in the cache until process restart.
+- Impact is small (a string per deleted starred workspace), but it is unbounded.
+- **Edge case:** if a new workspace later reuses the same slug, it could inherit stale star priority.
+- **Recommendation:** add a `notifyWorkspaceStarred(id, false)` call when deleting a workspace, or expose a hook that provider-concurrency can listen to. At minimum, document this minor leak.
+
+### 4. MEDIUM — Stale in-memory priority when concurrency extension is absent
+
+**File:** `packages/transport-http/src/app.ts` (lines 1513, 1535)
+
+```ts
+opts.concurrencyService?.notifyWorkspaceStarred(workspaceId, true);
+```
+
+- `transport-http` does **not** declare `provider-concurrency` in its manifest `dependsOn` (it is loaded opportunistically in `createApp`).
+- If the concurrency extension is absent or disabled, the star toggle is persisted, but the in-memory priority cache is never updated. Already-queued agents will keep their old priority until a process restart seeds the cache.
+- **Recommendation:** either add `provider-concurrency` to transport-http's `dependsOn` so the service is guaranteed, or emit a warning log when `notifyWorkspaceStarred` is skipped.
+
+### 5. LOW — `notifyWorkspaceStarred` resolves waiters synchronously inside the HTTP handler
+
+**File:** `packages/provider-concurrency/src/concurrency-manager.ts`, `notifyWorkspaceStarred`
+
+- The method sorts every provider's queue and calls `tryGrantNext`, which can resolve queued acquisition promises.
+- Those resolves happen while the transport request handler is still on the call stack.
+- In practice this is fine because Promise resolution is queued as a microtask, but it is re-entrant and worth noting if future code adds side effects to resolution.
+- **Recommendation:** no code change required unless maintainers prefer to defer granting to the next tick.
+
+### 6. LOW — Star/unstar endpoints follow existing route conventions but introduce scheduling side effects
+
+**File:** `packages/transport-http/src/app.ts` (lines 1499-1543)
+
+- `PUT /workspaces/:id/star` and `DELETE /workspaces/:id/star` reuse the same slug validation and 500-error handling as other workspace routes.
+- No additional auth/rate-limiting is added, which is consistent with the rest of the workspace API.
+- Because these endpoints now affect runtime scheduling priority, a malicious or buggy client could flip-star workspaces rapidly to disturb queue ordering.
+- **Recommendation:** acceptable if consistent with existing routes; otherwise gate star toggles behind the same authorization as workspace mutation.
+
+### 7. LOW — `compareWaiters` does not tie-break equal `promptStartedAt`
+
+**File:** `packages/provider-concurrency/src/concurrency-manager.ts`
+
+```ts
+function compareWaiters(a: QueuedWaiter, b: QueuedWaiter): number {
+ const aStarred = isWorkspaceStarred(a.workspaceId);
+ const bStarred = isWorkspaceStarred(b.workspaceId);
+ if (aStarred !== bStarred) return aStarred ? -1 : 1;
+ return a.promptStartedAt - b.promptStartedAt;
+}
+```
+
+- If two agents have the same `promptStartedAt` and the same star status, JavaScript's stable sort preserves insertion order, so FIFO is still guaranteed.
+- However, the comparator returns `0` in this case and relies on sort stability, which is true for modern engines but not obvious from the code.
+- **Recommendation:** add an explicit tie-breaker (e.g., a monotonic enqueue sequence number) to make ordering deterministic across all runtimes and future proofs.
+
+### 8. LOW/CONCERN — Default workspace can be starred
+
+- `setWorkspaceStarred` allows starring `"default"`.
+- This is probably intentional (default is just another workspace), but it means all unassigned/legacy conversations inherit star priority.
+- **Recommendation:** confirm this is the intended UX. If not, add a guard mirroring `deleteWorkspace`'s prohibition on `"default"`.
+
+### 9. POSITIVE — Backward compatibility is handled correctly
+
+- `parseWorkspaceRow` treats absent or non-boolean `starred` as `false`, so legacy workspace rows load safely.
+- `isWorkspaceStarred` is optional in `ConcurrencyManagerOpts`; tests verify that omitting it behaves like all workspaces are non-starred.
+- `WorkspaceResponse` inherits `starred` through the `Workspace` / `WorkspaceEntry` types.
+- The seeded activation path (`extension.ts`) guards `listWorkspaces()` with try/catch and logs warnings, degrading cleanly if the store is unreachable.
+
+---
+
+## Merge Checklist
+
+Before merging `feature/workspace-star`:
+
+1. **Fix or disable the failing `provider-wrapper.test.ts` test.** Do not leave the suite red.
+2. **Remove the unrelated crash-telemetry/LSP reversions** from this branch, or move them to a separate explicitly labeled commit/PR.
+3. Consider: clearing the starred cache when a workspace is deleted.
+4. Consider: declaring `provider-concurrency` as a hard dependency of `transport-http` if star scheduling is a first-class feature, or logging a warning when the optional service is absent.
+5. Confirm UX intent: can `"default"` be starred?
+
+---
+
+## Note
+
+This review was generated from `git diff dev..HEAD` on the `feature/workspace-star` worktree. No source code was modified.
diff --git a/packages/conversation-store/src/store-workspace.test.ts b/packages/conversation-store/src/store-workspace.test.ts
index 788a526..59ea990 100644
--- a/packages/conversation-store/src/store-workspace.test.ts
+++ b/packages/conversation-store/src/store-workspace.test.ts
@@ -47,6 +47,7 @@ describe("WorkspaceStore", () => {
title: "my-work",
defaultCwd: null,
defaultComputerId: null,
+ starred: false,
createdAt: 1000,
lastActivityAt: 1000,
});
@@ -66,6 +67,7 @@ describe("WorkspaceStore", () => {
title: "my-work",
defaultCwd: null,
defaultComputerId: null,
+ starred: false,
createdAt: 1000,
lastActivityAt: 1000,
});
@@ -83,6 +85,7 @@ describe("WorkspaceStore", () => {
title: "Custom",
defaultCwd: "/projects/dispatch",
defaultComputerId: null,
+ starred: false,
createdAt: 3000,
lastActivityAt: 3000,
});
@@ -96,6 +99,7 @@ describe("WorkspaceStore", () => {
title: "default",
defaultCwd: null,
defaultComputerId: null,
+ starred: false,
createdAt: 0,
lastActivityAt: 0,
});
@@ -117,6 +121,7 @@ describe("WorkspaceStore", () => {
title: "Renamed",
defaultCwd: null,
defaultComputerId: null,
+ starred: false,
createdAt: 1000,
lastActivityAt: 1000,
});
@@ -214,6 +219,7 @@ describe("WorkspaceStore", () => {
title: "default",
defaultCwd: null,
defaultComputerId: null,
+ starred: false,
createdAt: 0,
lastActivityAt: 0,
conversationCount: 0,
@@ -421,6 +427,125 @@ describe("WorkspaceStore", () => {
const meta = await store.getConversationMeta("conv1");
expect(meta?.workspaceId).toBe("my-work");
});
+
+ // --- starred (priority for concurrency limiting) ---
+
+ it("ensureWorkspace creates with starred: false", async () => {
+ const store = makeStore();
+ const ws = await store.ensureWorkspace("star-work");
+ expect(ws.starred).toBe(false);
+ });
+
+ it("setWorkspaceStarred true persists and reads back", async () => {
+ const store = makeStore();
+ clock = 1000;
+ await store.ensureWorkspace("star-work");
+ clock = 2000;
+ const ws = await store.setWorkspaceStarred("star-work", true);
+ expect(ws.starred).toBe(true);
+ expect(ws.id).toBe("star-work");
+ const reRead = await store.getWorkspace("star-work");
+ expect(reRead?.starred).toBe(true);
+ });
+
+ it("setWorkspaceStarred false unstars a previously starred workspace", async () => {
+ const store = makeStore();
+ await store.ensureWorkspace("star-work");
+ await store.setWorkspaceStarred("star-work", true);
+ await store.setWorkspaceStarred("star-work", false);
+ const ws = await store.getWorkspace("star-work");
+ expect(ws?.starred).toBe(false);
+ });
+
+ it("setWorkspaceStarred creates the workspace if missing", async () => {
+ const store = makeStore();
+ clock = 5000;
+ const ws = await store.setWorkspaceStarred("brand-new", true);
+ expect(ws).toEqual({
+ id: "brand-new",
+ title: "brand-new",
+ defaultCwd: null,
+ defaultComputerId: null,
+ starred: true,
+ createdAt: 5000,
+ lastActivityAt: 5000,
+ });
+ });
+
+ it("setWorkspaceStarred preserves title/defaultCwd/defaultComputerId", async () => {
+ const store = makeStore();
+ clock = 1000;
+ await store.ensureWorkspace("my-work", {
+ title: "Custom",
+ defaultCwd: "/projects",
+ defaultComputerId: "myserver",
+ });
+ clock = 2000;
+ const ws = await store.setWorkspaceStarred("my-work", true);
+ expect(ws.title).toBe("Custom");
+ expect(ws.defaultCwd).toBe("/projects");
+ expect(ws.defaultComputerId).toBe("myserver");
+ expect(ws.starred).toBe(true);
+ expect(ws.createdAt).toBe(1000);
+ });
+
+ it("listWorkspaces includes starred field", async () => {
+ const store = makeStore();
+ clock = 1000;
+ await store.ensureWorkspace("alpha");
+ await store.setWorkspaceStarred("alpha", true);
+ clock = 2000;
+ await store.ensureWorkspace("beta");
+ const list = await store.listWorkspaces();
+ const alpha = list.find((w) => w.id === "alpha");
+ expect(alpha?.starred).toBe(true);
+ const beta = list.find((w) => w.id === "beta");
+ expect(beta?.starred).toBe(false);
+ });
+
+ it("setWorkspaceTitle preserves starred state", async () => {
+ const store = makeStore();
+ await store.ensureWorkspace("my-work");
+ await store.setWorkspaceStarred("my-work", true);
+ const ws = await store.setWorkspaceTitle("my-work", "Renamed");
+ expect(ws.starred).toBe(true);
+ expect(ws.title).toBe("Renamed");
+ });
+
+ it("setWorkspaceDefaultCwd preserves starred state", async () => {
+ const store = makeStore();
+ await store.ensureWorkspace("my-work");
+ await store.setWorkspaceStarred("my-work", true);
+ const ws = await store.setWorkspaceDefaultCwd("my-work", "/new/path");
+ expect(ws.starred).toBe(true);
+ expect(ws.defaultCwd).toBe("/new/path");
+ });
+
+ it("setWorkspaceDefaultComputerId preserves starred state", async () => {
+ const store = makeStore();
+ await store.ensureWorkspace("my-work");
+ await store.setWorkspaceStarred("my-work", true);
+ const ws = await store.setWorkspaceDefaultComputerId("my-work", "new-host");
+ expect(ws.starred).toBe(true);
+ expect(ws.defaultComputerId).toBe("new-host");
+ });
+
+ it("a legacy WorkspaceRow without starred reads back as false", async () => {
+ const store = makeStore();
+ // Simulate a legacy row persisted before `starred` existed.
+ await storage.set(
+ "workspace:legacy",
+ JSON.stringify({
+ title: "legacy",
+ defaultCwd: "/legacy/cwd",
+ defaultComputerId: null,
+ createdAt: 100,
+ lastActivityAt: 200,
+ }),
+ );
+ const ws = await store.getWorkspace("legacy");
+ expect(ws?.starred).toBe(false);
+ });
});
describe("ComputerStore", () => {
@@ -497,6 +622,7 @@ describe("ComputerStore", () => {
title: "brand-new",
defaultCwd: null,
defaultComputerId: "remote-host",
+ starred: false,
createdAt: 5000,
lastActivityAt: 5000,
});
@@ -520,6 +646,7 @@ describe("ComputerStore", () => {
title: "default",
defaultCwd: null,
defaultComputerId: null,
+ starred: false,
createdAt: 0,
lastActivityAt: 0,
});
@@ -628,6 +755,7 @@ describe("ComputerStore", () => {
title: "Remote",
defaultCwd: null,
defaultComputerId: "prod-server",
+ starred: false,
createdAt: 1000,
lastActivityAt: 1000,
});
@@ -652,6 +780,7 @@ describe("ComputerStore", () => {
title: "legacy",
defaultCwd: "/legacy/cwd",
defaultComputerId: null,
+ starred: false,
createdAt: 100,
lastActivityAt: 200,
});
diff --git a/packages/conversation-store/src/store.ts b/packages/conversation-store/src/store.ts
index 69334e6..ed39d8e 100644
--- a/packages/conversation-store/src/store.ts
+++ b/packages/conversation-store/src/store.ts
@@ -211,6 +211,13 @@ export interface ConversationStore {
defaultComputerId: string | null,
) => Promise<Workspace>;
/**
+ * Star or unstar a workspace. Creates the workspace if missing (like
+ * `setWorkspaceTitle`). Starred workspaces receive PRIORITY in the
+ * concurrency limiter queue — their agents jump ahead of agents from
+ * non-starred workspaces (oldest-agent-first within each group).
+ */
+ readonly setWorkspaceStarred: (id: string, starred: boolean) => Promise<Workspace>;
+ /**
* Delete a workspace: (1) find all conversations with `workspaceId === id`,
* (2) set each to `status = "closed"` and reassign `workspaceId = "default"`,
* (3) delete the workspace entity. Returns `closedCount`. Throws if `id
@@ -358,6 +365,12 @@ interface WorkspaceRow {
* workspace inherit it when they set no `computerId` of their own.
*/
readonly defaultComputerId: string | null;
+ /**
+ * Whether the workspace is starred by the user. Starred workspaces receive
+ * PRIORITY in the concurrency limiter queue. Defaults to `false` on legacy
+ * rows (normalized by `parseWorkspaceRow`).
+ */
+ readonly starred: boolean;
readonly createdAt: number;
readonly lastActivityAt: number;
}
@@ -470,10 +483,13 @@ function parseWorkspaceRow(raw: string): WorkspaceRow | null {
// (mirrors `defaultCwd`). Absent on legacy rows → null (local).
const defaultComputerId =
typeof row.defaultComputerId === "string" ? row.defaultComputerId : null;
+ // `starred` may be absent on legacy rows; treat anything non-boolean as false.
+ const starred = row.starred === true;
return {
title: row.title,
defaultCwd,
defaultComputerId,
+ starred,
createdAt: row.createdAt,
lastActivityAt: row.lastActivityAt,
};
@@ -485,6 +501,7 @@ function toWorkspace(id: string, row: WorkspaceRow): Workspace {
title: row.title,
defaultCwd: row.defaultCwd,
defaultComputerId: row.defaultComputerId,
+ starred: row.starred === true,
createdAt: row.createdAt,
lastActivityAt: row.lastActivityAt,
};
@@ -545,6 +562,7 @@ export function createConversationStore(
title: workspaceId,
defaultCwd: null,
defaultComputerId: null,
+ starred: false,
createdAt: ts,
lastActivityAt: ts,
}
@@ -552,6 +570,7 @@ export function createConversationStore(
title: existing.title,
defaultCwd: existing.defaultCwd,
defaultComputerId: existing.defaultComputerId,
+ starred: existing.starred,
createdAt: existing.createdAt,
lastActivityAt: ts,
};
@@ -1102,13 +1121,14 @@ export function createConversationStore(
if (row !== null) return toWorkspace(id, row);
// Synthesize the always-present "default" workspace when it was
// never persisted (title "default", defaultCwd null, defaultComputerId
- // null [local], timestamps 0).
+ // null [local], starred false, timestamps 0).
if (id === DEFAULT_WORKSPACE_ID) {
return {
id: DEFAULT_WORKSPACE_ID,
title: DEFAULT_WORKSPACE_ID,
defaultCwd: null,
defaultComputerId: null,
+ starred: false,
createdAt: 0,
lastActivityAt: 0,
};
@@ -1126,6 +1146,7 @@ export function createConversationStore(
title: opts?.title ?? id,
defaultCwd: opts?.defaultCwd ?? null,
defaultComputerId: opts?.defaultComputerId ?? null,
+ starred: false,
createdAt: ts,
lastActivityAt: ts,
};
@@ -1142,6 +1163,7 @@ export function createConversationStore(
title: id,
defaultCwd: null as string | null,
defaultComputerId: null as string | null,
+ starred: false as boolean,
createdAt: ts,
lastActivityAt: ts,
}
@@ -1150,6 +1172,7 @@ export function createConversationStore(
title,
defaultCwd: base.defaultCwd,
defaultComputerId: base.defaultComputerId,
+ starred: base.starred,
createdAt: base.createdAt,
lastActivityAt: base.lastActivityAt,
};
@@ -1166,6 +1189,7 @@ export function createConversationStore(
title: id,
defaultCwd: null as string | null,
defaultComputerId: null as string | null,
+ starred: false as boolean,
createdAt: ts,
lastActivityAt: ts,
}
@@ -1174,6 +1198,7 @@ export function createConversationStore(
title: base.title,
defaultCwd,
defaultComputerId: base.defaultComputerId,
+ starred: base.starred,
createdAt: base.createdAt,
lastActivityAt: base.lastActivityAt,
};
@@ -1190,6 +1215,7 @@ export function createConversationStore(
title: id,
defaultCwd: null as string | null,
defaultComputerId: null as string | null,
+ starred: false as boolean,
createdAt: ts,
lastActivityAt: ts,
}
@@ -1198,6 +1224,7 @@ export function createConversationStore(
title: base.title,
defaultCwd: base.defaultCwd,
defaultComputerId,
+ starred: base.starred,
createdAt: base.createdAt,
lastActivityAt: base.lastActivityAt,
};
@@ -1205,6 +1232,34 @@ export function createConversationStore(
return toWorkspace(id, row);
},
+ async setWorkspaceStarred(id, starred) {
+ const existing = await readWorkspaceRow(id);
+ const ts = now();
+ const base =
+ existing === null
+ ? {
+ title: id,
+ defaultCwd: null as string | null,
+ defaultComputerId: null as string | null,
+ createdAt: ts,
+ lastActivityAt: ts,
+ }
+ : existing;
+ const row: WorkspaceRow = {
+ title: base.title,
+ defaultCwd: base.defaultCwd,
+ defaultComputerId: base.defaultComputerId,
+ starred,
+ createdAt: base.createdAt,
+ lastActivityAt: base.lastActivityAt,
+ };
+ await storage.set(workspaceKey(id), JSON.stringify(row));
+ if (logger !== undefined) {
+ logger.debug("workspace starred set", { workspaceId: id, starred });
+ }
+ return toWorkspace(id, row);
+ },
+
async deleteWorkspace(id) {
if (id === DEFAULT_WORKSPACE_ID) {
throw new Error('The "default" workspace cannot be deleted.');
@@ -1269,6 +1324,7 @@ export function createConversationStore(
title: DEFAULT_WORKSPACE_ID,
defaultCwd: null,
defaultComputerId: null,
+ starred: false,
createdAt: 0,
lastActivityAt: 0,
});
diff --git a/packages/provider-concurrency/package.json b/packages/provider-concurrency/package.json
index 10c522a..b53d599 100644
--- a/packages/provider-concurrency/package.json
+++ b/packages/provider-concurrency/package.json
@@ -6,6 +6,7 @@
"main": "dist/index.js",
"types": "dist/index.d.ts",
"dependencies": {
+ "@dispatch/conversation-store": "workspace:*",
"@dispatch/kernel": "workspace:*"
}
}
diff --git a/packages/provider-concurrency/src/concurrency-manager.test.ts b/packages/provider-concurrency/src/concurrency-manager.test.ts
index 185c6c2..357a5d1 100644
--- a/packages/provider-concurrency/src/concurrency-manager.test.ts
+++ b/packages/provider-concurrency/src/concurrency-manager.test.ts
@@ -90,7 +90,7 @@ function createManager(opts?: {
describe("createConcurrencyManager", () => {
it("returns no-op release for providers with no configured limit", async () => {
const { manager } = createManager();
- const release = await manager.acquire("unknown", "conv1", 0);
+ const release = await manager.acquire("unknown", "conv1", "default", 0);
expect(typeof release).toBe("function");
// No state → release is a no-op, no error.
release();
@@ -101,7 +101,7 @@ describe("createConcurrencyManager", () => {
const { manager } = createManager();
manager.setLimit("umans", 4);
- const release1 = await manager.acquire("umans", "conv1", 0);
+ const release1 = await manager.acquire("umans", "conv1", "default", 0);
const status = manager.getStatus("umans");
expect(status).toEqual({
providerId: "umans",
@@ -120,11 +120,11 @@ describe("createConcurrencyManager", () => {
const { manager } = createManager();
manager.setLimit("umans", 1);
- const release1 = await manager.acquire("umans", "conv1", 100);
+ const release1 = await manager.acquire("umans", "conv1", "default", 100);
// Second request should block (at limit).
let resolved = false;
- const promise2 = manager.acquire("umans", "conv2", 200).then((r) => {
+ const promise2 = manager.acquire("umans", "conv2", "default", 200).then((r) => {
resolved = true;
return r;
});
@@ -150,13 +150,13 @@ describe("createConcurrencyManager", () => {
manager.setLimit("umans", 1);
// Hold the single slot.
- const release0 = await manager.acquire("umans", "holder", 0);
+ const release0 = await manager.acquire("umans", "holder", "default", 0);
// Three agents queue with different prompt start times.
// Agent C started latest (t=300), Agent A started earliest (t=100).
const results: string[] = [];
const acquireAndRecord = (conv: string, promptAt: number) =>
- manager.acquire("umans", conv, promptAt).then((r) => {
+ manager.acquire("umans", conv, "default", promptAt).then((r) => {
results.push(conv);
return r;
});
@@ -193,7 +193,7 @@ describe("createConcurrencyManager", () => {
const { manager, timers } = createManager();
manager.setLimit("umans", 1);
- const release1 = await manager.acquire("umans", "conv1", 0);
+ const release1 = await manager.acquire("umans", "conv1", "default", 0);
release1();
// Simulate a 429 → queue pauses.
@@ -204,7 +204,7 @@ describe("createConcurrencyManager", () => {
// A new acquire should block (paused, even though under limit).
let resolved = false;
- const promise = manager.acquire("umans", "conv2", 0).then((r) => {
+ const promise = manager.acquire("umans", "conv2", "default", 0).then((r) => {
resolved = true;
return r;
});
@@ -233,7 +233,7 @@ describe("createConcurrencyManager", () => {
const { manager, timers } = createManager();
manager.setLimit("umans", 1);
- const release = await manager.acquire("umans", "conv1", 0);
+ const release = await manager.acquire("umans", "conv1", "default", 0);
expect(manager.getStatus("umans")?.inFlight).toBe(1);
// Advance past the slot timeout (5000ms) and fire the watchdog.
@@ -253,11 +253,11 @@ describe("createConcurrencyManager", () => {
manager.setLimit("umans", 1);
// Hold the slot.
- await manager.acquire("umans", "holder", 0);
+ await manager.acquire("umans", "holder", "default", 0);
// Queue a waiter.
let resolved = false;
- const promise = manager.acquire("umans", "waiter", 10).then((r) => {
+ const promise = manager.acquire("umans", "waiter", "default", 10).then((r) => {
resolved = true;
return r;
});
@@ -280,11 +280,11 @@ describe("createConcurrencyManager", () => {
const { manager } = createManager();
manager.setLimit("umans", 1);
- const release1 = await manager.acquire("umans", "conv1", 0);
+ const release1 = await manager.acquire("umans", "conv1", "default", 0);
// Queue a waiter.
let resolved = false;
- const promise = manager.acquire("umans", "conv2", 100).then((r) => {
+ const promise = manager.acquire("umans", "conv2", "default", 100).then((r) => {
resolved = true;
return r;
});
@@ -307,11 +307,11 @@ describe("createConcurrencyManager", () => {
const { manager } = createManager();
manager.setLimit("umans", 1);
- const release1 = await manager.acquire("umans", "conv1", 0);
+ const release1 = await manager.acquire("umans", "conv1", "default", 0);
// Queue two waiters.
- const p2 = manager.acquire("umans", "conv2", 100);
- const p3 = manager.acquire("umans", "conv3", 200);
+ const p2 = manager.acquire("umans", "conv2", "default", 100);
+ const p3 = manager.acquire("umans", "conv3", "default", 200);
await Promise.resolve();
await Promise.resolve();
@@ -369,7 +369,7 @@ describe("createConcurrencyManager", () => {
const { manager } = createManager();
manager.setLimit("umans", 2);
- const release = await manager.acquire("umans", "conv1", 0);
+ const release = await manager.acquire("umans", "conv1", "default", 0);
expect(manager.getStatus("umans")?.inFlight).toBe(1);
release();
@@ -385,9 +385,9 @@ describe("createConcurrencyManager", () => {
manager.setLimit("umans", 3);
const releases = await Promise.all([
- manager.acquire("umans", "conv1", 0),
- manager.acquire("umans", "conv2", 0),
- manager.acquire("umans", "conv3", 0),
+ manager.acquire("umans", "conv1", "default", 0),
+ manager.acquire("umans", "conv2", "default", 0),
+ manager.acquire("umans", "conv3", "default", 0),
]);
expect(manager.getStatus("umans")?.inFlight).toBe(3);
@@ -402,12 +402,12 @@ describe("createConcurrencyManager", () => {
const { manager, timers } = createManager({ releaseCooldownMs: 200 });
manager.setLimit("umans", 1);
- const release1 = await manager.acquire("umans", "conv1", 0);
+ const release1 = await manager.acquire("umans", "conv1", "default", 0);
expect(manager.getStatus("umans")?.inFlight).toBe(1);
// Queue a waiter.
let resolved = false;
- const promise2 = manager.acquire("umans", "conv2", 100).then((r) => {
+ const promise2 = manager.acquire("umans", "conv2", "default", 100).then((r) => {
resolved = true;
return r;
});
@@ -436,7 +436,7 @@ describe("createConcurrencyManager", () => {
const { manager, timers } = createManager({ releaseCooldownMs: 200 });
manager.setLimit("umans", 2);
- const release = await manager.acquire("umans", "conv1", 0);
+ const release = await manager.acquire("umans", "conv1", "default", 0);
expect(manager.getStatus("umans")?.inFlight).toBe(1);
release();
@@ -454,7 +454,7 @@ describe("createConcurrencyManager", () => {
const { manager } = createManager({ releaseCooldownMs: 200 });
manager.setLimit("umans", 1);
// Acquire + release to schedule a cooldown timer.
- manager.acquire("umans", "conv1", 0).then((release) => {
+ manager.acquire("umans", "conv1", "default", 0).then((release) => {
release();
// Now there's a pending cooldown timer — destroy should clean it up.
expect(() => manager.destroy()).not.toThrow();
@@ -466,11 +466,11 @@ describe("createConcurrencyManager", () => {
manager.setLimit("umans", 1);
// Hold the single slot.
- const release1 = await manager.acquire("umans", "conv1", 0);
+ const release1 = await manager.acquire("umans", "conv1", "default", 0);
// Second request should trigger onQueued.
let queuedCalled = false;
- const promise = manager.acquire("umans", "conv2", 100, () => {
+ const promise = manager.acquire("umans", "conv2", "default", 100, () => {
queuedCalled = true;
});
await Promise.resolve();
@@ -490,7 +490,7 @@ describe("createConcurrencyManager", () => {
manager.setLimit("umans", 2);
let queuedCalled = false;
- const release = await manager.acquire("umans", "conv1", 0, () => {
+ const release = await manager.acquire("umans", "conv1", "default", 0, () => {
queuedCalled = true;
});
@@ -966,3 +966,295 @@ describe("createConcurrencyManager", () => {
expect(status?.notice).toBeUndefined();
});
});
+
+// ─── Starred-workspace priority tests ───────────────────────────────────────
+
+describe("starred-workspace priority", () => {
+ it("starred-workspace agents are admitted before non-starred (regardless of promptStartedAt)", async () => {
+ // Use a callback backed by a Set so we can star/unstar at runtime.
+ const starred = new Set<string>();
+ const timers = createFakeTimers();
+ const manager = createConcurrencyManager({
+ now: timers.now,
+ slotTimeoutMs: 5000,
+ watchdogIntervalMs: 1000,
+ defaultPauseMs: 30000,
+ isWorkspaceStarred: (wsId: string) => starred.has(wsId),
+ setTimeout: timers.setTimeout,
+ clearTimeout: timers.clearTimeout,
+ setInterval: timers.setInterval,
+ clearInterval: timers.clearInterval,
+ });
+ manager.setLimit("umans", 1);
+
+ // Hold the single slot.
+ const release0 = await manager.acquire("umans", "holder", "default", 0);
+
+ // Three agents queue:
+ // - convA (workspace "ws-normal", promptAt=100) — non-starred, earliest
+ // - convB (workspace "ws-starred", promptAt=200) — starred, later
+ // - convC (workspace "ws-normal", promptAt=300) — non-starred, latest
+ starred.add("ws-starred");
+
+ const results: string[] = [];
+ const acquireAndRecord = (conv: string, wsId: string, promptAt: number) =>
+ manager.acquire("umans", conv, wsId, promptAt).then((r) => {
+ results.push(conv);
+ return r;
+ });
+
+ const pA = acquireAndRecord("convA", "ws-normal", 100);
+ const pB = acquireAndRecord("convB", "ws-starred", 200);
+ const pC = acquireAndRecord("convC", "ws-normal", 300);
+
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(results).toEqual([]); // none resolved yet.
+
+ // Release the holder. The starred agent (convB, t=200) should get the
+ // slot FIRST, even though convA (t=100) started earlier.
+ release0();
+
+ const rB = await pB;
+ expect(results).toEqual(["convB"]);
+ rB();
+
+ // Now the oldest non-starred (convA, t=100) should be next.
+ const rA = await pA;
+ expect(results).toEqual(["convB", "convA"]);
+ rA();
+
+ // Then convC (t=300).
+ const rC = await pC;
+ expect(results).toEqual(["convB", "convA", "convC"]);
+ rC();
+
+ manager.destroy();
+ });
+
+ it("within the starred group, oldest-agent-first is preserved", async () => {
+ const starred = new Set<string>(["ws-starred"]);
+ const timers = createFakeTimers();
+ const manager = createConcurrencyManager({
+ now: timers.now,
+ slotTimeoutMs: 5000,
+ watchdogIntervalMs: 1000,
+ defaultPauseMs: 30000,
+ isWorkspaceStarred: (wsId: string) => starred.has(wsId),
+ setTimeout: timers.setTimeout,
+ clearTimeout: timers.clearTimeout,
+ setInterval: timers.setInterval,
+ clearInterval: timers.clearInterval,
+ });
+ manager.setLimit("umans", 1);
+
+ const release0 = await manager.acquire("umans", "holder", "default", 0);
+
+ const results: string[] = [];
+ const acquireAndRecord = (conv: string, wsId: string, promptAt: number) =>
+ manager.acquire("umans", conv, wsId, promptAt).then((r) => {
+ results.push(conv);
+ return r;
+ });
+
+ // Two starred agents: convLate (t=300) queues first, convEarly (t=100) second.
+ const pLate = acquireAndRecord("convLate", "ws-starred", 300);
+ const pEarly = acquireAndRecord("convEarly", "ws-starred", 100);
+
+ await Promise.resolve();
+ await Promise.resolve();
+
+ release0();
+
+ // convEarly (t=100) should win within the starred group (oldest-first).
+ const rEarly = await pEarly;
+ expect(results).toEqual(["convEarly"]);
+ rEarly();
+
+ const rLate = await pLate;
+ expect(results).toEqual(["convEarly", "convLate"]);
+ rLate();
+
+ manager.destroy();
+ });
+
+ it("starring a workspace while agents are queued re-prioritizes them immediately", async () => {
+ const timers = createFakeTimers();
+ const manager = createConcurrencyManager({
+ now: timers.now,
+ slotTimeoutMs: 5000,
+ watchdogIntervalMs: 1000,
+ defaultPauseMs: 30000,
+ setTimeout: timers.setTimeout,
+ clearTimeout: timers.clearTimeout,
+ setInterval: timers.setInterval,
+ clearInterval: timers.clearInterval,
+ });
+ manager.setLimit("umans", 1);
+
+ const release0 = await manager.acquire("umans", "holder", "default", 0);
+
+ // convA (non-starred, t=100) queues first.
+ let resolvedA = false;
+ const pA = manager.acquire("umans", "convA", "ws-normal", 100).then((r) => {
+ resolvedA = true;
+ return r;
+ });
+ // convB (non-starred, t=200) queues second.
+ let resolvedB = false;
+ const pB = manager.acquire("umans", "convB", "ws-to-star", 200).then((r) => {
+ resolvedB = true;
+ return r;
+ });
+
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(resolvedA).toBe(false);
+ expect(resolvedB).toBe(false);
+
+ // Now star convB's workspace AFTER it's queued. notifyWorkspaceStarred
+ // updates the internal cache + re-sorts + tries to grant.
+ manager.notifyWorkspaceStarred("ws-to-star", true);
+
+ // Release the holder — convB (now starred) should jump ahead of convA.
+ release0();
+
+ const rB = await pB;
+ expect(resolvedB).toBe(true);
+ expect(resolvedA).toBe(false);
+ rB();
+
+ // Now convA gets the next slot.
+ const rA = await pA;
+ expect(resolvedA).toBe(true);
+ rA();
+
+ manager.destroy();
+ });
+
+ it("unstar a workspace demotes its queued agents", async () => {
+ const timers = createFakeTimers();
+ const manager = createConcurrencyManager({
+ now: timers.now,
+ slotTimeoutMs: 5000,
+ watchdogIntervalMs: 1000,
+ defaultPauseMs: 30000,
+ setTimeout: timers.setTimeout,
+ clearTimeout: timers.clearTimeout,
+ setInterval: timers.setInterval,
+ clearInterval: timers.clearInterval,
+ });
+ manager.setLimit("umans", 1);
+
+ // Initially star "ws-starred" via the internal cache.
+ manager.notifyWorkspaceStarred("ws-starred", true);
+
+ const release0 = await manager.acquire("umans", "holder", "default", 0);
+
+ // convA (starred, t=200) queues first.
+ const pA = manager.acquire("umans", "convA", "ws-starred", 200);
+ // convB (non-starred, t=100) queues second but is older.
+ const pB = manager.acquire("umans", "convB", "ws-normal", 100);
+
+ await Promise.resolve();
+ await Promise.resolve();
+
+ // Unstar convA's workspace — it should now be behind convB (which is older).
+ manager.notifyWorkspaceStarred("ws-starred", false);
+
+ release0();
+
+ // convB (t=100, now non-starred but oldest) should win.
+ const rB = await pB;
+ expect(rB).toBeDefined();
+ rB();
+
+ const rA = await pA;
+ rA();
+
+ manager.destroy();
+ });
+
+ it("notifyWorkspaceStarred re-sorts queues and tries to grant when capacity is free", async () => {
+ const timers = createFakeTimers();
+ const manager = createConcurrencyManager({
+ now: timers.now,
+ slotTimeoutMs: 5000,
+ watchdogIntervalMs: 1000,
+ defaultPauseMs: 30000,
+ setTimeout: timers.setTimeout,
+ clearTimeout: timers.clearTimeout,
+ setInterval: timers.setInterval,
+ clearInterval: timers.clearInterval,
+ });
+ manager.setLimit("umans", 1);
+
+ // Slot is held. Two agents queued (both non-starred).
+ const release0 = await manager.acquire("umans", "holder", "default", 0);
+ const pA = manager.acquire("umans", "convA", "ws-normal", 100);
+ const pB = manager.acquire("umans", "convB", "ws-to-star", 200);
+
+ await Promise.resolve();
+ await Promise.resolve();
+
+ // Star convB's workspace — notifyWorkspaceStarred re-sorts + tries to
+ // grant. But the slot is still held, so no one is granted yet.
+ manager.notifyWorkspaceStarred("ws-to-star", true);
+
+ // Release the slot — convB (now starred) should get it.
+ release0();
+
+ const rB = await pB;
+ expect(rB).toBeDefined();
+ rB();
+
+ const rA = await pA;
+ rA();
+
+ manager.destroy();
+ });
+
+ it("without isWorkspaceStarred callback, all agents are non-starred (backward compatible)", async () => {
+ const timers = createFakeTimers();
+ const manager = createConcurrencyManager({
+ now: timers.now,
+ slotTimeoutMs: 5000,
+ watchdogIntervalMs: 1000,
+ defaultPauseMs: 30000,
+ setTimeout: timers.setTimeout,
+ clearTimeout: timers.clearTimeout,
+ setInterval: timers.setInterval,
+ clearInterval: timers.clearInterval,
+ });
+ manager.setLimit("umans", 1);
+
+ const release0 = await manager.acquire("umans", "holder", "default", 0);
+
+ const results: string[] = [];
+ const acquireAndRecord = (conv: string, wsId: string, promptAt: number) =>
+ manager.acquire("umans", conv, wsId, promptAt).then((r) => {
+ results.push(conv);
+ return r;
+ });
+
+ // Queue in non-sorted order: B (t=200), A (t=100).
+ const pB = acquireAndRecord("convB", "ws-any", 200);
+ const pA = acquireAndRecord("convA", "ws-any", 100);
+
+ await Promise.resolve();
+ await Promise.resolve();
+
+ release0();
+
+ // Without a callback, oldest-first ordering applies (no starred priority).
+ const rA = await pA;
+ expect(results).toEqual(["convA"]);
+ rA();
+
+ const rB = await pB;
+ expect(results).toEqual(["convA", "convB"]);
+ rB();
+
+ manager.destroy();
+ });
+});
diff --git a/packages/provider-concurrency/src/concurrency-manager.ts b/packages/provider-concurrency/src/concurrency-manager.ts
index 1d05bb0..ea66a49 100644
--- a/packages/provider-concurrency/src/concurrency-manager.ts
+++ b/packages/provider-concurrency/src/concurrency-manager.ts
@@ -77,10 +77,19 @@ export interface ProviderConcurrencyStatus {
export interface ConcurrencyLimiter {
/**
* Acquire a concurrency slot for `providerId`. Resolves immediately when a
- * slot is available; otherwise blocks (queued by oldest-agent-first) until
- * one frees up. The returned function MUST be called when the response
- * stream completes (in a `finally` block). For providers with no configured
- * limit, resolves instantly with a no-op release.
+ * slot is available; otherwise blocks (queued by starred-workspace-first,
+ * then oldest-agent-first) until one frees up. The returned function MUST be
+ * called when the response stream completes (in a `finally` block). For
+ * providers with no configured limit, resolves instantly with a no-op
+ * release.
+ *
+ * **Priority:** agents from **starred workspaces** are always admitted before
+ * agents from non-starred workspaces (regardless of `promptStartedAt`).
+ * Within each group (starred vs non-starred), oldest-agent-first ordering is
+ * preserved. The starred status is looked up via the injected
+ * `isWorkspaceStarred` callback at sort time, so starring a workspace while
+ * agents are queued takes effect on the next sort (new acquire or slot
+ * release).
*
* If `onQueued` is provided and the request cannot be granted immediately
* (at limit or paused), it is called synchronously BEFORE the Promise is
@@ -89,14 +98,18 @@ export interface ConcurrencyLimiter {
*
* @param providerId The provider to limit (e.g. "umans", "openai-compat").
* @param conversationId The agent requesting the slot.
+ * @param workspaceId The workspace the agent belongs to (for starred
+ * priority scheduling). Defaults to `"default"`.
* @param promptStartedAt When the agent's current prompt (turn) started
- * (epoch-ms). Used for oldest-agent-first scheduling.
+ * (epoch-ms). Used for oldest-agent-first scheduling
+ * within each starred group.
* @param onQueued Called synchronously when the request is enqueued
* (not granted immediately). Optional.
*/
acquire(
providerId: string,
conversationId: string,
+ workspaceId: string,
promptStartedAt: number,
onQueued?: () => void,
): Promise<() => void>;
@@ -146,6 +159,13 @@ export interface ConcurrencyService extends ConcurrencyLimiter {
getStatus(providerId: string): ProviderConcurrencyStatus | undefined;
/** Status for every provider with a configured limit. */
getStatusAll(): readonly ProviderConcurrencyStatus[];
+ /**
+ * Notify the limiter that a workspace's starred state changed. Updates the
+ * in-memory starred cache so subsequent queue sorts re-evaluate priority
+ * (a newly-starred workspace's already-queued agents jump ahead). Called by
+ * the transport layer after persisting the starred toggle.
+ */
+ notifyWorkspaceStarred(workspaceId: string, starred: boolean): void;
/** Stop the watchdog + clear all timers. */
destroy(): void;
}
@@ -161,6 +181,7 @@ interface Slot {
interface QueuedWaiter {
readonly conversationId: string;
+ readonly workspaceId: string;
readonly promptStartedAt: number;
readonly resolve: (release: () => void) => void;
}
@@ -232,6 +253,14 @@ export interface ConcurrencyManagerOpts {
* poll never becomes an unhandled rejection.
*/
readonly onUsagePollError?: (providerId: string, err: unknown) => void;
+ /**
+ * Injected callback: returns whether a workspace is starred (for priority
+ * scheduling). When provided, agents from starred workspaces jump ahead of
+ * non-starred agents in the queue. When omitted (or returns `false`), all
+ * agents are treated as non-starred (backward-compatible). This is an I/O
+ * effect injected so the manager stays pure + unit-testable with a fake.
+ */
+ readonly isWorkspaceStarred?: (workspaceId: string) => boolean;
}
/** Min interval between usage-gate fallback repolls (ms). The release trigger is immediate. */
@@ -254,6 +283,16 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
const setInterval = opts.setInterval ?? globalThis.setInterval.bind(globalThis);
const clearInterval = opts.clearInterval ?? globalThis.clearInterval.bind(globalThis);
+ // In-memory cache of starred workspace IDs. Populated by the extension on
+ // activation (from the conversation store) + updated via
+ // `notifyWorkspaceStarred`. The `isWorkspaceStarred` callback reads this
+ // synchronously so the queue sort comparator (sync) can re-evaluate priority
+ // on every sort — a newly-starred workspace's already-queued agents jump
+ // ahead on the next sort (new acquire or slot release).
+ const starredWorkspaces = new Set<string>();
+ const isWorkspaceStarred =
+ opts.isWorkspaceStarred ?? ((wsId: string) => starredWorkspaces.has(wsId));
+
const states = new Map<string, ProviderState>();
const cooldownOverrides = new Map<string, number>();
const cooldownTimers = new Set<ReturnType<typeof setTimeout>>();
@@ -343,6 +382,25 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
* (`inFlight < limit`). Synchronous.
*/
function grantLoop(state: ProviderState, providerId: string): void {
+ * Priority comparator for queued waiters: starred-workspace agents first,
+ * then oldest-agent-first (ascending `promptStartedAt`) within each group.
+ * Called at sort time (both on insert and before granting) so a workspace
+ * starred AFTER an agent queued is re-evaluated on the next sort.
+ */
+ function compareWaiters(a: QueuedWaiter, b: QueuedWaiter): number {
+ const aStarred = isWorkspaceStarred(a.workspaceId);
+ const bStarred = isWorkspaceStarred(b.workspaceId);
+ if (aStarred !== bStarred) return aStarred ? -1 : 1; // starred first
+ return a.promptStartedAt - b.promptStartedAt; // oldest first within group
+ }
+
+ function tryGrantNext(providerId: string): void {
+ const state = states.get(providerId);
+ if (state === undefined) return;
+ if (state.paused) return;
+ // Re-sort before granting: a workspace may have been starred/unstarred
+ // since the waiters were enqueued, so priority may have changed.
+ state.queue.sort(compareWaiters);
while (state.queue.length > 0 && state.inFlight < state.limit) {
const waiter = state.queue[0];
if (waiter === undefined) break;
@@ -533,7 +591,7 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
// ── Public API ─────────────────────────────────────────────────────────────
const manager: ConcurrencyService = {
- acquire(providerId, conversationId, promptStartedAt, onQueued) {
+ acquire(providerId, conversationId, workspaceId, promptStartedAt, onQueued) {
const state = states.get(providerId);
if (state === undefined) {
// No limit configured → unlimited.
@@ -561,7 +619,7 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
// "queued" status signal while we're still synchronous.
onQueued?.();
- // Queue (oldest-agent-first by promptStartedAt).
+ // Queue (starred-workspace-first, then oldest-agent-first).
return new Promise<() => void>((resolve) => {
state.queue.push({ conversationId, promptStartedAt, resolve });
// Keep sorted ascending by promptStartedAt (oldest first).
@@ -572,6 +630,10 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
if (fetchUsage !== undefined) {
armGateRepoll(providerId, state);
}
+ state.queue.push({ conversationId, workspaceId, promptStartedAt, resolve });
+ // Keep sorted by priority (starred first, then oldest-agent-first).
+ // The queue is typically tiny (<20), so a simple sort is fine.
+ state.queue.sort(compareWaiters);
});
},
@@ -751,6 +813,22 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre
.filter((s): s is ProviderConcurrencyStatus => s !== undefined);
},
+ notifyWorkspaceStarred(workspaceId, starred) {
+ if (starred) {
+ starredWorkspaces.add(workspaceId);
+ } else {
+ starredWorkspaces.delete(workspaceId);
+ }
+ // Re-sort all queues so a newly-starred workspace's already-queued
+ // agents jump ahead immediately (no need to wait for the next acquire).
+ for (const [providerId, state] of states) {
+ if (state.queue.length > 0) {
+ state.queue.sort(compareWaiters);
+ tryGrantNext(providerId);
+ }
+ }
+ },
+
destroy() {
clearInterval(watchdogTimer);
for (const timer of cooldownTimers) {
diff --git a/packages/provider-concurrency/src/extension.ts b/packages/provider-concurrency/src/extension.ts
index 378655b..48c019b 100644
--- a/packages/provider-concurrency/src/extension.ts
+++ b/packages/provider-concurrency/src/extension.ts
@@ -1,3 +1,4 @@
+import { conversationStoreHandle } from "@dispatch/conversation-store";
import type { Extension, HostAPI, Logger, Manifest, StorageNamespace } from "@dispatch/kernel";
import type { ConcurrencyManagerOpts, ConcurrencyService } from "./concurrency-manager.js";
import { createConcurrencyManager } from "./concurrency-manager.js";
@@ -11,6 +12,7 @@ export const manifest: Manifest = {
trust: "bundled",
activation: "eager",
capabilities: { db: true },
+ dependsOn: ["conversation-store"],
contributes: { services: ["provider-concurrency/service"] },
};
@@ -109,6 +111,7 @@ function createPersistedService(
getCooldowns: inner.getCooldowns.bind(inner),
getStatus: inner.getStatus.bind(inner),
getStatusAll: inner.getStatusAll.bind(inner),
+ notifyWorkspaceStarred: inner.notifyWorkspaceStarred.bind(inner),
destroy: inner.destroy.bind(inner),
};
}
@@ -215,6 +218,16 @@ export async function activate(host: HostAPI): Promise<void> {
return provider.getUsage();
};
+ // Resolve the conversation store to seed the in-memory starred-workspace
+ // cache. The `isWorkspaceStarred` callback reads this cache synchronously
+ // (the queue sort comparator is sync), so we must populate it before the
+ // manager handles its first acquire. `dependsOn: ["conversation-store"]`
+ // in the manifest guarantees the store is registered before we activate.
+ const conversationStore = host.getService(conversationStoreHandle);
+
+ // The manager owns the in-memory `starredWorkspaces` set internally (the
+ // default `isWorkspaceStarred` callback checks it). We seed it by calling
+ // `notifyWorkspaceStarred` for each starred workspace found in the store.
const managerOpts: ConcurrencyManagerOpts = {
now: () => Date.now(),
slotTimeoutMs: SLOT_TIMEOUT_MS,
@@ -274,6 +287,27 @@ export async function activate(host: HostAPI): Promise<void> {
await loadAutoReduce(storage, inner, logger);
await loadCooldowns(storage, inner, logger);
+ // Seed the in-memory starred cache from the conversation store so the
+ // priority scheduling is correct on a fresh server start (previously-starred
+ // workspaces are respected without requiring the user to re-star them).
+ try {
+ const workspaces = await conversationStore.listWorkspaces();
+ for (const ws of workspaces) {
+ if (ws.starred) {
+ inner.notifyWorkspaceStarred(ws.id, true);
+ }
+ }
+ if (workspaces.some((w) => w.starred)) {
+ logger.info("provider-concurrency: restored starred workspaces", {
+ count: workspaces.filter((w) => w.starred).length,
+ });
+ }
+ } catch (err) {
+ logger.warn("provider-concurrency: failed to load starred workspaces", {
+ err: err instanceof Error ? err.message : String(err),
+ });
+ }
+
const service = createPersistedService(inner, storage, logger);
host.provideService(concurrencyServiceHandle, service);
logger.info("provider-concurrency: registered");
diff --git a/packages/provider-concurrency/src/provider-wrapper.test.ts b/packages/provider-concurrency/src/provider-wrapper.test.ts
index e59ab39..7554e64 100644
--- a/packages/provider-concurrency/src/provider-wrapper.test.ts
+++ b/packages/provider-concurrency/src/provider-wrapper.test.ts
@@ -17,12 +17,21 @@ function fakeProvider(events: ProviderEvent[]): ProviderContract {
/** A fake limiter that records acquire/release calls. */
function recordingLimiter(): ConcurrencyLimiter & {
- acquireCalls: { providerId: string; conversationId: string; promptStartedAt: number }[];
+ acquireCalls: {
+ providerId: string;
+ conversationId: string;
+ workspaceId: string;
+ promptStartedAt: number;
+ }[];
releaseCalls: number;
rateLimitReports: string[];
} {
- const acquireCalls: { providerId: string; conversationId: string; promptStartedAt: number }[] =
- [];
+ const acquireCalls: {
+ providerId: string;
+ conversationId: string;
+ workspaceId: string;
+ promptStartedAt: number;
+ }[] = [];
const releaseCalls: { count: number } = { count: 0 };
const rateLimitReports: string[] = [];
@@ -32,8 +41,8 @@ function recordingLimiter(): ConcurrencyLimiter & {
return releaseCalls.count;
},
rateLimitReports,
- acquire(providerId, conversationId, promptStartedAt) {
- acquireCalls.push({ providerId, conversationId, promptStartedAt });
+ acquire(providerId, conversationId, workspaceId, promptStartedAt) {
+ acquireCalls.push({ providerId, conversationId, workspaceId, promptStartedAt });
return Promise.resolve(() => {
releaseCalls.count++;
});
@@ -52,7 +61,7 @@ describe("wrapProviderWithConcurrency", () => {
]);
const limiter = recordingLimiter();
- const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 12345);
+ const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", "default", 12345);
const events: ProviderEvent[] = [];
for await (const e of wrapped.stream([], [])) {
@@ -61,7 +70,12 @@ describe("wrapProviderWithConcurrency", () => {
// Slot acquired before stream, released after.
expect(limiter.acquireCalls).toEqual([
- { providerId: "test-provider", conversationId: "conv1", promptStartedAt: 12345 },
+ {
+ providerId: "test-provider",
+ conversationId: "conv1",
+ workspaceId: "default",
+ promptStartedAt: 12345,
+ },
]);
expect(limiter.releaseCalls).toBe(1);
expect(events).toEqual([
@@ -79,7 +93,7 @@ describe("wrapProviderWithConcurrency", () => {
},
};
const limiter = recordingLimiter();
- const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0);
+ const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", "default", 0);
await expect(async () => {
for await (const _e of wrapped.stream([], [])) {
@@ -95,7 +109,7 @@ describe("wrapProviderWithConcurrency", () => {
{ type: "error", message: "Too many requests", code: "429", retryable: true },
]);
const limiter = recordingLimiter();
- const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0);
+ const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", "default", 0);
const events: ProviderEvent[] = [];
for await (const e of wrapped.stream([], [])) {
@@ -113,7 +127,7 @@ describe("wrapProviderWithConcurrency", () => {
{ type: "error", message: "Internal error", code: "500", retryable: true },
]);
const limiter = recordingLimiter();
- const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0);
+ const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", "default", 0);
for await (const _e of wrapped.stream([], [])) {
// consume
@@ -131,7 +145,7 @@ describe("wrapProviderWithConcurrency", () => {
listModels: async () => [{ id: "model-1" }],
};
const limiter = recordingLimiter();
- const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0);
+ const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", "default", 0);
expect(wrapped.id).toBe("my-provider");
expect(wrapped.listModels).toBeDefined();
@@ -144,7 +158,7 @@ describe("wrapProviderWithConcurrency", () => {
let acquiredCalled = false;
const blockingLimiter: ConcurrencyLimiter = {
- acquire(_providerId, _convId, _promptAt, onQueued) {
+ acquire(_providerId, _convId, _wsId, _promptAt, onQueued) {
// Simulate a queued request: call onQueued, then resolve on next tick.
onQueued?.();
return new Promise((resolve) => {
@@ -161,6 +175,7 @@ describe("wrapProviderWithConcurrency", () => {
provider,
blockingLimiter,
"conv1",
+ "default",
0,
() => {
queuedCalled = true;
@@ -183,7 +198,7 @@ describe("wrapProviderWithConcurrency", () => {
let acquiredCalled = false;
const immediateLimiter: ConcurrencyLimiter = {
- acquire(_providerId, _convId, _promptAt, _onQueued) {
+ acquire(_providerId, _convId, _wsId, _promptAt, _onQueued) {
// Grant immediately — do NOT call onQueued.
return Promise.resolve(() => {});
},
@@ -195,6 +210,7 @@ describe("wrapProviderWithConcurrency", () => {
provider,
immediateLimiter,
"conv1",
+ "default",
0,
() => {
queuedCalled = true;
@@ -229,7 +245,7 @@ describe("wrapProviderWithConcurrency", () => {
},
};
const limiter = recordingLimiter();
- const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0);
+ const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", "default", 0);
const messages = [{ role: "user" as const, chunks: [{ type: "text" as const, text: "hi" }] }];
const tools = [{ name: "test_tool", description: "test", parameters: {} }];
diff --git a/packages/provider-concurrency/src/provider-wrapper.ts b/packages/provider-concurrency/src/provider-wrapper.ts
index aa08e5b..1e3f2c0 100644
--- a/packages/provider-concurrency/src/provider-wrapper.ts
+++ b/packages/provider-concurrency/src/provider-wrapper.ts
@@ -23,6 +23,8 @@ import type { ConcurrencyLimiter } from "./concurrency-manager.js";
* @param provider The underlying provider to wrap.
* @param limiter The concurrency limiter (acquire/release/reportRateLimit).
* @param conversationId The agent requesting the stream (for slot attribution).
+ * @param workspaceId The workspace the agent belongs to (for starred
+ * priority scheduling in the limiter queue).
* @param promptStartedAt When the agent's current prompt (turn) started
* (epoch-ms, for oldest-agent-first scheduling).
* @param onQueued Called synchronously when `acquire()` decides to
@@ -36,6 +38,7 @@ export function wrapProviderWithConcurrency(
provider: ProviderContract,
limiter: ConcurrencyLimiter,
conversationId: string,
+ workspaceId: string,
promptStartedAt: number,
onQueued?: () => void,
onAcquired?: () => void,
@@ -50,7 +53,13 @@ export function wrapProviderWithConcurrency(
tools: readonly ToolContract[],
opts?: ProviderStreamOptions,
): AsyncIterable<ProviderEvent> {
- const release = await limiter.acquire(providerId, conversationId, promptStartedAt, onQueued);
+ const release = await limiter.acquire(
+ providerId,
+ conversationId,
+ workspaceId,
+ promptStartedAt,
+ onQueued,
+ );
onAcquired?.();
try {
for await (const event of innerStream(messages, tools, opts)) {
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index 8cc0a97..aaf418a 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -747,6 +747,7 @@ export function createSessionOrchestrator(
provider,
limiter,
conversationId,
+ workspaceId,
promptStartedAt,
() => emitStatus("queued"),
() => emitStatus("active"),
@@ -1219,10 +1220,12 @@ export function createWarmService(
// Wrap with concurrency limiting (same as the main turn path).
const warmLimiter = deps.resolveConcurrencyLimiter?.();
if (warmLimiter !== undefined) {
+ const warmWorkspaceId = await deps.conversationStore.getWorkspaceId(conversationId);
provider = wrapProviderWithConcurrency(
provider,
warmLimiter,
conversationId,
+ warmWorkspaceId,
deps.now?.() ?? Date.now(),
);
}
@@ -1376,10 +1379,12 @@ export function createCompactionService(
// Wrap with concurrency limiting (same as the main turn path).
const compactionLimiter = deps.resolveConcurrencyLimiter?.();
if (compactionLimiter !== undefined) {
+ const compactionWorkspaceId = await deps.conversationStore.getWorkspaceId(conversationId);
provider = wrapProviderWithConcurrency(
provider,
compactionLimiter,
conversationId,
+ compactionWorkspaceId,
deps.now?.() ?? Date.now(),
);
}
diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts
index 7ae0354..03f1959 100644
--- a/packages/transport-http/src/app.test.ts
+++ b/packages/transport-http/src/app.test.ts
@@ -110,6 +110,7 @@ function createFakeConversationStore(
title: "default",
defaultCwd: null,
defaultComputerId: null,
+ starred: false,
createdAt: 0,
lastActivityAt: 0,
};
@@ -207,6 +208,9 @@ function createFakeConversationStore(
async setWorkspaceDefaultComputerId(id, defaultComputerId) {
return { ...sampleWorkspace, id, defaultComputerId };
},
+ async setWorkspaceStarred(id, starred) {
+ return { ...sampleWorkspace, id, starred };
+ },
async deleteWorkspace() {
return { closedCount: 0 };
},
@@ -3562,6 +3566,7 @@ describe("Workspaces", () => {
title: "proj",
defaultCwd: null,
defaultComputerId: null,
+ starred: false,
createdAt: 1000,
lastActivityAt: 2000,
};
@@ -3756,6 +3761,150 @@ describe("Workspaces", () => {
const res = await app.request("/workspaces/default", { method: "DELETE" });
expect(res.status).toBe(409);
});
+
+ // ─── Star/unstar workspace (concurrency priority) ────────────────────────
+
+ it("PUT /workspaces/:id/star persists + notifies the concurrency service", async () => {
+ let starredCalled: { id: string; starred: boolean } | null = null;
+ const store: ConversationStore = {
+ ...createFakeConversationStore(),
+ async setWorkspaceStarred(id, starred) {
+ return { ...sampleWorkspace, id, starred };
+ },
+ };
+ const concurrencyService = {
+ acquire: async () => () => {},
+ reportRateLimit() {},
+ setLimit() {},
+ getLimit: () => undefined,
+ getLimits: () => [],
+ getStatus: () => undefined,
+ getStatusAll: () => [],
+ notifyWorkspaceStarred(id: string, starred: boolean) {
+ starredCalled = { id, starred };
+ },
+ destroy() {},
+ };
+ const app = createApp({
+ conversationStore: store,
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ ...(concurrencyService !== undefined ? { concurrencyService } : {}),
+ logger: noopLogger,
+ });
+ const res = await app.request("/workspaces/proj/star", { method: "PUT" });
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as WorkspaceResponse;
+ expect(body.starred).toBe(true);
+ expect(starredCalled).toEqual({ id: "proj", starred: true });
+ });
+
+ it("DELETE /workspaces/:id/star persists + notifies the concurrency service", async () => {
+ let starredCalled: { id: string; starred: boolean } | null = null;
+ const store: ConversationStore = {
+ ...createFakeConversationStore(),
+ async setWorkspaceStarred(id, starred) {
+ return { ...sampleWorkspace, id, starred };
+ },
+ };
+ const concurrencyService = {
+ acquire: async () => () => {},
+ reportRateLimit() {},
+ setLimit() {},
+ getLimit: () => undefined,
+ getLimits: () => [],
+ getStatus: () => undefined,
+ getStatusAll: () => [],
+ notifyWorkspaceStarred(id: string, starred: boolean) {
+ starredCalled = { id, starred };
+ },
+ destroy() {},
+ };
+ const app = createApp({
+ conversationStore: store,
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ concurrencyService,
+ logger: noopLogger,
+ });
+ const res = await app.request("/workspaces/proj/star", { method: "DELETE" });
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as WorkspaceResponse;
+ expect(body.starred).toBe(false);
+ expect(starredCalled).toEqual({ id: "proj", starred: false });
+ });
+
+ it("PUT /workspaces/:id/star rejects invalid slug", async () => {
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ logger: noopLogger,
+ });
+ const res = await app.request("/workspaces/Bad Slug!/star", { method: "PUT" });
+ expect(res.status).toBe(400);
+ });
+
+ it("DELETE /workspaces/:id cleans up the in-memory starred cache (bug fix)", async () => {
+ // Bug 1 fix: deleting a workspace must notify the concurrency service to
+ // remove the workspace ID from the starred cache, preventing stale IDs.
+ let starredCalled: { id: string; starred: boolean } | null = null;
+ const store: ConversationStore = {
+ ...createFakeConversationStore(),
+ async deleteWorkspace() {
+ return { closedCount: 2 };
+ },
+ };
+ const concurrencyService = {
+ acquire: async () => () => {},
+ reportRateLimit() {},
+ setLimit() {},
+ getLimit: () => undefined,
+ getLimits: () => [],
+ getStatus: () => undefined,
+ getStatusAll: () => [],
+ notifyWorkspaceStarred(id: string, starred: boolean) {
+ starredCalled = { id, starred };
+ },
+ destroy() {},
+ };
+ const app = createApp({
+ conversationStore: store,
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ concurrencyService,
+ logger: noopLogger,
+ });
+ const res = await app.request("/workspaces/proj", { method: "DELETE" });
+ expect(res.status).toBe(200);
+ // The concurrency service must be notified to clear the starred cache.
+ expect(starredCalled).toEqual({ id: "proj", starred: false });
+ });
+
+ it("PUT /workspaces/:id/star logs warning when concurrency service is absent (bug fix)", async () => {
+ // Bug 2 fix: when the concurrency service is not loaded, the star toggle
+ // persists but the priority cache is not updated. A warning log makes this
+ // degraded behavior visible.
+ const logger = createFakeLogger();
+ const store: ConversationStore = {
+ ...createFakeConversationStore(),
+ async setWorkspaceStarred(id, starred) {
+ return { ...sampleWorkspace, id, starred };
+ },
+ };
+ // NOTE: no concurrencyService provided — simulates the extension being absent.
+ const app = createApp({
+ conversationStore: store,
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ logger,
+ });
+ const res = await app.request("/workspaces/proj/star", { method: "PUT" });
+ expect(res.status).toBe(200);
+ // The star persisted, but a warning was logged about the missing service.
+ const warnings = logger.records.filter((r) => r.level === "warn");
+ expect(warnings.some((r) => r.msg.includes("concurrency service is not loaded"))).toBe(true);
+ });
});
it("POST /chat threads workspaceId", async () => {
diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts
index ee7b2de..656be9d 100644
--- a/packages/transport-http/src/app.ts
+++ b/packages/transport-http/src/app.ts
@@ -1529,6 +1529,10 @@ export function createApp(opts: CreateServerOptions): Hono {
try {
const { closedCount } = await opts.conversationStore.deleteWorkspace(workspaceId);
+ // Clean up the in-memory starred cache so a deleted workspace's ID
+ // doesn't linger (and so a future workspace re-created with the same
+ // slug doesn't inherit the stale starred state).
+ opts.concurrencyService?.notifyWorkspaceStarred(workspaceId, false);
log.info("workspaces: deleted", { workspaceId, closedCount });
const response: DeleteWorkspaceResponse = { workspaceId, closedCount };
return c.json(response, 200);
@@ -1538,6 +1542,81 @@ export function createApp(opts: CreateServerOptions): Hono {
}
});
+ // ─── Star/unstar workspace (concurrency priority) ───────────────────────────
+ // Starred workspaces receive PRIORITY in the concurrency limiter queue —
+ // their agents jump ahead of agents from non-starred workspaces. The
+ // starred state is persisted in the conversation store AND the in-memory
+ // cache in the concurrency service is notified so already-queued agents
+ // are re-prioritized immediately.
+
+ app.put("/workspaces/:id/star", async (c) => {
+ const workspaceId = c.req.param("id");
+ if (!isValidWorkspaceSlug(workspaceId)) {
+ return c.json(
+ {
+ error: "Workspace id must be a valid slug (lowercase alphanumeric + hyphens, 1–40 chars)",
+ },
+ 400,
+ );
+ }
+ try {
+ const workspace = await opts.conversationStore.setWorkspaceStarred(workspaceId, true);
+ // Notify the concurrency service's in-memory cache so queued agents
+ // from this workspace jump ahead immediately. When the concurrency
+ // service is absent (extension not loaded), the starred state is
+ // persisted but the in-memory priority cache is NOT updated — log a
+ // warning so the degraded behavior is visible (queued agents keep
+ // their old priority until restart or the extension is loaded).
+ if (opts.concurrencyService !== undefined) {
+ opts.concurrencyService.notifyWorkspaceStarred(workspaceId, true);
+ } else {
+ log.warn(
+ "workspaces: starred but concurrency service is not loaded — priority cache not updated",
+ {
+ workspaceId,
+ },
+ );
+ }
+ log.info("workspaces: starred", { workspaceId });
+ const response: WorkspaceResponse = workspace;
+ return c.json(response, 200);
+ } catch (err) {
+ log.error("workspaces: star failure", { err, workspaceId });
+ return c.json({ error: "Failed to star workspace" }, 500);
+ }
+ });
+
+ app.delete("/workspaces/:id/star", async (c) => {
+ const workspaceId = c.req.param("id");
+ if (!isValidWorkspaceSlug(workspaceId)) {
+ return c.json(
+ {
+ error: "Workspace id must be a valid slug (lowercase alphanumeric + hyphens, 1–40 chars)",
+ },
+ 400,
+ );
+ }
+ try {
+ const workspace = await opts.conversationStore.setWorkspaceStarred(workspaceId, false);
+ if (opts.concurrencyService !== undefined) {
+ opts.concurrencyService.notifyWorkspaceStarred(workspaceId, false);
+ } else {
+ log.warn(
+ "workspaces: unstarred but concurrency service is not loaded — priority cache not updated",
+ {
+ workspaceId,
+ },
+ );
+ }
+ log.info("workspaces: unstarred", { workspaceId });
+ const response: WorkspaceResponse = workspace;
+ return c.json(response, 200);
+ } catch (err) {
+ log.error("workspaces: unstar failure", { err, workspaceId });
+ return c.json({ error: "Failed to unstar workspace" }, 500);
+ }
+ });
+
// ─── Heartbeat (per-workspace AI loop) ─────────────────────────────────────
// The config + run history for a workspace's heartbeat loop. Delegated to
// the HeartbeatService (provided by the `heartbeat` extension). When
diff --git a/packages/transport-http/src/extension.ts b/packages/transport-http/src/extension.ts
index 7eb39c9..f424e42 100644
--- a/packages/transport-http/src/extension.ts
+++ b/packages/transport-http/src/extension.ts
@@ -72,6 +72,7 @@ export const manifest: Manifest = {
"/workspaces/:id/heartbeat",
"/workspaces/:id/heartbeat/runs",
"/workspaces/:id/heartbeat/runs/:runId/stop",
+ "/workspaces/:id/star",
"/workspaces/:id/title",
],
},
diff --git a/packages/wire/src/index.test.ts b/packages/wire/src/index.test.ts
index 81d10c1..2886ccb 100644
--- a/packages/wire/src/index.test.ts
+++ b/packages/wire/src/index.test.ts
@@ -42,19 +42,24 @@ describe("@dispatch/wire — Computer / Workspace shapes", () => {
expect(entry.usageCount).toBe(3);
});
- it("a Workspace carries defaultComputerId (null = local)", () => {
+ it("a Workspace carries defaultComputerId (null = local) and starred", () => {
const remote: Workspace = {
id: "default",
title: "Default",
defaultCwd: null,
defaultComputerId: "myserver",
+ starred: false,
createdAt: 0,
lastActivityAt: 0,
};
expect(remote.defaultComputerId).toBe("myserver");
+ expect(remote.starred).toBe(false);
const local: Workspace = { ...remote, defaultComputerId: null };
expect(local.defaultComputerId).toBeNull();
+
+ const starred: Workspace = { ...remote, starred: true };
+ expect(starred.starred).toBe(true);
});
});
diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts
index 113f684..aa5b6a8 100644
--- a/packages/wire/src/index.ts
+++ b/packages/wire/src/index.ts
@@ -651,6 +651,13 @@ export interface Workspace {
* (per-conv `computerId` → this → `null`/local).
*/
readonly defaultComputerId: string | null;
+ /**
+ * Whether the workspace is starred by the user. Starred workspaces receive
+ * PRIORITY in the concurrency limiter queue — their agents jump ahead of
+ * agents from non-starred workspaces (oldest-agent-first within each group).
+ * Defaults to `false` on creation.
+ */
+ readonly starred: boolean;
/** Epoch-ms when the workspace was first created. */
readonly createdAt: number;
/** Epoch-ms of the most recent conversation activity in this workspace. */