diff options
| author | Adam Malczewski <[email protected]> | 2026-06-28 15:31:49 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-28 15:31:49 +0900 |
| commit | b60586285863f8bb82242a9df49c4d47e1235755 (patch) | |
| tree | dd38669dbd8092987bc50d16dcf523c68d43c460 | |
| parent | fb4a9217b55dd3ba11670104ac23536416d36940 (diff) | |
| parent | 076edf7d1dfc4dc818f173f751dcb1e57b5baaeb (diff) | |
| download | dispatch-b60586285863f8bb82242a9df49c4d47e1235755.tar.gz dispatch-b60586285863f8bb82242a9df49c4d47e1235755.zip | |
Merge branch 'feature/workspace-star' into predev
# Conflicts:
# packages/provider-concurrency/src/concurrency-manager.ts
# packages/provider-concurrency/src/extension.ts
| -rw-r--r-- | bun.lock | 1 | ||||
| -rw-r--r-- | notes/review-bugs.md | 164 | ||||
| -rw-r--r-- | packages/conversation-store/src/store-workspace.test.ts | 129 | ||||
| -rw-r--r-- | packages/conversation-store/src/store.ts | 58 | ||||
| -rw-r--r-- | packages/provider-concurrency/package.json | 1 | ||||
| -rw-r--r-- | packages/provider-concurrency/src/concurrency-manager.test.ts | 346 | ||||
| -rw-r--r-- | packages/provider-concurrency/src/concurrency-manager.ts | 92 | ||||
| -rw-r--r-- | packages/provider-concurrency/src/extension.ts | 34 | ||||
| -rw-r--r-- | packages/provider-concurrency/src/provider-wrapper.test.ts | 44 | ||||
| -rw-r--r-- | packages/provider-concurrency/src/provider-wrapper.ts | 11 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 5 | ||||
| -rw-r--r-- | packages/transport-http/src/app.test.ts | 149 | ||||
| -rw-r--r-- | packages/transport-http/src/app.ts | 79 | ||||
| -rw-r--r-- | packages/transport-http/src/extension.ts | 1 | ||||
| -rw-r--r-- | packages/wire/src/index.test.ts | 7 | ||||
| -rw-r--r-- | packages/wire/src/index.ts | 7 |
16 files changed, 1077 insertions, 51 deletions
@@ -167,6 +167,7 @@ "name": "@dispatch/provider-concurrency", "version": "0.0.0", "dependencies": { + "@dispatch/conversation-store": "workspace:*", "@dispatch/kernel": "workspace:*", }, }, diff --git a/notes/review-bugs.md b/notes/review-bugs.md new file mode 100644 index 0000000..85cf625 --- /dev/null +++ b/notes/review-bugs.md @@ -0,0 +1,164 @@ +# Code Review: workspace-star (backend) + +**Branch:** `feature/workspace-star` +**Commit reviewed:** `71c635f feat(workspace-star): starred workspace priority for concurrency limiting` +**Compared to:** `dev` (`414080e`) +**Reviewer:** `umans/umans-kimi-k2.7` +**Date:** 2026-06-28 +**Scope:** Review only — no code changes committed. + +--- + +## Executive Summary + +The workspace-star feature itself (persisted workspace "star" toggle + priority scheduling in the provider concurrency manager) is conceptually sound and mostly correctly threaded through the monorepo: + +- `@dispatch/wire` adds `starred` to `Workspace`. +- `conversation-store` persists and migrates legacy workspace rows. +- `provider-concurrency` queues starred-workspace waiters ahead of non-starred ones and re-sorts on star/unstar. +- `transport-http` exposes `PUT /workspaces/:id/star` and `DELETE /workspaces/:id/star`. + +However, this branch also **reverts a large amount of unrelated crash-investigation work** that lives on `dev`, including production memory telemetry, an `uncaughtException`/`unhandledRejection` guard, and the LSP-disable precaution. That reversion is the single biggest issue and looks unintended given the commit title. There is also a failing test in `provider-wrapper.test.ts` and several smaller edge cases in the star-priority logic. + +**Do not merge this branch as-is.** + +--- + +## Verification Run + +| Command | Result | +|---|---| +| `bun run typecheck` | ✅PASS | +| `bun run check` | ✅PASS (12 pre-existing Biome warnings, 0 errors) | +| `bun test packages/provider-concurrency/src/concurrency-manager.test.ts packages/conversation-store/src/store-workspace.test.ts packages/wire/src/index.test.ts` | ⚠️1/78 failed (see §1) | + +The failing test is covered in detail below. + +--- + +## Detailed Findings + +### 1. BLOCKING — Test suite regression in `provider-wrapper.test.ts` + +**File:** `packages/provider-concurrency/src/provider-wrapper.test.ts` +**Test:** `wrapProviderWithConcurrency > releases the slot even when the stream throws` + +``` +error: +Expected promise +Received: [AsyncFunction] + at packages/provider-concurrency/src/provider-wrapper.test.ts:102:16 +(fail) wrapProviderWithConcurrency > releases the slot even when the stream throws +``` + +- The test asserts that `await expect(async () => { for await... }).rejects.toThrow("stream exploded")` throws when the wrapped async generator throws mid-stream. +- The failure message suggests the async function passed to `expect()` is not being recognized/produced correctly. +- This may be a Vitest/Bun async-generator interop issue, but it makes the test suite red on this branch. +- **Recommendation:** fix or replace the failing assertion before merge. The underlying `try/finally` release logic in `provider-wrapper.ts` looks correct, so this is likely an assertion-level problem. + +### 2. MAJOR — Branch reverts unrelated crash telemetry & production mitigations + +This branch removes or reverts important work that is present on `dev`. Because the commit title is `feat(workspace-star): starred workspace priority for concurrency limiting`, these changes appear to be accidental scope creep or a botched rebase. + +- **Files deleted:** + - `crash-review-report.md` + - `notes/crash-investigation-findings.md` + - `notes/memory-leak-investigation-handoff.md` + - `bin/apply-memory-limits.sh` + - `packages/host-bin/src/mem-telemetry.ts` + - `packages/host-bin/src/mem-telemetry.test.ts` +- **Behavioral reversions:** + - `packages/host-bin/src/main.ts`: removes `process.on("uncaughtException")` and `process.on("unhandledRejection")` guards; removes periodic memory telemetry; re-enables `import { extension as lspExt } from "@dispatch/lsp"`. + - `packages/session-orchestrator/src/orchestrator.ts`: removes `getActiveConversationCount()`, `SessionOrchestratorDeps.sampleMemory`, and per-turn memory telemetry. + - `packages/transport-http/src/extension.ts`: adds `"lsp"` to `dependsOn`, making the LSP extension required again. +- **Risk:** if merged, production loses: the cgroup memory-limit helper script; observability used to diagnose recent crashes; the unhandled-exception guard that prevents a single SSH error from killing the whole process; and the LSP-disable precaution that was added as a stability measure. +- **Recommendation:** strip these reversions out of `feature/workspace-star`. If removing crash telemetry is intentional, do it in a separate, explicitly titled commit/PR with rationale and operational sign-off. + +### 3. MEDIUM — In-memory starred cache leaks IDs for deleted workspaces + +**File:** `packages/provider-concurrency/src/concurrency-manager.ts` + +- `starredWorkspaces` is a `Set<string>` populated by `notifyWorkspaceStarred(true)`. +- `setWorkspaceStarred(false)` deletes the ID from the set. +- However, `deleteWorkspace()` in the conversation store does **not** publish any event/hook to the concurrency manager, so a starred workspace that is deleted remains in the cache until process restart. +- Impact is small (a string per deleted starred workspace), but it is unbounded. +- **Edge case:** if a new workspace later reuses the same slug, it could inherit stale star priority. +- **Recommendation:** add a `notifyWorkspaceStarred(id, false)` call when deleting a workspace, or expose a hook that provider-concurrency can listen to. At minimum, document this minor leak. + +### 4. MEDIUM — Stale in-memory priority when concurrency extension is absent + +**File:** `packages/transport-http/src/app.ts` (lines 1513, 1535) + +```ts +opts.concurrencyService?.notifyWorkspaceStarred(workspaceId, true); +``` + +- `transport-http` does **not** declare `provider-concurrency` in its manifest `dependsOn` (it is loaded opportunistically in `createApp`). +- If the concurrency extension is absent or disabled, the star toggle is persisted, but the in-memory priority cache is never updated. Already-queued agents will keep their old priority until a process restart seeds the cache. +- **Recommendation:** either add `provider-concurrency` to transport-http's `dependsOn` so the service is guaranteed, or emit a warning log when `notifyWorkspaceStarred` is skipped. + +### 5. LOW — `notifyWorkspaceStarred` resolves waiters synchronously inside the HTTP handler + +**File:** `packages/provider-concurrency/src/concurrency-manager.ts`, `notifyWorkspaceStarred` + +- The method sorts every provider's queue and calls `tryGrantNext`, which can resolve queued acquisition promises. +- Those resolves happen while the transport request handler is still on the call stack. +- In practice this is fine because Promise resolution is queued as a microtask, but it is re-entrant and worth noting if future code adds side effects to resolution. +- **Recommendation:** no code change required unless maintainers prefer to defer granting to the next tick. + +### 6. LOW — Star/unstar endpoints follow existing route conventions but introduce scheduling side effects + +**File:** `packages/transport-http/src/app.ts` (lines 1499-1543) + +- `PUT /workspaces/:id/star` and `DELETE /workspaces/:id/star` reuse the same slug validation and 500-error handling as other workspace routes. +- No additional auth/rate-limiting is added, which is consistent with the rest of the workspace API. +- Because these endpoints now affect runtime scheduling priority, a malicious or buggy client could flip-star workspaces rapidly to disturb queue ordering. +- **Recommendation:** acceptable if consistent with existing routes; otherwise gate star toggles behind the same authorization as workspace mutation. + +### 7. LOW — `compareWaiters` does not tie-break equal `promptStartedAt` + +**File:** `packages/provider-concurrency/src/concurrency-manager.ts` + +```ts +function compareWaiters(a: QueuedWaiter, b: QueuedWaiter): number { + const aStarred = isWorkspaceStarred(a.workspaceId); + const bStarred = isWorkspaceStarred(b.workspaceId); + if (aStarred !== bStarred) return aStarred ? -1 : 1; + return a.promptStartedAt - b.promptStartedAt; +} +``` + +- If two agents have the same `promptStartedAt` and the same star status, JavaScript's stable sort preserves insertion order, so FIFO is still guaranteed. +- However, the comparator returns `0` in this case and relies on sort stability, which is true for modern engines but not obvious from the code. +- **Recommendation:** add an explicit tie-breaker (e.g., a monotonic enqueue sequence number) to make ordering deterministic across all runtimes and future proofs. + +### 8. LOW/CONCERN — Default workspace can be starred + +- `setWorkspaceStarred` allows starring `"default"`. +- This is probably intentional (default is just another workspace), but it means all unassigned/legacy conversations inherit star priority. +- **Recommendation:** confirm this is the intended UX. If not, add a guard mirroring `deleteWorkspace`'s prohibition on `"default"`. + +### 9. POSITIVE — Backward compatibility is handled correctly + +- `parseWorkspaceRow` treats absent or non-boolean `starred` as `false`, so legacy workspace rows load safely. +- `isWorkspaceStarred` is optional in `ConcurrencyManagerOpts`; tests verify that omitting it behaves like all workspaces are non-starred. +- `WorkspaceResponse` inherits `starred` through the `Workspace` / `WorkspaceEntry` types. +- The seeded activation path (`extension.ts`) guards `listWorkspaces()` with try/catch and logs warnings, degrading cleanly if the store is unreachable. + +--- + +## Merge Checklist + +Before merging `feature/workspace-star`: + +1. **Fix or disable the failing `provider-wrapper.test.ts` test.** Do not leave the suite red. +2. **Remove the unrelated crash-telemetry/LSP reversions** from this branch, or move them to a separate explicitly labeled commit/PR. +3. Consider: clearing the starred cache when a workspace is deleted. +4. Consider: declaring `provider-concurrency` as a hard dependency of `transport-http` if star scheduling is a first-class feature, or logging a warning when the optional service is absent. +5. Confirm UX intent: can `"default"` be starred? + +--- + +## Note + +This review was generated from `git diff dev..HEAD` on the `feature/workspace-star` worktree. No source code was modified. diff --git a/packages/conversation-store/src/store-workspace.test.ts b/packages/conversation-store/src/store-workspace.test.ts index 788a526..59ea990 100644 --- a/packages/conversation-store/src/store-workspace.test.ts +++ b/packages/conversation-store/src/store-workspace.test.ts @@ -47,6 +47,7 @@ describe("WorkspaceStore", () => { title: "my-work", defaultCwd: null, defaultComputerId: null, + starred: false, createdAt: 1000, lastActivityAt: 1000, }); @@ -66,6 +67,7 @@ describe("WorkspaceStore", () => { title: "my-work", defaultCwd: null, defaultComputerId: null, + starred: false, createdAt: 1000, lastActivityAt: 1000, }); @@ -83,6 +85,7 @@ describe("WorkspaceStore", () => { title: "Custom", defaultCwd: "/projects/dispatch", defaultComputerId: null, + starred: false, createdAt: 3000, lastActivityAt: 3000, }); @@ -96,6 +99,7 @@ describe("WorkspaceStore", () => { title: "default", defaultCwd: null, defaultComputerId: null, + starred: false, createdAt: 0, lastActivityAt: 0, }); @@ -117,6 +121,7 @@ describe("WorkspaceStore", () => { title: "Renamed", defaultCwd: null, defaultComputerId: null, + starred: false, createdAt: 1000, lastActivityAt: 1000, }); @@ -214,6 +219,7 @@ describe("WorkspaceStore", () => { title: "default", defaultCwd: null, defaultComputerId: null, + starred: false, createdAt: 0, lastActivityAt: 0, conversationCount: 0, @@ -421,6 +427,125 @@ describe("WorkspaceStore", () => { const meta = await store.getConversationMeta("conv1"); expect(meta?.workspaceId).toBe("my-work"); }); + + // --- starred (priority for concurrency limiting) --- + + it("ensureWorkspace creates with starred: false", async () => { + const store = makeStore(); + const ws = await store.ensureWorkspace("star-work"); + expect(ws.starred).toBe(false); + }); + + it("setWorkspaceStarred true persists and reads back", async () => { + const store = makeStore(); + clock = 1000; + await store.ensureWorkspace("star-work"); + clock = 2000; + const ws = await store.setWorkspaceStarred("star-work", true); + expect(ws.starred).toBe(true); + expect(ws.id).toBe("star-work"); + const reRead = await store.getWorkspace("star-work"); + expect(reRead?.starred).toBe(true); + }); + + it("setWorkspaceStarred false unstars a previously starred workspace", async () => { + const store = makeStore(); + await store.ensureWorkspace("star-work"); + await store.setWorkspaceStarred("star-work", true); + await store.setWorkspaceStarred("star-work", false); + const ws = await store.getWorkspace("star-work"); + expect(ws?.starred).toBe(false); + }); + + it("setWorkspaceStarred creates the workspace if missing", async () => { + const store = makeStore(); + clock = 5000; + const ws = await store.setWorkspaceStarred("brand-new", true); + expect(ws).toEqual({ + id: "brand-new", + title: "brand-new", + defaultCwd: null, + defaultComputerId: null, + starred: true, + createdAt: 5000, + lastActivityAt: 5000, + }); + }); + + it("setWorkspaceStarred preserves title/defaultCwd/defaultComputerId", async () => { + const store = makeStore(); + clock = 1000; + await store.ensureWorkspace("my-work", { + title: "Custom", + defaultCwd: "/projects", + defaultComputerId: "myserver", + }); + clock = 2000; + const ws = await store.setWorkspaceStarred("my-work", true); + expect(ws.title).toBe("Custom"); + expect(ws.defaultCwd).toBe("/projects"); + expect(ws.defaultComputerId).toBe("myserver"); + expect(ws.starred).toBe(true); + expect(ws.createdAt).toBe(1000); + }); + + it("listWorkspaces includes starred field", async () => { + const store = makeStore(); + clock = 1000; + await store.ensureWorkspace("alpha"); + await store.setWorkspaceStarred("alpha", true); + clock = 2000; + await store.ensureWorkspace("beta"); + const list = await store.listWorkspaces(); + const alpha = list.find((w) => w.id === "alpha"); + expect(alpha?.starred).toBe(true); + const beta = list.find((w) => w.id === "beta"); + expect(beta?.starred).toBe(false); + }); + + it("setWorkspaceTitle preserves starred state", async () => { + const store = makeStore(); + await store.ensureWorkspace("my-work"); + await store.setWorkspaceStarred("my-work", true); + const ws = await store.setWorkspaceTitle("my-work", "Renamed"); + expect(ws.starred).toBe(true); + expect(ws.title).toBe("Renamed"); + }); + + it("setWorkspaceDefaultCwd preserves starred state", async () => { + const store = makeStore(); + await store.ensureWorkspace("my-work"); + await store.setWorkspaceStarred("my-work", true); + const ws = await store.setWorkspaceDefaultCwd("my-work", "/new/path"); + expect(ws.starred).toBe(true); + expect(ws.defaultCwd).toBe("/new/path"); + }); + + it("setWorkspaceDefaultComputerId preserves starred state", async () => { + const store = makeStore(); + await store.ensureWorkspace("my-work"); + await store.setWorkspaceStarred("my-work", true); + const ws = await store.setWorkspaceDefaultComputerId("my-work", "new-host"); + expect(ws.starred).toBe(true); + expect(ws.defaultComputerId).toBe("new-host"); + }); + + it("a legacy WorkspaceRow without starred reads back as false", async () => { + const store = makeStore(); + // Simulate a legacy row persisted before `starred` existed. + await storage.set( + "workspace:legacy", + JSON.stringify({ + title: "legacy", + defaultCwd: "/legacy/cwd", + defaultComputerId: null, + createdAt: 100, + lastActivityAt: 200, + }), + ); + const ws = await store.getWorkspace("legacy"); + expect(ws?.starred).toBe(false); + }); }); describe("ComputerStore", () => { @@ -497,6 +622,7 @@ describe("ComputerStore", () => { title: "brand-new", defaultCwd: null, defaultComputerId: "remote-host", + starred: false, createdAt: 5000, lastActivityAt: 5000, }); @@ -520,6 +646,7 @@ describe("ComputerStore", () => { title: "default", defaultCwd: null, defaultComputerId: null, + starred: false, createdAt: 0, lastActivityAt: 0, }); @@ -628,6 +755,7 @@ describe("ComputerStore", () => { title: "Remote", defaultCwd: null, defaultComputerId: "prod-server", + starred: false, createdAt: 1000, lastActivityAt: 1000, }); @@ -652,6 +780,7 @@ describe("ComputerStore", () => { title: "legacy", defaultCwd: "/legacy/cwd", defaultComputerId: null, + starred: false, createdAt: 100, lastActivityAt: 200, }); diff --git a/packages/conversation-store/src/store.ts b/packages/conversation-store/src/store.ts index 69334e6..ed39d8e 100644 --- a/packages/conversation-store/src/store.ts +++ b/packages/conversation-store/src/store.ts @@ -211,6 +211,13 @@ export interface ConversationStore { defaultComputerId: string | null, ) => Promise<Workspace>; /** + * Star or unstar a workspace. Creates the workspace if missing (like + * `setWorkspaceTitle`). Starred workspaces receive PRIORITY in the + * concurrency limiter queue — their agents jump ahead of agents from + * non-starred workspaces (oldest-agent-first within each group). + */ + readonly setWorkspaceStarred: (id: string, starred: boolean) => Promise<Workspace>; + /** * Delete a workspace: (1) find all conversations with `workspaceId === id`, * (2) set each to `status = "closed"` and reassign `workspaceId = "default"`, * (3) delete the workspace entity. Returns `closedCount`. Throws if `id @@ -358,6 +365,12 @@ interface WorkspaceRow { * workspace inherit it when they set no `computerId` of their own. */ readonly defaultComputerId: string | null; + /** + * Whether the workspace is starred by the user. Starred workspaces receive + * PRIORITY in the concurrency limiter queue. Defaults to `false` on legacy + * rows (normalized by `parseWorkspaceRow`). + */ + readonly starred: boolean; readonly createdAt: number; readonly lastActivityAt: number; } @@ -470,10 +483,13 @@ function parseWorkspaceRow(raw: string): WorkspaceRow | null { // (mirrors `defaultCwd`). Absent on legacy rows → null (local). const defaultComputerId = typeof row.defaultComputerId === "string" ? row.defaultComputerId : null; + // `starred` may be absent on legacy rows; treat anything non-boolean as false. + const starred = row.starred === true; return { title: row.title, defaultCwd, defaultComputerId, + starred, createdAt: row.createdAt, lastActivityAt: row.lastActivityAt, }; @@ -485,6 +501,7 @@ function toWorkspace(id: string, row: WorkspaceRow): Workspace { title: row.title, defaultCwd: row.defaultCwd, defaultComputerId: row.defaultComputerId, + starred: row.starred === true, createdAt: row.createdAt, lastActivityAt: row.lastActivityAt, }; @@ -545,6 +562,7 @@ export function createConversationStore( title: workspaceId, defaultCwd: null, defaultComputerId: null, + starred: false, createdAt: ts, lastActivityAt: ts, } @@ -552,6 +570,7 @@ export function createConversationStore( title: existing.title, defaultCwd: existing.defaultCwd, defaultComputerId: existing.defaultComputerId, + starred: existing.starred, createdAt: existing.createdAt, lastActivityAt: ts, }; @@ -1102,13 +1121,14 @@ export function createConversationStore( if (row !== null) return toWorkspace(id, row); // Synthesize the always-present "default" workspace when it was // never persisted (title "default", defaultCwd null, defaultComputerId - // null [local], timestamps 0). + // null [local], starred false, timestamps 0). if (id === DEFAULT_WORKSPACE_ID) { return { id: DEFAULT_WORKSPACE_ID, title: DEFAULT_WORKSPACE_ID, defaultCwd: null, defaultComputerId: null, + starred: false, createdAt: 0, lastActivityAt: 0, }; @@ -1126,6 +1146,7 @@ export function createConversationStore( title: opts?.title ?? id, defaultCwd: opts?.defaultCwd ?? null, defaultComputerId: opts?.defaultComputerId ?? null, + starred: false, createdAt: ts, lastActivityAt: ts, }; @@ -1142,6 +1163,7 @@ export function createConversationStore( title: id, defaultCwd: null as string | null, defaultComputerId: null as string | null, + starred: false as boolean, createdAt: ts, lastActivityAt: ts, } @@ -1150,6 +1172,7 @@ export function createConversationStore( title, defaultCwd: base.defaultCwd, defaultComputerId: base.defaultComputerId, + starred: base.starred, createdAt: base.createdAt, lastActivityAt: base.lastActivityAt, }; @@ -1166,6 +1189,7 @@ export function createConversationStore( title: id, defaultCwd: null as string | null, defaultComputerId: null as string | null, + starred: false as boolean, createdAt: ts, lastActivityAt: ts, } @@ -1174,6 +1198,7 @@ export function createConversationStore( title: base.title, defaultCwd, defaultComputerId: base.defaultComputerId, + starred: base.starred, createdAt: base.createdAt, lastActivityAt: base.lastActivityAt, }; @@ -1190,6 +1215,7 @@ export function createConversationStore( title: id, defaultCwd: null as string | null, defaultComputerId: null as string | null, + starred: false as boolean, createdAt: ts, lastActivityAt: ts, } @@ -1198,6 +1224,7 @@ export function createConversationStore( title: base.title, defaultCwd: base.defaultCwd, defaultComputerId, + starred: base.starred, createdAt: base.createdAt, lastActivityAt: base.lastActivityAt, }; @@ -1205,6 +1232,34 @@ export function createConversationStore( return toWorkspace(id, row); }, + async setWorkspaceStarred(id, starred) { + const existing = await readWorkspaceRow(id); + const ts = now(); + const base = + existing === null + ? { + title: id, + defaultCwd: null as string | null, + defaultComputerId: null as string | null, + createdAt: ts, + lastActivityAt: ts, + } + : existing; + const row: WorkspaceRow = { + title: base.title, + defaultCwd: base.defaultCwd, + defaultComputerId: base.defaultComputerId, + starred, + createdAt: base.createdAt, + lastActivityAt: base.lastActivityAt, + }; + await storage.set(workspaceKey(id), JSON.stringify(row)); + if (logger !== undefined) { + logger.debug("workspace starred set", { workspaceId: id, starred }); + } + return toWorkspace(id, row); + }, + async deleteWorkspace(id) { if (id === DEFAULT_WORKSPACE_ID) { throw new Error('The "default" workspace cannot be deleted.'); @@ -1269,6 +1324,7 @@ export function createConversationStore( title: DEFAULT_WORKSPACE_ID, defaultCwd: null, defaultComputerId: null, + starred: false, createdAt: 0, lastActivityAt: 0, }); diff --git a/packages/provider-concurrency/package.json b/packages/provider-concurrency/package.json index 10c522a..b53d599 100644 --- a/packages/provider-concurrency/package.json +++ b/packages/provider-concurrency/package.json @@ -6,6 +6,7 @@ "main": "dist/index.js", "types": "dist/index.d.ts", "dependencies": { + "@dispatch/conversation-store": "workspace:*", "@dispatch/kernel": "workspace:*" } } diff --git a/packages/provider-concurrency/src/concurrency-manager.test.ts b/packages/provider-concurrency/src/concurrency-manager.test.ts index 185c6c2..357a5d1 100644 --- a/packages/provider-concurrency/src/concurrency-manager.test.ts +++ b/packages/provider-concurrency/src/concurrency-manager.test.ts @@ -90,7 +90,7 @@ function createManager(opts?: { describe("createConcurrencyManager", () => { it("returns no-op release for providers with no configured limit", async () => { const { manager } = createManager(); - const release = await manager.acquire("unknown", "conv1", 0); + const release = await manager.acquire("unknown", "conv1", "default", 0); expect(typeof release).toBe("function"); // No state → release is a no-op, no error. release(); @@ -101,7 +101,7 @@ describe("createConcurrencyManager", () => { const { manager } = createManager(); manager.setLimit("umans", 4); - const release1 = await manager.acquire("umans", "conv1", 0); + const release1 = await manager.acquire("umans", "conv1", "default", 0); const status = manager.getStatus("umans"); expect(status).toEqual({ providerId: "umans", @@ -120,11 +120,11 @@ describe("createConcurrencyManager", () => { const { manager } = createManager(); manager.setLimit("umans", 1); - const release1 = await manager.acquire("umans", "conv1", 100); + const release1 = await manager.acquire("umans", "conv1", "default", 100); // Second request should block (at limit). let resolved = false; - const promise2 = manager.acquire("umans", "conv2", 200).then((r) => { + const promise2 = manager.acquire("umans", "conv2", "default", 200).then((r) => { resolved = true; return r; }); @@ -150,13 +150,13 @@ describe("createConcurrencyManager", () => { manager.setLimit("umans", 1); // Hold the single slot. - const release0 = await manager.acquire("umans", "holder", 0); + const release0 = await manager.acquire("umans", "holder", "default", 0); // Three agents queue with different prompt start times. // Agent C started latest (t=300), Agent A started earliest (t=100). const results: string[] = []; const acquireAndRecord = (conv: string, promptAt: number) => - manager.acquire("umans", conv, promptAt).then((r) => { + manager.acquire("umans", conv, "default", promptAt).then((r) => { results.push(conv); return r; }); @@ -193,7 +193,7 @@ describe("createConcurrencyManager", () => { const { manager, timers } = createManager(); manager.setLimit("umans", 1); - const release1 = await manager.acquire("umans", "conv1", 0); + const release1 = await manager.acquire("umans", "conv1", "default", 0); release1(); // Simulate a 429 → queue pauses. @@ -204,7 +204,7 @@ describe("createConcurrencyManager", () => { // A new acquire should block (paused, even though under limit). let resolved = false; - const promise = manager.acquire("umans", "conv2", 0).then((r) => { + const promise = manager.acquire("umans", "conv2", "default", 0).then((r) => { resolved = true; return r; }); @@ -233,7 +233,7 @@ describe("createConcurrencyManager", () => { const { manager, timers } = createManager(); manager.setLimit("umans", 1); - const release = await manager.acquire("umans", "conv1", 0); + const release = await manager.acquire("umans", "conv1", "default", 0); expect(manager.getStatus("umans")?.inFlight).toBe(1); // Advance past the slot timeout (5000ms) and fire the watchdog. @@ -253,11 +253,11 @@ describe("createConcurrencyManager", () => { manager.setLimit("umans", 1); // Hold the slot. - await manager.acquire("umans", "holder", 0); + await manager.acquire("umans", "holder", "default", 0); // Queue a waiter. let resolved = false; - const promise = manager.acquire("umans", "waiter", 10).then((r) => { + const promise = manager.acquire("umans", "waiter", "default", 10).then((r) => { resolved = true; return r; }); @@ -280,11 +280,11 @@ describe("createConcurrencyManager", () => { const { manager } = createManager(); manager.setLimit("umans", 1); - const release1 = await manager.acquire("umans", "conv1", 0); + const release1 = await manager.acquire("umans", "conv1", "default", 0); // Queue a waiter. let resolved = false; - const promise = manager.acquire("umans", "conv2", 100).then((r) => { + const promise = manager.acquire("umans", "conv2", "default", 100).then((r) => { resolved = true; return r; }); @@ -307,11 +307,11 @@ describe("createConcurrencyManager", () => { const { manager } = createManager(); manager.setLimit("umans", 1); - const release1 = await manager.acquire("umans", "conv1", 0); + const release1 = await manager.acquire("umans", "conv1", "default", 0); // Queue two waiters. - const p2 = manager.acquire("umans", "conv2", 100); - const p3 = manager.acquire("umans", "conv3", 200); + const p2 = manager.acquire("umans", "conv2", "default", 100); + const p3 = manager.acquire("umans", "conv3", "default", 200); await Promise.resolve(); await Promise.resolve(); @@ -369,7 +369,7 @@ describe("createConcurrencyManager", () => { const { manager } = createManager(); manager.setLimit("umans", 2); - const release = await manager.acquire("umans", "conv1", 0); + const release = await manager.acquire("umans", "conv1", "default", 0); expect(manager.getStatus("umans")?.inFlight).toBe(1); release(); @@ -385,9 +385,9 @@ describe("createConcurrencyManager", () => { manager.setLimit("umans", 3); const releases = await Promise.all([ - manager.acquire("umans", "conv1", 0), - manager.acquire("umans", "conv2", 0), - manager.acquire("umans", "conv3", 0), + manager.acquire("umans", "conv1", "default", 0), + manager.acquire("umans", "conv2", "default", 0), + manager.acquire("umans", "conv3", "default", 0), ]); expect(manager.getStatus("umans")?.inFlight).toBe(3); @@ -402,12 +402,12 @@ describe("createConcurrencyManager", () => { const { manager, timers } = createManager({ releaseCooldownMs: 200 }); manager.setLimit("umans", 1); - const release1 = await manager.acquire("umans", "conv1", 0); + const release1 = await manager.acquire("umans", "conv1", "default", 0); expect(manager.getStatus("umans")?.inFlight).toBe(1); // Queue a waiter. let resolved = false; - const promise2 = manager.acquire("umans", "conv2", 100).then((r) => { + const promise2 = manager.acquire("umans", "conv2", "default", 100).then((r) => { resolved = true; return r; }); @@ -436,7 +436,7 @@ describe("createConcurrencyManager", () => { const { manager, timers } = createManager({ releaseCooldownMs: 200 }); manager.setLimit("umans", 2); - const release = await manager.acquire("umans", "conv1", 0); + const release = await manager.acquire("umans", "conv1", "default", 0); expect(manager.getStatus("umans")?.inFlight).toBe(1); release(); @@ -454,7 +454,7 @@ describe("createConcurrencyManager", () => { const { manager } = createManager({ releaseCooldownMs: 200 }); manager.setLimit("umans", 1); // Acquire + release to schedule a cooldown timer. - manager.acquire("umans", "conv1", 0).then((release) => { + manager.acquire("umans", "conv1", "default", 0).then((release) => { release(); // Now there's a pending cooldown timer — destroy should clean it up. expect(() => manager.destroy()).not.toThrow(); @@ -466,11 +466,11 @@ describe("createConcurrencyManager", () => { manager.setLimit("umans", 1); // Hold the single slot. - const release1 = await manager.acquire("umans", "conv1", 0); + const release1 = await manager.acquire("umans", "conv1", "default", 0); // Second request should trigger onQueued. let queuedCalled = false; - const promise = manager.acquire("umans", "conv2", 100, () => { + const promise = manager.acquire("umans", "conv2", "default", 100, () => { queuedCalled = true; }); await Promise.resolve(); @@ -490,7 +490,7 @@ describe("createConcurrencyManager", () => { manager.setLimit("umans", 2); let queuedCalled = false; - const release = await manager.acquire("umans", "conv1", 0, () => { + const release = await manager.acquire("umans", "conv1", "default", 0, () => { queuedCalled = true; }); @@ -966,3 +966,295 @@ describe("createConcurrencyManager", () => { expect(status?.notice).toBeUndefined(); }); }); + +// ─── Starred-workspace priority tests ─────────────────────────────────────── + +describe("starred-workspace priority", () => { + it("starred-workspace agents are admitted before non-starred (regardless of promptStartedAt)", async () => { + // Use a callback backed by a Set so we can star/unstar at runtime. + const starred = new Set<string>(); + const timers = createFakeTimers(); + const manager = createConcurrencyManager({ + now: timers.now, + slotTimeoutMs: 5000, + watchdogIntervalMs: 1000, + defaultPauseMs: 30000, + isWorkspaceStarred: (wsId: string) => starred.has(wsId), + setTimeout: timers.setTimeout, + clearTimeout: timers.clearTimeout, + setInterval: timers.setInterval, + clearInterval: timers.clearInterval, + }); + manager.setLimit("umans", 1); + + // Hold the single slot. + const release0 = await manager.acquire("umans", "holder", "default", 0); + + // Three agents queue: + // - convA (workspace "ws-normal", promptAt=100) — non-starred, earliest + // - convB (workspace "ws-starred", promptAt=200) — starred, later + // - convC (workspace "ws-normal", promptAt=300) — non-starred, latest + starred.add("ws-starred"); + + const results: string[] = []; + const acquireAndRecord = (conv: string, wsId: string, promptAt: number) => + manager.acquire("umans", conv, wsId, promptAt).then((r) => { + results.push(conv); + return r; + }); + + const pA = acquireAndRecord("convA", "ws-normal", 100); + const pB = acquireAndRecord("convB", "ws-starred", 200); + const pC = acquireAndRecord("convC", "ws-normal", 300); + + await Promise.resolve(); + await Promise.resolve(); + expect(results).toEqual([]); // none resolved yet. + + // Release the holder. The starred agent (convB, t=200) should get the + // slot FIRST, even though convA (t=100) started earlier. + release0(); + + const rB = await pB; + expect(results).toEqual(["convB"]); + rB(); + + // Now the oldest non-starred (convA, t=100) should be next. + const rA = await pA; + expect(results).toEqual(["convB", "convA"]); + rA(); + + // Then convC (t=300). + const rC = await pC; + expect(results).toEqual(["convB", "convA", "convC"]); + rC(); + + manager.destroy(); + }); + + it("within the starred group, oldest-agent-first is preserved", async () => { + const starred = new Set<string>(["ws-starred"]); + const timers = createFakeTimers(); + const manager = createConcurrencyManager({ + now: timers.now, + slotTimeoutMs: 5000, + watchdogIntervalMs: 1000, + defaultPauseMs: 30000, + isWorkspaceStarred: (wsId: string) => starred.has(wsId), + setTimeout: timers.setTimeout, + clearTimeout: timers.clearTimeout, + setInterval: timers.setInterval, + clearInterval: timers.clearInterval, + }); + manager.setLimit("umans", 1); + + const release0 = await manager.acquire("umans", "holder", "default", 0); + + const results: string[] = []; + const acquireAndRecord = (conv: string, wsId: string, promptAt: number) => + manager.acquire("umans", conv, wsId, promptAt).then((r) => { + results.push(conv); + return r; + }); + + // Two starred agents: convLate (t=300) queues first, convEarly (t=100) second. + const pLate = acquireAndRecord("convLate", "ws-starred", 300); + const pEarly = acquireAndRecord("convEarly", "ws-starred", 100); + + await Promise.resolve(); + await Promise.resolve(); + + release0(); + + // convEarly (t=100) should win within the starred group (oldest-first). + const rEarly = await pEarly; + expect(results).toEqual(["convEarly"]); + rEarly(); + + const rLate = await pLate; + expect(results).toEqual(["convEarly", "convLate"]); + rLate(); + + manager.destroy(); + }); + + it("starring a workspace while agents are queued re-prioritizes them immediately", async () => { + const timers = createFakeTimers(); + const manager = createConcurrencyManager({ + now: timers.now, + slotTimeoutMs: 5000, + watchdogIntervalMs: 1000, + defaultPauseMs: 30000, + setTimeout: timers.setTimeout, + clearTimeout: timers.clearTimeout, + setInterval: timers.setInterval, + clearInterval: timers.clearInterval, + }); + manager.setLimit("umans", 1); + + const release0 = await manager.acquire("umans", "holder", "default", 0); + + // convA (non-starred, t=100) queues first. + let resolvedA = false; + const pA = manager.acquire("umans", "convA", "ws-normal", 100).then((r) => { + resolvedA = true; + return r; + }); + // convB (non-starred, t=200) queues second. + let resolvedB = false; + const pB = manager.acquire("umans", "convB", "ws-to-star", 200).then((r) => { + resolvedB = true; + return r; + }); + + await Promise.resolve(); + await Promise.resolve(); + expect(resolvedA).toBe(false); + expect(resolvedB).toBe(false); + + // Now star convB's workspace AFTER it's queued. notifyWorkspaceStarred + // updates the internal cache + re-sorts + tries to grant. + manager.notifyWorkspaceStarred("ws-to-star", true); + + // Release the holder — convB (now starred) should jump ahead of convA. + release0(); + + const rB = await pB; + expect(resolvedB).toBe(true); + expect(resolvedA).toBe(false); + rB(); + + // Now convA gets the next slot. + const rA = await pA; + expect(resolvedA).toBe(true); + rA(); + + manager.destroy(); + }); + + it("unstar a workspace demotes its queued agents", async () => { + const timers = createFakeTimers(); + const manager = createConcurrencyManager({ + now: timers.now, + slotTimeoutMs: 5000, + watchdogIntervalMs: 1000, + defaultPauseMs: 30000, + setTimeout: timers.setTimeout, + clearTimeout: timers.clearTimeout, + setInterval: timers.setInterval, + clearInterval: timers.clearInterval, + }); + manager.setLimit("umans", 1); + + // Initially star "ws-starred" via the internal cache. + manager.notifyWorkspaceStarred("ws-starred", true); + + const release0 = await manager.acquire("umans", "holder", "default", 0); + + // convA (starred, t=200) queues first. + const pA = manager.acquire("umans", "convA", "ws-starred", 200); + // convB (non-starred, t=100) queues second but is older. + const pB = manager.acquire("umans", "convB", "ws-normal", 100); + + await Promise.resolve(); + await Promise.resolve(); + + // Unstar convA's workspace — it should now be behind convB (which is older). + manager.notifyWorkspaceStarred("ws-starred", false); + + release0(); + + // convB (t=100, now non-starred but oldest) should win. + const rB = await pB; + expect(rB).toBeDefined(); + rB(); + + const rA = await pA; + rA(); + + manager.destroy(); + }); + + it("notifyWorkspaceStarred re-sorts queues and tries to grant when capacity is free", async () => { + const timers = createFakeTimers(); + const manager = createConcurrencyManager({ + now: timers.now, + slotTimeoutMs: 5000, + watchdogIntervalMs: 1000, + defaultPauseMs: 30000, + setTimeout: timers.setTimeout, + clearTimeout: timers.clearTimeout, + setInterval: timers.setInterval, + clearInterval: timers.clearInterval, + }); + manager.setLimit("umans", 1); + + // Slot is held. Two agents queued (both non-starred). + const release0 = await manager.acquire("umans", "holder", "default", 0); + const pA = manager.acquire("umans", "convA", "ws-normal", 100); + const pB = manager.acquire("umans", "convB", "ws-to-star", 200); + + await Promise.resolve(); + await Promise.resolve(); + + // Star convB's workspace — notifyWorkspaceStarred re-sorts + tries to + // grant. But the slot is still held, so no one is granted yet. + manager.notifyWorkspaceStarred("ws-to-star", true); + + // Release the slot — convB (now starred) should get it. + release0(); + + const rB = await pB; + expect(rB).toBeDefined(); + rB(); + + const rA = await pA; + rA(); + + manager.destroy(); + }); + + it("without isWorkspaceStarred callback, all agents are non-starred (backward compatible)", async () => { + const timers = createFakeTimers(); + const manager = createConcurrencyManager({ + now: timers.now, + slotTimeoutMs: 5000, + watchdogIntervalMs: 1000, + defaultPauseMs: 30000, + setTimeout: timers.setTimeout, + clearTimeout: timers.clearTimeout, + setInterval: timers.setInterval, + clearInterval: timers.clearInterval, + }); + manager.setLimit("umans", 1); + + const release0 = await manager.acquire("umans", "holder", "default", 0); + + const results: string[] = []; + const acquireAndRecord = (conv: string, wsId: string, promptAt: number) => + manager.acquire("umans", conv, wsId, promptAt).then((r) => { + results.push(conv); + return r; + }); + + // Queue in non-sorted order: B (t=200), A (t=100). + const pB = acquireAndRecord("convB", "ws-any", 200); + const pA = acquireAndRecord("convA", "ws-any", 100); + + await Promise.resolve(); + await Promise.resolve(); + + release0(); + + // Without a callback, oldest-first ordering applies (no starred priority). + const rA = await pA; + expect(results).toEqual(["convA"]); + rA(); + + const rB = await pB; + expect(results).toEqual(["convA", "convB"]); + rB(); + + manager.destroy(); + }); +}); diff --git a/packages/provider-concurrency/src/concurrency-manager.ts b/packages/provider-concurrency/src/concurrency-manager.ts index 1d05bb0..ea66a49 100644 --- a/packages/provider-concurrency/src/concurrency-manager.ts +++ b/packages/provider-concurrency/src/concurrency-manager.ts @@ -77,10 +77,19 @@ export interface ProviderConcurrencyStatus { export interface ConcurrencyLimiter { /** * Acquire a concurrency slot for `providerId`. Resolves immediately when a - * slot is available; otherwise blocks (queued by oldest-agent-first) until - * one frees up. The returned function MUST be called when the response - * stream completes (in a `finally` block). For providers with no configured - * limit, resolves instantly with a no-op release. + * slot is available; otherwise blocks (queued by starred-workspace-first, + * then oldest-agent-first) until one frees up. The returned function MUST be + * called when the response stream completes (in a `finally` block). For + * providers with no configured limit, resolves instantly with a no-op + * release. + * + * **Priority:** agents from **starred workspaces** are always admitted before + * agents from non-starred workspaces (regardless of `promptStartedAt`). + * Within each group (starred vs non-starred), oldest-agent-first ordering is + * preserved. The starred status is looked up via the injected + * `isWorkspaceStarred` callback at sort time, so starring a workspace while + * agents are queued takes effect on the next sort (new acquire or slot + * release). * * If `onQueued` is provided and the request cannot be granted immediately * (at limit or paused), it is called synchronously BEFORE the Promise is @@ -89,14 +98,18 @@ export interface ConcurrencyLimiter { * * @param providerId The provider to limit (e.g. "umans", "openai-compat"). * @param conversationId The agent requesting the slot. + * @param workspaceId The workspace the agent belongs to (for starred + * priority scheduling). Defaults to `"default"`. * @param promptStartedAt When the agent's current prompt (turn) started - * (epoch-ms). Used for oldest-agent-first scheduling. + * (epoch-ms). Used for oldest-agent-first scheduling + * within each starred group. * @param onQueued Called synchronously when the request is enqueued * (not granted immediately). Optional. */ acquire( providerId: string, conversationId: string, + workspaceId: string, promptStartedAt: number, onQueued?: () => void, ): Promise<() => void>; @@ -146,6 +159,13 @@ export interface ConcurrencyService extends ConcurrencyLimiter { getStatus(providerId: string): ProviderConcurrencyStatus | undefined; /** Status for every provider with a configured limit. */ getStatusAll(): readonly ProviderConcurrencyStatus[]; + /** + * Notify the limiter that a workspace's starred state changed. Updates the + * in-memory starred cache so subsequent queue sorts re-evaluate priority + * (a newly-starred workspace's already-queued agents jump ahead). Called by + * the transport layer after persisting the starred toggle. + */ + notifyWorkspaceStarred(workspaceId: string, starred: boolean): void; /** Stop the watchdog + clear all timers. */ destroy(): void; } @@ -161,6 +181,7 @@ interface Slot { interface QueuedWaiter { readonly conversationId: string; + readonly workspaceId: string; readonly promptStartedAt: number; readonly resolve: (release: () => void) => void; } @@ -232,6 +253,14 @@ export interface ConcurrencyManagerOpts { * poll never becomes an unhandled rejection. */ readonly onUsagePollError?: (providerId: string, err: unknown) => void; + /** + * Injected callback: returns whether a workspace is starred (for priority + * scheduling). When provided, agents from starred workspaces jump ahead of + * non-starred agents in the queue. When omitted (or returns `false`), all + * agents are treated as non-starred (backward-compatible). This is an I/O + * effect injected so the manager stays pure + unit-testable with a fake. + */ + readonly isWorkspaceStarred?: (workspaceId: string) => boolean; } /** Min interval between usage-gate fallback repolls (ms). The release trigger is immediate. */ @@ -254,6 +283,16 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre const setInterval = opts.setInterval ?? globalThis.setInterval.bind(globalThis); const clearInterval = opts.clearInterval ?? globalThis.clearInterval.bind(globalThis); + // In-memory cache of starred workspace IDs. Populated by the extension on + // activation (from the conversation store) + updated via + // `notifyWorkspaceStarred`. The `isWorkspaceStarred` callback reads this + // synchronously so the queue sort comparator (sync) can re-evaluate priority + // on every sort — a newly-starred workspace's already-queued agents jump + // ahead on the next sort (new acquire or slot release). + const starredWorkspaces = new Set<string>(); + const isWorkspaceStarred = + opts.isWorkspaceStarred ?? ((wsId: string) => starredWorkspaces.has(wsId)); + const states = new Map<string, ProviderState>(); const cooldownOverrides = new Map<string, number>(); const cooldownTimers = new Set<ReturnType<typeof setTimeout>>(); @@ -343,6 +382,25 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre * (`inFlight < limit`). Synchronous. */ function grantLoop(state: ProviderState, providerId: string): void { + * Priority comparator for queued waiters: starred-workspace agents first, + * then oldest-agent-first (ascending `promptStartedAt`) within each group. + * Called at sort time (both on insert and before granting) so a workspace + * starred AFTER an agent queued is re-evaluated on the next sort. + */ + function compareWaiters(a: QueuedWaiter, b: QueuedWaiter): number { + const aStarred = isWorkspaceStarred(a.workspaceId); + const bStarred = isWorkspaceStarred(b.workspaceId); + if (aStarred !== bStarred) return aStarred ? -1 : 1; // starred first + return a.promptStartedAt - b.promptStartedAt; // oldest first within group + } + + function tryGrantNext(providerId: string): void { + const state = states.get(providerId); + if (state === undefined) return; + if (state.paused) return; + // Re-sort before granting: a workspace may have been starred/unstarred + // since the waiters were enqueued, so priority may have changed. + state.queue.sort(compareWaiters); while (state.queue.length > 0 && state.inFlight < state.limit) { const waiter = state.queue[0]; if (waiter === undefined) break; @@ -533,7 +591,7 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre // ── Public API ───────────────────────────────────────────────────────────── const manager: ConcurrencyService = { - acquire(providerId, conversationId, promptStartedAt, onQueued) { + acquire(providerId, conversationId, workspaceId, promptStartedAt, onQueued) { const state = states.get(providerId); if (state === undefined) { // No limit configured → unlimited. @@ -561,7 +619,7 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre // "queued" status signal while we're still synchronous. onQueued?.(); - // Queue (oldest-agent-first by promptStartedAt). + // Queue (starred-workspace-first, then oldest-agent-first). return new Promise<() => void>((resolve) => { state.queue.push({ conversationId, promptStartedAt, resolve }); // Keep sorted ascending by promptStartedAt (oldest first). @@ -572,6 +630,10 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre if (fetchUsage !== undefined) { armGateRepoll(providerId, state); } + state.queue.push({ conversationId, workspaceId, promptStartedAt, resolve }); + // Keep sorted by priority (starred first, then oldest-agent-first). + // The queue is typically tiny (<20), so a simple sort is fine. + state.queue.sort(compareWaiters); }); }, @@ -751,6 +813,22 @@ export function createConcurrencyManager(opts: ConcurrencyManagerOpts): Concurre .filter((s): s is ProviderConcurrencyStatus => s !== undefined); }, + notifyWorkspaceStarred(workspaceId, starred) { + if (starred) { + starredWorkspaces.add(workspaceId); + } else { + starredWorkspaces.delete(workspaceId); + } + // Re-sort all queues so a newly-starred workspace's already-queued + // agents jump ahead immediately (no need to wait for the next acquire). + for (const [providerId, state] of states) { + if (state.queue.length > 0) { + state.queue.sort(compareWaiters); + tryGrantNext(providerId); + } + } + }, + destroy() { clearInterval(watchdogTimer); for (const timer of cooldownTimers) { diff --git a/packages/provider-concurrency/src/extension.ts b/packages/provider-concurrency/src/extension.ts index 378655b..48c019b 100644 --- a/packages/provider-concurrency/src/extension.ts +++ b/packages/provider-concurrency/src/extension.ts @@ -1,3 +1,4 @@ +import { conversationStoreHandle } from "@dispatch/conversation-store"; import type { Extension, HostAPI, Logger, Manifest, StorageNamespace } from "@dispatch/kernel"; import type { ConcurrencyManagerOpts, ConcurrencyService } from "./concurrency-manager.js"; import { createConcurrencyManager } from "./concurrency-manager.js"; @@ -11,6 +12,7 @@ export const manifest: Manifest = { trust: "bundled", activation: "eager", capabilities: { db: true }, + dependsOn: ["conversation-store"], contributes: { services: ["provider-concurrency/service"] }, }; @@ -109,6 +111,7 @@ function createPersistedService( getCooldowns: inner.getCooldowns.bind(inner), getStatus: inner.getStatus.bind(inner), getStatusAll: inner.getStatusAll.bind(inner), + notifyWorkspaceStarred: inner.notifyWorkspaceStarred.bind(inner), destroy: inner.destroy.bind(inner), }; } @@ -215,6 +218,16 @@ export async function activate(host: HostAPI): Promise<void> { return provider.getUsage(); }; + // Resolve the conversation store to seed the in-memory starred-workspace + // cache. The `isWorkspaceStarred` callback reads this cache synchronously + // (the queue sort comparator is sync), so we must populate it before the + // manager handles its first acquire. `dependsOn: ["conversation-store"]` + // in the manifest guarantees the store is registered before we activate. + const conversationStore = host.getService(conversationStoreHandle); + + // The manager owns the in-memory `starredWorkspaces` set internally (the + // default `isWorkspaceStarred` callback checks it). We seed it by calling + // `notifyWorkspaceStarred` for each starred workspace found in the store. const managerOpts: ConcurrencyManagerOpts = { now: () => Date.now(), slotTimeoutMs: SLOT_TIMEOUT_MS, @@ -274,6 +287,27 @@ export async function activate(host: HostAPI): Promise<void> { await loadAutoReduce(storage, inner, logger); await loadCooldowns(storage, inner, logger); + // Seed the in-memory starred cache from the conversation store so the + // priority scheduling is correct on a fresh server start (previously-starred + // workspaces are respected without requiring the user to re-star them). + try { + const workspaces = await conversationStore.listWorkspaces(); + for (const ws of workspaces) { + if (ws.starred) { + inner.notifyWorkspaceStarred(ws.id, true); + } + } + if (workspaces.some((w) => w.starred)) { + logger.info("provider-concurrency: restored starred workspaces", { + count: workspaces.filter((w) => w.starred).length, + }); + } + } catch (err) { + logger.warn("provider-concurrency: failed to load starred workspaces", { + err: err instanceof Error ? err.message : String(err), + }); + } + const service = createPersistedService(inner, storage, logger); host.provideService(concurrencyServiceHandle, service); logger.info("provider-concurrency: registered"); diff --git a/packages/provider-concurrency/src/provider-wrapper.test.ts b/packages/provider-concurrency/src/provider-wrapper.test.ts index e59ab39..7554e64 100644 --- a/packages/provider-concurrency/src/provider-wrapper.test.ts +++ b/packages/provider-concurrency/src/provider-wrapper.test.ts @@ -17,12 +17,21 @@ function fakeProvider(events: ProviderEvent[]): ProviderContract { /** A fake limiter that records acquire/release calls. */ function recordingLimiter(): ConcurrencyLimiter & { - acquireCalls: { providerId: string; conversationId: string; promptStartedAt: number }[]; + acquireCalls: { + providerId: string; + conversationId: string; + workspaceId: string; + promptStartedAt: number; + }[]; releaseCalls: number; rateLimitReports: string[]; } { - const acquireCalls: { providerId: string; conversationId: string; promptStartedAt: number }[] = - []; + const acquireCalls: { + providerId: string; + conversationId: string; + workspaceId: string; + promptStartedAt: number; + }[] = []; const releaseCalls: { count: number } = { count: 0 }; const rateLimitReports: string[] = []; @@ -32,8 +41,8 @@ function recordingLimiter(): ConcurrencyLimiter & { return releaseCalls.count; }, rateLimitReports, - acquire(providerId, conversationId, promptStartedAt) { - acquireCalls.push({ providerId, conversationId, promptStartedAt }); + acquire(providerId, conversationId, workspaceId, promptStartedAt) { + acquireCalls.push({ providerId, conversationId, workspaceId, promptStartedAt }); return Promise.resolve(() => { releaseCalls.count++; }); @@ -52,7 +61,7 @@ describe("wrapProviderWithConcurrency", () => { ]); const limiter = recordingLimiter(); - const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 12345); + const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", "default", 12345); const events: ProviderEvent[] = []; for await (const e of wrapped.stream([], [])) { @@ -61,7 +70,12 @@ describe("wrapProviderWithConcurrency", () => { // Slot acquired before stream, released after. expect(limiter.acquireCalls).toEqual([ - { providerId: "test-provider", conversationId: "conv1", promptStartedAt: 12345 }, + { + providerId: "test-provider", + conversationId: "conv1", + workspaceId: "default", + promptStartedAt: 12345, + }, ]); expect(limiter.releaseCalls).toBe(1); expect(events).toEqual([ @@ -79,7 +93,7 @@ describe("wrapProviderWithConcurrency", () => { }, }; const limiter = recordingLimiter(); - const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0); + const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", "default", 0); await expect(async () => { for await (const _e of wrapped.stream([], [])) { @@ -95,7 +109,7 @@ describe("wrapProviderWithConcurrency", () => { { type: "error", message: "Too many requests", code: "429", retryable: true }, ]); const limiter = recordingLimiter(); - const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0); + const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", "default", 0); const events: ProviderEvent[] = []; for await (const e of wrapped.stream([], [])) { @@ -113,7 +127,7 @@ describe("wrapProviderWithConcurrency", () => { { type: "error", message: "Internal error", code: "500", retryable: true }, ]); const limiter = recordingLimiter(); - const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0); + const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", "default", 0); for await (const _e of wrapped.stream([], [])) { // consume @@ -131,7 +145,7 @@ describe("wrapProviderWithConcurrency", () => { listModels: async () => [{ id: "model-1" }], }; const limiter = recordingLimiter(); - const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0); + const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", "default", 0); expect(wrapped.id).toBe("my-provider"); expect(wrapped.listModels).toBeDefined(); @@ -144,7 +158,7 @@ describe("wrapProviderWithConcurrency", () => { let acquiredCalled = false; const blockingLimiter: ConcurrencyLimiter = { - acquire(_providerId, _convId, _promptAt, onQueued) { + acquire(_providerId, _convId, _wsId, _promptAt, onQueued) { // Simulate a queued request: call onQueued, then resolve on next tick. onQueued?.(); return new Promise((resolve) => { @@ -161,6 +175,7 @@ describe("wrapProviderWithConcurrency", () => { provider, blockingLimiter, "conv1", + "default", 0, () => { queuedCalled = true; @@ -183,7 +198,7 @@ describe("wrapProviderWithConcurrency", () => { let acquiredCalled = false; const immediateLimiter: ConcurrencyLimiter = { - acquire(_providerId, _convId, _promptAt, _onQueued) { + acquire(_providerId, _convId, _wsId, _promptAt, _onQueued) { // Grant immediately — do NOT call onQueued. return Promise.resolve(() => {}); }, @@ -195,6 +210,7 @@ describe("wrapProviderWithConcurrency", () => { provider, immediateLimiter, "conv1", + "default", 0, () => { queuedCalled = true; @@ -229,7 +245,7 @@ describe("wrapProviderWithConcurrency", () => { }, }; const limiter = recordingLimiter(); - const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", 0); + const wrapped = wrapProviderWithConcurrency(provider, limiter, "conv1", "default", 0); const messages = [{ role: "user" as const, chunks: [{ type: "text" as const, text: "hi" }] }]; const tools = [{ name: "test_tool", description: "test", parameters: {} }]; diff --git a/packages/provider-concurrency/src/provider-wrapper.ts b/packages/provider-concurrency/src/provider-wrapper.ts index aa08e5b..1e3f2c0 100644 --- a/packages/provider-concurrency/src/provider-wrapper.ts +++ b/packages/provider-concurrency/src/provider-wrapper.ts @@ -23,6 +23,8 @@ import type { ConcurrencyLimiter } from "./concurrency-manager.js"; * @param provider The underlying provider to wrap. * @param limiter The concurrency limiter (acquire/release/reportRateLimit). * @param conversationId The agent requesting the stream (for slot attribution). + * @param workspaceId The workspace the agent belongs to (for starred + * priority scheduling in the limiter queue). * @param promptStartedAt When the agent's current prompt (turn) started * (epoch-ms, for oldest-agent-first scheduling). * @param onQueued Called synchronously when `acquire()` decides to @@ -36,6 +38,7 @@ export function wrapProviderWithConcurrency( provider: ProviderContract, limiter: ConcurrencyLimiter, conversationId: string, + workspaceId: string, promptStartedAt: number, onQueued?: () => void, onAcquired?: () => void, @@ -50,7 +53,13 @@ export function wrapProviderWithConcurrency( tools: readonly ToolContract[], opts?: ProviderStreamOptions, ): AsyncIterable<ProviderEvent> { - const release = await limiter.acquire(providerId, conversationId, promptStartedAt, onQueued); + const release = await limiter.acquire( + providerId, + conversationId, + workspaceId, + promptStartedAt, + onQueued, + ); onAcquired?.(); try { for await (const event of innerStream(messages, tools, opts)) { diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index 8cc0a97..aaf418a 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -747,6 +747,7 @@ export function createSessionOrchestrator( provider, limiter, conversationId, + workspaceId, promptStartedAt, () => emitStatus("queued"), () => emitStatus("active"), @@ -1219,10 +1220,12 @@ export function createWarmService( // Wrap with concurrency limiting (same as the main turn path). const warmLimiter = deps.resolveConcurrencyLimiter?.(); if (warmLimiter !== undefined) { + const warmWorkspaceId = await deps.conversationStore.getWorkspaceId(conversationId); provider = wrapProviderWithConcurrency( provider, warmLimiter, conversationId, + warmWorkspaceId, deps.now?.() ?? Date.now(), ); } @@ -1376,10 +1379,12 @@ export function createCompactionService( // Wrap with concurrency limiting (same as the main turn path). const compactionLimiter = deps.resolveConcurrencyLimiter?.(); if (compactionLimiter !== undefined) { + const compactionWorkspaceId = await deps.conversationStore.getWorkspaceId(conversationId); provider = wrapProviderWithConcurrency( provider, compactionLimiter, conversationId, + compactionWorkspaceId, deps.now?.() ?? Date.now(), ); } diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts index 7ae0354..03f1959 100644 --- a/packages/transport-http/src/app.test.ts +++ b/packages/transport-http/src/app.test.ts @@ -110,6 +110,7 @@ function createFakeConversationStore( title: "default", defaultCwd: null, defaultComputerId: null, + starred: false, createdAt: 0, lastActivityAt: 0, }; @@ -207,6 +208,9 @@ function createFakeConversationStore( async setWorkspaceDefaultComputerId(id, defaultComputerId) { return { ...sampleWorkspace, id, defaultComputerId }; }, + async setWorkspaceStarred(id, starred) { + return { ...sampleWorkspace, id, starred }; + }, async deleteWorkspace() { return { closedCount: 0 }; }, @@ -3562,6 +3566,7 @@ describe("Workspaces", () => { title: "proj", defaultCwd: null, defaultComputerId: null, + starred: false, createdAt: 1000, lastActivityAt: 2000, }; @@ -3756,6 +3761,150 @@ describe("Workspaces", () => { const res = await app.request("/workspaces/default", { method: "DELETE" }); expect(res.status).toBe(409); }); + + // ─── Star/unstar workspace (concurrency priority) ──────────────────────── + + it("PUT /workspaces/:id/star persists + notifies the concurrency service", async () => { + let starredCalled: { id: string; starred: boolean } | null = null; + const store: ConversationStore = { + ...createFakeConversationStore(), + async setWorkspaceStarred(id, starred) { + return { ...sampleWorkspace, id, starred }; + }, + }; + const concurrencyService = { + acquire: async () => () => {}, + reportRateLimit() {}, + setLimit() {}, + getLimit: () => undefined, + getLimits: () => [], + getStatus: () => undefined, + getStatusAll: () => [], + notifyWorkspaceStarred(id: string, starred: boolean) { + starredCalled = { id, starred }; + }, + destroy() {}, + }; + const app = createApp({ + conversationStore: store, + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + ...(concurrencyService !== undefined ? { concurrencyService } : {}), + logger: noopLogger, + }); + const res = await app.request("/workspaces/proj/star", { method: "PUT" }); + expect(res.status).toBe(200); + const body = (await res.json()) as WorkspaceResponse; + expect(body.starred).toBe(true); + expect(starredCalled).toEqual({ id: "proj", starred: true }); + }); + + it("DELETE /workspaces/:id/star persists + notifies the concurrency service", async () => { + let starredCalled: { id: string; starred: boolean } | null = null; + const store: ConversationStore = { + ...createFakeConversationStore(), + async setWorkspaceStarred(id, starred) { + return { ...sampleWorkspace, id, starred }; + }, + }; + const concurrencyService = { + acquire: async () => () => {}, + reportRateLimit() {}, + setLimit() {}, + getLimit: () => undefined, + getLimits: () => [], + getStatus: () => undefined, + getStatusAll: () => [], + notifyWorkspaceStarred(id: string, starred: boolean) { + starredCalled = { id, starred }; + }, + destroy() {}, + }; + const app = createApp({ + conversationStore: store, + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + concurrencyService, + logger: noopLogger, + }); + const res = await app.request("/workspaces/proj/star", { method: "DELETE" }); + expect(res.status).toBe(200); + const body = (await res.json()) as WorkspaceResponse; + expect(body.starred).toBe(false); + expect(starredCalled).toEqual({ id: "proj", starred: false }); + }); + + it("PUT /workspaces/:id/star rejects invalid slug", async () => { + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + logger: noopLogger, + }); + const res = await app.request("/workspaces/Bad Slug!/star", { method: "PUT" }); + expect(res.status).toBe(400); + }); + + it("DELETE /workspaces/:id cleans up the in-memory starred cache (bug fix)", async () => { + // Bug 1 fix: deleting a workspace must notify the concurrency service to + // remove the workspace ID from the starred cache, preventing stale IDs. + let starredCalled: { id: string; starred: boolean } | null = null; + const store: ConversationStore = { + ...createFakeConversationStore(), + async deleteWorkspace() { + return { closedCount: 2 }; + }, + }; + const concurrencyService = { + acquire: async () => () => {}, + reportRateLimit() {}, + setLimit() {}, + getLimit: () => undefined, + getLimits: () => [], + getStatus: () => undefined, + getStatusAll: () => [], + notifyWorkspaceStarred(id: string, starred: boolean) { + starredCalled = { id, starred }; + }, + destroy() {}, + }; + const app = createApp({ + conversationStore: store, + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + concurrencyService, + logger: noopLogger, + }); + const res = await app.request("/workspaces/proj", { method: "DELETE" }); + expect(res.status).toBe(200); + // The concurrency service must be notified to clear the starred cache. + expect(starredCalled).toEqual({ id: "proj", starred: false }); + }); + + it("PUT /workspaces/:id/star logs warning when concurrency service is absent (bug fix)", async () => { + // Bug 2 fix: when the concurrency service is not loaded, the star toggle + // persists but the priority cache is not updated. A warning log makes this + // degraded behavior visible. + const logger = createFakeLogger(); + const store: ConversationStore = { + ...createFakeConversationStore(), + async setWorkspaceStarred(id, starred) { + return { ...sampleWorkspace, id, starred }; + }, + }; + // NOTE: no concurrencyService provided — simulates the extension being absent. + const app = createApp({ + conversationStore: store, + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + logger, + }); + const res = await app.request("/workspaces/proj/star", { method: "PUT" }); + expect(res.status).toBe(200); + // The star persisted, but a warning was logged about the missing service. + const warnings = logger.records.filter((r) => r.level === "warn"); + expect(warnings.some((r) => r.msg.includes("concurrency service is not loaded"))).toBe(true); + }); }); it("POST /chat threads workspaceId", async () => { diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts index ee7b2de..656be9d 100644 --- a/packages/transport-http/src/app.ts +++ b/packages/transport-http/src/app.ts @@ -1529,6 +1529,10 @@ export function createApp(opts: CreateServerOptions): Hono { try { const { closedCount } = await opts.conversationStore.deleteWorkspace(workspaceId); + // Clean up the in-memory starred cache so a deleted workspace's ID + // doesn't linger (and so a future workspace re-created with the same + // slug doesn't inherit the stale starred state). + opts.concurrencyService?.notifyWorkspaceStarred(workspaceId, false); log.info("workspaces: deleted", { workspaceId, closedCount }); const response: DeleteWorkspaceResponse = { workspaceId, closedCount }; return c.json(response, 200); @@ -1538,6 +1542,81 @@ export function createApp(opts: CreateServerOptions): Hono { } }); + // ─── Star/unstar workspace (concurrency priority) ─────────────────────────── + // Starred workspaces receive PRIORITY in the concurrency limiter queue — + // their agents jump ahead of agents from non-starred workspaces. The + // starred state is persisted in the conversation store AND the in-memory + // cache in the concurrency service is notified so already-queued agents + // are re-prioritized immediately. + + app.put("/workspaces/:id/star", async (c) => { + const workspaceId = c.req.param("id"); + if (!isValidWorkspaceSlug(workspaceId)) { + return c.json( + { + error: "Workspace id must be a valid slug (lowercase alphanumeric + hyphens, 1–40 chars)", + }, + 400, + ); + } + try { + const workspace = await opts.conversationStore.setWorkspaceStarred(workspaceId, true); + // Notify the concurrency service's in-memory cache so queued agents + // from this workspace jump ahead immediately. When the concurrency + // service is absent (extension not loaded), the starred state is + // persisted but the in-memory priority cache is NOT updated — log a + // warning so the degraded behavior is visible (queued agents keep + // their old priority until restart or the extension is loaded). + if (opts.concurrencyService !== undefined) { + opts.concurrencyService.notifyWorkspaceStarred(workspaceId, true); + } else { + log.warn( + "workspaces: starred but concurrency service is not loaded — priority cache not updated", + { + workspaceId, + }, + ); + } + log.info("workspaces: starred", { workspaceId }); + const response: WorkspaceResponse = workspace; + return c.json(response, 200); + } catch (err) { + log.error("workspaces: star failure", { err, workspaceId }); + return c.json({ error: "Failed to star workspace" }, 500); + } + }); + + app.delete("/workspaces/:id/star", async (c) => { + const workspaceId = c.req.param("id"); + if (!isValidWorkspaceSlug(workspaceId)) { + return c.json( + { + error: "Workspace id must be a valid slug (lowercase alphanumeric + hyphens, 1–40 chars)", + }, + 400, + ); + } + try { + const workspace = await opts.conversationStore.setWorkspaceStarred(workspaceId, false); + if (opts.concurrencyService !== undefined) { + opts.concurrencyService.notifyWorkspaceStarred(workspaceId, false); + } else { + log.warn( + "workspaces: unstarred but concurrency service is not loaded — priority cache not updated", + { + workspaceId, + }, + ); + } + log.info("workspaces: unstarred", { workspaceId }); + const response: WorkspaceResponse = workspace; + return c.json(response, 200); + } catch (err) { + log.error("workspaces: unstar failure", { err, workspaceId }); + return c.json({ error: "Failed to unstar workspace" }, 500); + } + }); + // ─── Heartbeat (per-workspace AI loop) ───────────────────────────────────── // The config + run history for a workspace's heartbeat loop. Delegated to // the HeartbeatService (provided by the `heartbeat` extension). When diff --git a/packages/transport-http/src/extension.ts b/packages/transport-http/src/extension.ts index 7eb39c9..f424e42 100644 --- a/packages/transport-http/src/extension.ts +++ b/packages/transport-http/src/extension.ts @@ -72,6 +72,7 @@ export const manifest: Manifest = { "/workspaces/:id/heartbeat", "/workspaces/:id/heartbeat/runs", "/workspaces/:id/heartbeat/runs/:runId/stop", + "/workspaces/:id/star", "/workspaces/:id/title", ], }, diff --git a/packages/wire/src/index.test.ts b/packages/wire/src/index.test.ts index 81d10c1..2886ccb 100644 --- a/packages/wire/src/index.test.ts +++ b/packages/wire/src/index.test.ts @@ -42,19 +42,24 @@ describe("@dispatch/wire — Computer / Workspace shapes", () => { expect(entry.usageCount).toBe(3); }); - it("a Workspace carries defaultComputerId (null = local)", () => { + it("a Workspace carries defaultComputerId (null = local) and starred", () => { const remote: Workspace = { id: "default", title: "Default", defaultCwd: null, defaultComputerId: "myserver", + starred: false, createdAt: 0, lastActivityAt: 0, }; expect(remote.defaultComputerId).toBe("myserver"); + expect(remote.starred).toBe(false); const local: Workspace = { ...remote, defaultComputerId: null }; expect(local.defaultComputerId).toBeNull(); + + const starred: Workspace = { ...remote, starred: true }; + expect(starred.starred).toBe(true); }); }); diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts index 113f684..aa5b6a8 100644 --- a/packages/wire/src/index.ts +++ b/packages/wire/src/index.ts @@ -651,6 +651,13 @@ export interface Workspace { * (per-conv `computerId` → this → `null`/local). */ readonly defaultComputerId: string | null; + /** + * Whether the workspace is starred by the user. Starred workspaces receive + * PRIORITY in the concurrency limiter queue — their agents jump ahead of + * agents from non-starred workspaces (oldest-agent-first within each group). + * Defaults to `false` on creation. + */ + readonly starred: boolean; /** Epoch-ms when the workspace was first created. */ readonly createdAt: number; /** Epoch-ms of the most recent conversation activity in this workspace. */ |
