From 59c481dc96aa40b8c8f0a81bb294a77d4e5aa533 Mon Sep 17 00:00:00 2001 From: Adam Malczewski Date: Thu, 25 Jun 2026 11:09:21 +0900 Subject: docs(notes): research — list conversations filtered by worktree/workspace MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Investigation of whether the backend supports listing open conversations filtered by a specific worktree/workspace. Findings: - 'worktree' is not a Dispatch domain concept; canonical term is 'workspace' (logical grouping) vs 'working directory' (cwd, filesystem path). - GET /conversations already supports composable ?workspaceId=, ?status=, ?q= filters. 'Open conversations in workspace X' = ?workspaceId=X&status=active,idle. - Every conversation carries a workspaceId (default 'default'); ConversationMeta is in @dispatch/wire; filter lives in conversation-store listConversations. - A literal directory (git worktree) filter (?cwd=) is NOT supported; §3b documents the small additive change needed across wire/store/transport-http. - Test coverage verified: store-workspace.test.ts:369, store.test.ts:1463, app.test.ts:3696. Research notes only — no code/contract changes. --- notes/conv-list-by-worktree-research.md | 247 ++++++++++++++++++++++++++++++++ 1 file changed, 247 insertions(+) create mode 100644 notes/conv-list-by-worktree-research.md (limited to 'notes') diff --git a/notes/conv-list-by-worktree-research.md b/notes/conv-list-by-worktree-research.md new file mode 100644 index 0000000..636f06e --- /dev/null +++ b/notes/conv-list-by-worktree-research.md @@ -0,0 +1,247 @@ +# Research — List conversations filtered by worktree / workspace + +> **Branch:** `feature/conv-list-by-worktree` (research notes only — no code changes). +> **Date:** 2026-06-25. **Repo:** `dispatch-backend` worktree +> `worktrees/conv-list-by-worktree/backend`. 
+> +> **Research question:** Does the Dispatch backend currently support getting a +> list of *open* conversations filtered by a specific worktree (or workspace)? + +## TL;DR + +**Yes — fully supported, via the `workspace` concept.** `GET /conversations` +accepts a composable `?workspaceId=` query filter, and every conversation +already carries a `workspaceId` (default `"default"`). Combined with the +`?status=` filter, "open conversations in workspace X" is: + +``` +GET /conversations?workspaceId=X&status=active,idle +``` + +("open" = not `closed`; the lifecycle statuses are `active | idle | closed`, +where `closed` is the archived state.) + +**Important terminology caveat:** **"worktree" is NOT a Dispatch domain term** — +it appears nowhere in `packages/` (zero matches; it only occurs in `ORCHESTRATOR.md` +and `notes/restructure-plan.md` as part of file *paths*). The two closest canonical +concepts are: + +| Canonical term (GLOSSARY) | Meaning | "Worktree" as directory? | +|---|---|---| +| **workspace** | A named, URL-driven *logical grouping* of conversations that owns a default cwd. Every conversation belongs to exactly one. (`@dispatch/wire` `Workspace`) | No — it's a slug, not a path | +| **working directory (cwd)** | The per-conversation *filesystem directory* tools/language-servers operate within. | **Yes** — a git worktree *is* a directory | + +The existing `GET /conversations` filter supports `?workspaceId=` but **not** +`?cwd=`. So if the intent is "conversations whose working directory is a *specific +git worktree path*", that is NOT supported today (see §3b). If the intent is the +logical grouping, it's fully supported (see §1–§2). + +--- + +## 1. Is there an existing endpoint? 
— YES + +`GET /conversations` lists all known conversations and supports three +**composable** query filters: + +| Filter | Values | Effect | +|---|---|---| +| `?workspaceId=` | workspace slug | Restrict to conversations in that workspace | +| `?status=` | `active`,`idle`,`closed` | Restrict to those lifecycle statuses | +| `?q=` | id prefix | Short-id resolution; id-prefix match (applied in-memory) | + +**Open conversations in a workspace** = `?status=active,idle` (excludes `closed`) +combined with `?workspaceId=`. + +There is also a `GET /workspaces` endpoint that lists workspaces (each with a +`conversationCount`), and per-workspace CRUD — see §4. + +--- + +## 2. How it works (endpoint, parameters, response shape) + +### 2a. The route — `packages/transport-http/src/app.ts:775-814` + +```ts +app.get("/conversations", async (c) => { + // ?status= comma-separated (e.g. "active,idle"). Default: all. Invalid dropped. + const statusFilter = parseStatusFilter(c.req.query("status")); + // ?workspaceId= . Missing/empty/whitespace → ignored (all workspaces). + const workspaceId = rawWorkspaceId !== undefined && rawWorkspaceId.trim().length > 0 + ? rawWorkspaceId.trim() : undefined; + const filter = (statusFilter !== undefined || workspaceId !== undefined) + ? { ...(statusFilter ? { status: statusFilter } : {}), + ...(workspaceId ? { workspaceId } : {}) } + : undefined; + const all = await opts.conversationStore.listConversations(filter); + // ?q= prefix filter applied in-memory on the result. + const conversations = q.length > 0 ? all.filter((m) => m.id.startsWith(q)) : all; + return c.json({ conversations }, 200); // 500 on store error +}); +``` + +- `?status=` parsing: `parseStatusFilter` (`packages/transport-http/src/logic.ts:24-38`). + Valid values are `"active" | "idle" | "closed"` (`VALID_STATUSES`, logic.ts:16). + Invalid values are silently dropped; if *all* values are invalid → no filter + (returns all). 
+- `?workspaceId=` is whitespace-trimmed; empty/whitespace-only is ignored. +- The `filter` object passed to the store is `{ status?, workspaceId? }` — either + or both optional. + +### 2b. Response shape + +`200` → `ConversationListResponse` (`packages/transport-contract/src/index.ts:710-713`, +re-exported from `@dispatch/wire`): + +```ts +interface ConversationListResponse { + readonly conversations: readonly ConversationMeta[]; +} + +interface ConversationMeta { // @dispatch/wire, wire/src/index.ts:518-536 + readonly id: string; + readonly createdAt: number; // epoch-ms, set on first write + readonly lastActivityAt: number; // epoch-ms, updated on every append + readonly title: string; // first user message (truncated 80) or PUT /title + readonly status: ConversationStatus; // "active" | "idle" | "closed" + readonly workspaceId: string; // always present; "default" fallback + readonly compactedFrom?: string; // present iff post-compaction +} +``` + +- Sorted by `lastActivityAt` **descending** (most recent first); stable sort keeps + first-seen (index) order for ties. +- `workspaceId` is always present on the response — conversations never assigned + read as `"default"` (see `toMeta`, `store.ts:329-337`). +- Errors: store failure → `500 { error: "Failed to list conversations" }`. + +### 2c. The store filter — `packages/conversation-store/src/store.ts` + +`ConversationStore.listConversations` filter type (`store.ts:91-94`): + +```ts +readonly listConversations: (filter?: { + readonly status?: readonly ConversationStatus[]; + readonly workspaceId?: string; +}) => Promise; +``` + +Implementation (`store.ts:695-733`): reads the conversation index, dedups +(first-seen order), reads each conversation's meta row, applies both filters, sorts +desc by `lastActivityAt`. Specifically: + +- `statusFilter` → membership test against `row.status`. +- `workspaceFilter` → equality against `row.workspaceId ?? 
DEFAULT_WORKSPACE_ID` + (so legacy rows with no stored workspaceId match `"default"`). + +`DEFAULT_WORKSPACE_ID` is `"default"`. + +--- + +## 3. "What would it take to add it?" + +### 3a. If "worktree" means the logical **workspace** → already done. Nothing to add. + +The capability, data model, contract types, and tests all exist. Pin +`@dispatch/transport-contract` (currently `0.22.0`) and `@dispatch/wire` +(`0.12.0`) and call `GET /conversations?workspaceId=&status=active,idle`. + +### 3b. If "worktree" literally means a **filesystem directory / git worktree path** → +NOT supported today; small, well-contained change. + +The directory concept maps to **working directory (cwd)**, which is per-conversation +(`conversation-store` `getCwd`/`setCwd`, keyed per conversation; `GET/PUT +/conversations/:id/cwd`). The list endpoint does NOT support a `?cwd=` filter, and +`ConversationMeta` does NOT carry a `cwd` field (confirmed: `wire/src/index.ts:518-536` +has no `cwd`; filter type `store.ts:91-94` has no `cwd`). + +To add a `?cwd=` filter (filter conversations by their working directory), the change +touches three layers, all additive: + +1. **Contract (`@dispatch/wire`)** — (optional) add `readonly cwd?: string | null` to + `ConversationMeta` so the caller can see each conversation's directory. Additive + type bump (e.g. `0.12.0 → 0.13.0`). Not strictly required if filtering is + server-side-only, but useful for the FE to render. +2. **Store (`conversation-store/src/store.ts`)**: + - Widen the `listConversations` filter to `{ status?, workspaceId?, cwd? }`. + - In the scan loop (`store.ts:718-728`), for each candidate call `getCwd(id)` + (or the effective cwd via `getEffectiveCwd`) and compare. 
**Cost note:** this is + an extra storage read per conversation (cwd is stored in its own key, not in the + meta row) — fine for typical counts; if scale matters, add a `cwd` field to + `ConversationMetaRow` (populated on `setCwd`) so the filter is a row comparison + with no extra reads. The latter is the better design and mirrors how + `workspaceId` was added to the meta row. + - Decide exact-match (path equality) vs. prefix/normalized match (e.g. a worktree + and its subdirs). Equality is simplest and probably sufficient. +3. **Transport (`transport-http/src/app.ts` + `logic.ts`)** — parse `?cwd=` (trim; + empty → ignore, mirroring `?workspaceId=`) and pass `cwd` into the store filter. + Update `ConversationListResponse` doc. + +The worktree-as-directory case is the only one that requires new code; the +workspace case requires none. + +--- + +## 4. Related workspace capabilities (for context) + +The workspace model is fully built out (courier doc `frontend-workspaces-handoff.md` +→ implemented in `@dispatch/wire@0.12.0` / `@dispatch/transport-contract`): + +- `GET /workspaces` → `WorkspaceListResponse` (workspaces sorted by `lastActivityAt` + desc, each with `conversationCount`). The `"default"` workspace is always + synthesized/present. +- `PUT /workspaces/:id` (create-on-miss, idempotent), `GET /workspaces/:id` (pure + read, 404 if missing), `PUT /workspaces/:id/title`, `PUT /workspaces/:id/default-cwd`, + `DELETE /workspaces/:id` (closes all its conversations → `status="closed"`, + reassigns them to `"default"`, returns `closedCount`; 409 for `"default"`). +- Workspace slug regex: `^[a-z0-9](?:[a-z0-9-]{0,38}[a-z0-9])?$` (1–40, lowercase, + digits, internal hyphens). Validated by `isValidWorkspaceSlug`. +- `workspaceId` is auto-created on turn start if missing (`title = id`, + `defaultCwd = null`). Auto-assigned on `/chat` and `POST /conversations/:id/queue`. 
+- `DELETE /conversations/:id/cwd` clears the explicit per-conversation cwd + (falls back to the workspace `defaultCwd`, then the server default). +- cwd resolution: explicit per-conversation cwd → workspace `defaultCwd` → server + default (`process.cwd()`). `GET /conversations/:id/cwd` returns the explicit cwd + only; `GET /conversations/:id/lsp` roots at the *effective* cwd. + +`Workspace` type (`wire/src/index.ts:566-577`): `{ id, title, defaultCwd: string|null, +createdAt, lastActivityAt }`. `WorkspaceEntry extends Workspace { conversationCount }`. + +--- + +## 5. Test coverage (verified green in-repo) + +- `conversation-store/src/store-workspace.test.ts:369` — "listConversations filtered + by workspaceId" (asserts work-a returns `[a1,a2]`, work-b returns `[b1]`). +- `conversation-store/src/store.test.ts:1463` — "listConversations filters by status", + incl. `{ status: ["active","idle"] }` (= "open") returns `[conv1, conv3]` + (excludes the `closed` conv2). Also `store.test.ts:1485` — status persists across a + fresh store instance. +- `transport-http/src/app.test.ts:3696` — "GET /conversations?workspaceId= filters" + (asserts the store receives `{ workspaceId: "proj" }` and responds `200`). +- `transport-http/src/app.test.ts` (q-filter block ~3133-3156) — `?q=` prefix + + empty/whitespace handling; 500 on store throw. + +The two filters are independently testable and composable at the store level +(same `filter` object carries both); HTTP-level composition of `?status=…&workspaceId=…` +is not asserted by an explicit combined test, but the handler builds one merged +`filter` object so composition is structural. + +--- + +## 6. Verdict & recommendation + +- **For the workspace interpretation (the canonical reading of "grouping of + conversations"): the feature already exists.** Use + `GET /conversations?workspaceId=&status=active,idle`. No code changes, no + data-model changes, no contract bumps needed. 
+- **For the literal "git worktree as a directory" interpretation: not supported** as a + list filter; the closest concept is per-conversation **cwd**. Adding a `?cwd=` filter + is a small additive change across `@dispatch/wire` (optional `cwd` on + `ConversationMeta`), `conversation-store` (`listConversations` filter + ideally a + `cwd` column on the meta row for cheap filtering), and `transport-http` (parse + `?cwd=`). See §3b. +- **Terminology:** recommend the user confirm which concept they mean. If they mean + the logical grouping, "workspace" is already the canonical GLOSSARY term — no new + vocabulary. If they truly need directory-based grouping, that is a distinct feature + from workspaces and should be scoped as such (it overlaps with, but is not, a + workspace; a single directory could be shared by multiple workspaces or none). -- cgit v1.2.3 From 4c42ec9c7df067e0e0e309610b61e25752d73f9f Mon Sep 17 00:00:00 2001 From: Adam Malczewski Date: Thu, 25 Jun 2026 18:09:12 +0900 Subject: feat(kernel): retry-with-backoff on retryable provider errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the upstream LLM API returns a retryable error (HTTP 429 / 5xx "overloaded"), the kernel now retries provider.stream() with a stepped backoff, visibly, until the 8h cumulative-sleep budget is exhausted — then emits the final error and seals the turn. Retries fire only when no content was emitted yet this step (safety invariant: never duplicate partial output). - wire: new transient TurnProviderRetryEvent AgentEvent variant (emitted before each sleep; not persisted to model history). - kernel contracts: RetryStrategy (pure delayFor + injected sleep) + optional retry? on RunTurnInput (omit = no retry, backward-compatible). - kernel run-turn: retry loop in executeStep; providerRetryEvent constructor. Kernel imports no timer (sleep injected). 
- session-orchestrator: concrete schedule (5s..30m, repeat 30m, 8h budget) + abortable setTimeout sleep, wired into RunTurnInput.retry. tsc -b EXIT 0; biome clean; 1574 vitest pass (+16 new: 11 kernel retry tests with injected fake sleep + pure delayFor, zero @dispatch/* mocks; 5 schedule tests). Transports unchanged (transport-ws forwards AgentEvent verbatim in chat.delta; transport-http is generic JSON.stringify). Plan: notes/retry-with-backoff-plan.md. tasks.md updated with milestone + optional CLI-renderer roadmap follow-up. --- notes/retry-with-backoff-plan.md | 138 ++++++ packages/kernel/src/contracts/events.ts | 1 + packages/kernel/src/contracts/index.ts | 2 + packages/kernel/src/contracts/runtime.ts | 44 ++ packages/kernel/src/runtime/events.ts | 14 + packages/kernel/src/runtime/index.ts | 1 + packages/kernel/src/runtime/run-turn.test.ts | 535 ++++++++++++++++++++++ packages/kernel/src/runtime/run-turn.ts | 167 +++++-- packages/session-orchestrator/src/index.ts | 6 + packages/session-orchestrator/src/orchestrator.ts | 36 ++ packages/session-orchestrator/src/pure.test.ts | 61 +++ packages/session-orchestrator/src/pure.ts | 47 ++ packages/wire/src/index.ts | 26 ++ tasks.md | 38 +- 14 files changed, 1089 insertions(+), 27 deletions(-) create mode 100644 notes/retry-with-backoff-plan.md (limited to 'notes') diff --git a/notes/retry-with-backoff-plan.md b/notes/retry-with-backoff-plan.md new file mode 100644 index 0000000..99f6f5d --- /dev/null +++ b/notes/retry-with-backoff-plan.md @@ -0,0 +1,138 @@ +# Plan — Retry-with-backoff on retryable provider errors (FINALIZED) + +**Goal:** When the upstream LLM API returns a retryable error (e.g. "server +overloaded"), retry the request with a stepped backoff, visibly, until the +budget is exhausted. + +## The error (from the prod DB) — detection is already done + +- **HTTP 429** (46×) and **HTTP 502** (1×), **no 503s**. +- Body: `{"error":{"type":"overloaded_error","message":"The service is temporarily overloaded. 
Please retry."}}` +- `packages/openai-stream/src/stream.ts:201` **already sets** + `retryable: response.status >= 500 || response.status === 429` on the error + event, and `ProviderErrorEvent` (`kernel/contracts/provider.ts:72`) **already + declares `retryable?: boolean`**. The kernel's `processEvent` just ignores it. +- The error is **emitted (not thrown) and before any content** → retrying + `provider.stream()` is safe (no partial chunks to roll back). + +## Decision 1 — the backoff schedule + +`5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m`, then **repeat 30m** until **8h of +cumulative retry-wait** is reached, then give up (emit the final error + seal). + +Pure function of the attempt index (0 = first retry): +```ts +const SCHEDULE_MS = [5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000]; +const TAIL_MS = 1_800_000; // 30m +const BUDGET_MS = 8 * 60 * 60 * 1000; // 8h + +// pure, deterministic, no I/O +function delayFor(attempt: number): number | undefined { + const delay = attempt < SCHEDULE_MS.length ? SCHEDULE_MS[attempt] : TAIL_MS; + if (cumulativeSleepMs(attempt) > BUDGET_MS) return undefined; // over budget → stop + return delay; +} +``` +- `cumulativeSleepMs(attempt)` = sum of delay[0..attempt]; head (8 steps) sums to + 3,705s, then +1,800s per extra step. 8h (28,800s) is reached at attempt ~21 + → ~21 retries, ~7h32m of sleeping, then give up. +- Budget = cumulative *scheduled sleep* (pure/testable). If you prefer wall-clock + since first error, it switches to using the injected `now` — easy change. 
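The Decision 1 snippet calls `cumulativeSleepMs` without defining it. Below is a self-contained sketch of the same schedule, assuming the helper simply sums the scheduled delays for attempts `0..attempt` as the bullet above describes. `stepDelayMs` and `fullSchedule` are illustrative helper names, and this is not the shipped orchestrator code.

```typescript
const SCHEDULE_MS: readonly number[] = [
  5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000,
];
const TAIL_MS = 1_800_000; // 30m, repeated once the stepped head is used up
const BUDGET_MS = 8 * 60 * 60 * 1000; // 8h of cumulative scheduled sleep

/** Delay for one attempt, ignoring the budget (hypothetical helper). */
function stepDelayMs(attempt: number): number {
  const head = SCHEDULE_MS[attempt];
  return head !== undefined ? head : TAIL_MS;
}

/** Sum of the scheduled delays for attempts 0..attempt, inclusive. */
function cumulativeSleepMs(attempt: number): number {
  let sum = 0;
  for (let i = 0; i <= attempt; i++) sum += stepDelayMs(i);
  return sum;
}

/** Pure decision: delay before retry `attempt`, or undefined once over budget. */
function delayFor(attempt: number): number | undefined {
  return cumulativeSleepMs(attempt) > BUDGET_MS ? undefined : stepDelayMs(attempt);
}

/** Every delay that delayFor hands out before it gives up. */
function fullSchedule(): number[] {
  const delays: number[] = [];
  for (let attempt = 0; ; attempt++) {
    const delay = delayFor(attempt);
    if (delay === undefined) return delays;
    delays.push(delay);
  }
}
```

Under these assumptions `fullSchedule()` has 21 entries that sum to 27,105,000 ms (3,705s of head plus 13 tail steps of 1,800s), and `delayFor(21)` is the first call that returns `undefined`.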
+ +## Decision 2 — visible (yellow system-message warning) + 5d3f handoff + +Add a new **transient** `AgentEvent` variant (emitted to the frontend, NOT +persisted into the model's message history — so it never pollutes the prompt): + +```ts +// @dispatch/wire (AgentEvent union gains this member) +export interface TurnProviderRetryEvent { + readonly type: "provider-retry"; + readonly conversationId: string; + readonly turnId: string; + /** 0-based: this is the Nth retry about to happen. */ + readonly attempt: number; + /** ms the client should expect to wait before the retry fires. */ + readonly delayMs: number; + /** The endpoint's error verbatim (e.g. "HTTP 429: {…overloaded_error…}"). */ + readonly message: string; + /** The HTTP code when known (e.g. "429"). */ + readonly code?: string; +} +``` +- Emitted once per scheduled retry, BEFORE the sleep, so the UI shows + "⚠ Server overloaded — retrying in 5s…" immediately. +- When retries are exhausted (8h), the existing `error` event is emitted (as + today) and the turn seals — so the final failure is still a persisted error. + +**Frontend handoff to 5d3f:** render `provider-retry` as a yellow warning +system-message bubble showing `message` (+ `code`), with the countdown. (I do +the backend; 5d3f does the renderer — handoff via dispatch CLI.) + +## Decision 3 — retry ANY retryable error + +Retry trigger (both paths), **only when no content has been emitted yet** +(the safety invariant — never duplicate partial output): + +- **Emitted** `error` ProviderEvent with `retryable === true` → retry. (429/502/5xx + network fetch errors — all pre-content.) +- **Thrown** error (mid-stream, caught in `executeStep`'s `catch`) → treated as **retryable-by-default when pre-content** (most mid-stream throws are transient network/SSE issues). A thrown error after content is emitted is NOT retried (can't safely). 
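The two bullets above reduce to a small pure predicate. The sketch below is one way to express it; the `ProviderFailure` shape and the `shouldRetry` name are assumptions made for illustration and do not appear in the kernel.

```typescript
/** How the provider failure surfaced (hypothetical shape for this sketch). */
type ProviderFailure =
  | { readonly kind: "emitted"; readonly retryable?: boolean }
  | { readonly kind: "thrown" };

/**
 * Retry only before any content was emitted this step.
 * Emitted errors need an explicit `retryable: true`;
 * thrown errors default to retryable.
 */
function shouldRetry(failure: ProviderFailure, hadContent: boolean): boolean {
  if (hadContent) return false; // safety invariant: never duplicate partial output
  if (failure.kind === "thrown") return true;
  return failure.retryable === true;
}
```

Keeping the decision separate from the sleep and the re-call means it can be tested as a plain truth table, with no provider or timer involved.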
+
+So "if it's retryable, retry it" = the `retryable` flag drives emitted errors;
+thrown errors default to retryable when nothing was streamed yet. Non-retryable
+emitted errors (`retryable: false`/absent) end the step as today.
+
+## Architecture — kernel provides the HOOK, shell provides POLICY + I/O
+
+(Constitution: kernel touches no I/O; effects injected; decision pure.)
+
+### Kernel contract (`kernel/src/contracts/runtime.ts`) — add to `RunTurnInput`:
+```ts
+export interface RetryStrategy {
+  /** Pure: attempt → delay ms, or undefined to stop (budget exhausted). */
+  readonly delayFor: (attempt: number) => number | undefined;
+  /** Injected effect: actually sleep. Kernel imports no timer. Abortable. */
+  readonly sleep: (ms: number, signal: AbortSignal) => Promise<void>;
+}
+export interface RunTurnInput {
+  // …existing…
+  /** Optional injected retry. Omit = no retry (backward-compatible). */
+  readonly retry?: RetryStrategy;
+}
+```
+
+### Kernel loop (`kernel/src/runtime/run-turn.ts`, `executeStep`):
+Wrap stream consumption in a retry loop:
+- track `hadContent` (any text/reasoning/tool-call/usage seen);
+- on a retryable error (emitted `retryable:true` OR thrown) with `!hadContent`:
+  - `delay = retry.delayFor(attempt)`; if `undefined` → give up (emit the
+    suppressed error, end step);
+  - else emit `providerRetryEvent(attempt, delay, message, code)`, `await
+    retry.sleep(delay, signal)`, `attempt++`, re-call `provider.stream()`;
+- on abort during sleep → reject, seal turn `aborted` (existing flow).
+
+### Shell wiring (`session-orchestrator/src/orchestrator.ts`):
+- Provide the concrete `RetryStrategy`: `delayFor` = the schedule + 8h budget
+  above; `sleep` = abortable `setTimeout`-based promise.
+- Pass `retry` into the `RunTurnInput` it builds (line 589).
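The shell-side `sleep` is described only as an abortable `setTimeout`-based promise. A minimal sketch of that idea follows, assuming a plain `Error("aborted")` as the rejection value; the orchestrator's real implementation and error type are not shown in this plan, and `abortableSleep` is a hypothetical name.

```typescript
/** Resolve after `ms`, or reject as soon as `signal` aborts (illustrative only). */
function abortableSleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    // Already aborted: reject without scheduling a timer.
    if (signal.aborted) {
      reject(new Error("aborted"));
      return;
    }
    const timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    function onAbort(): void {
      clearTimeout(timer); // do not leave a 30m timer holding the process open
      reject(new Error("aborted"));
    }
    signal.addEventListener("abort", onAbort, { once: true });
  });
}
```

Clearing the timer on abort matters for the long tail of the schedule: without it, an aborted turn would still keep a pending 30-minute timeout alive.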
+ +## Build breakdown by unit (execution) + +| Unit (owner) | Change | +|---|---| +| `@dispatch/wire` | add `TurnProviderRetryEvent` to `AgentEvent` union | +| `kernel` contracts | add `RetryStrategy` + `retry?` on `RunTurnInput` | +| `kernel` events.ts | `providerRetryEvent(...)` constructor | +| `kernel` run-turn.ts | retry loop in `executeStep` (the core logic) | +| `kernel` run-turn.test.ts | pure tests: fake `sleep` + pure `delayFor`; assert schedule, no-after-content retry, give-up emits error, abort-during-sleep | +| `session-orchestrator` | wire concrete schedule + real `setTimeout` sleep | +| `transport-ws` | if it has an exhaustive `switch(event.type)`, add the `provider-retry` case | +| `transport-http` (mine) | **no change** — `serializeEventLine` is generic `JSON.stringify` | +| frontend (5d3f) | render `provider-retry` as a yellow warning system message | + +## Open items +- **8h budget = cumulative scheduled sleep** (pure). Confirm OK vs wall-clock. +- **Thrown errors default retryable-when-pre-content.** Confirm (vs only the + flagged emitted path). +- **Execution mode:** this spans kernel + wire + orchestrator (outside my + transport-http unit). Build it directly across units, or dispatch each slice + to its unit owner via the dispatch CLI? 
diff --git a/packages/kernel/src/contracts/events.ts b/packages/kernel/src/contracts/events.ts
index 6c9652d..dca34c2 100644
--- a/packages/kernel/src/contracts/events.ts
+++ b/packages/kernel/src/contracts/events.ts
@@ -11,6 +11,7 @@ export type {
   TurnDoneEvent,
   TurnErrorEvent,
   TurnInputEvent,
+  TurnProviderRetryEvent,
   TurnReasoningDeltaEvent,
   TurnSealedEvent,
   TurnStartEvent,
diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts
index c67607b..f3e5bca 100644
--- a/packages/kernel/src/contracts/index.ts
+++ b/packages/kernel/src/contracts/index.ts
@@ -40,6 +40,7 @@ export type {
   TurnDoneEvent,
   TurnErrorEvent,
   TurnInputEvent,
+  TurnProviderRetryEvent,
   TurnReasoningDeltaEvent,
   TurnSealedEvent,
   TurnStartEvent,
@@ -109,6 +110,7 @@ export type {
 export type {
   EventEmitter,
   FinishReason,
+  RetryStrategy,
   RunTurnInput,
   RunTurnResult,
 } from "./runtime.js";
diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts
index 02fc446..8376e42 100644
--- a/packages/kernel/src/contracts/runtime.ts
+++ b/packages/kernel/src/contracts/runtime.ts
@@ -129,6 +129,22 @@ export interface RunTurnInput {
    * double-persist them.
    */
   readonly onStepComplete?: (messages: readonly ChatMessage[]) => Promise<void> | void;
+
+  /**
+   * Optional injected retry strategy for retryable provider errors (e.g. HTTP
+   * 429 / 5xx "overloaded"). When omitted, a retryable error ends the step
+   * exactly as before (backward-compatible). When provided, the runtime wraps
+   * `provider.stream()` consumption in a retry loop: on a retryable error
+   * (an emitted `error` ProviderEvent with `retryable === true`, OR a thrown
+   * error) — ONLY when no content was emitted yet this step (the safety
+   * invariant — never duplicate partial output) — it asks `retry.delayFor`
+   * for a delay, emits a transient `provider-retry` AgentEvent, sleeps via the
+   * injected `retry.sleep` (abortable), and re-calls `provider.stream()`.
+ * + * Injected (not ambient): the kernel imports no timer and owns no schedule. + * Mirrors the `now`/`logger` injection pattern — optional + backward-compatible. + */ + readonly retry?: RetryStrategy; } /** @@ -145,3 +161,31 @@ export interface RunTurnResult { /** Why the turn ended. */ readonly finishReason: FinishReason; } + +/** + * Injected retry strategy for retryable provider errors (e.g. HTTP 429 / 5xx). + * + * The kernel provides the HOOK (this contract + the retry loop in `runTurn`); + * the shell (session-orchestrator) provides the POLICY (the concrete schedule) + * and the I/O (the actual sleep). The kernel imports no timer — `sleep` is an + * injected effect so the runtime stays pure and deterministic in tests. + * + * Retries are ONLY attempted when NO content was emitted yet this step (the + * safety invariant — never duplicate partial output). When omitted on + * `RunTurnInput`, no retry happens (backward-compatible: a retryable error ends + * the step exactly as before). + */ +export interface RetryStrategy { + /** + * Pure, deterministic decision: given the 0-based attempt index, return the + * delay in ms to sleep before the next retry, or `undefined` to stop (budget + * exhausted). No I/O, no clock — fully testable. + */ + readonly delayFor: (attempt: number) => number | undefined; + /** + * Injected effect: actually sleep for the given ms. Must honor the abort + * signal — reject when aborted so the turn seals `aborted`. The kernel + * imports no timer; the shell provides a `setTimeout`-based implementation. 
+   */
+  readonly sleep: (ms: number, signal: AbortSignal) => Promise<void>;
+}
diff --git a/packages/kernel/src/runtime/events.ts b/packages/kernel/src/runtime/events.ts
index b194577..5805e28 100644
--- a/packages/kernel/src/runtime/events.ts
+++ b/packages/kernel/src/runtime/events.ts
@@ -164,3 +164,17 @@ export function errorEvent(
   }
   return { type: "error", conversationId, turnId, message };
 }
+
+export function providerRetryEvent(
+  conversationId: string,
+  turnId: string,
+  attempt: number,
+  delayMs: number,
+  message: string,
+  code?: string,
+): AgentEvent {
+  if (code !== undefined) {
+    return { type: "provider-retry", conversationId, turnId, attempt, delayMs, message, code };
+  }
+  return { type: "provider-retry", conversationId, turnId, attempt, delayMs, message };
+}
diff --git a/packages/kernel/src/runtime/index.ts b/packages/kernel/src/runtime/index.ts
index e1156e3..e0dd656 100644
--- a/packages/kernel/src/runtime/index.ts
+++ b/packages/kernel/src/runtime/index.ts
@@ -2,6 +2,7 @@ export type { StepDispatcher } from "./dispatch.js";
 export { createStepDispatcher, executeToolCall } from "./dispatch.js";
 export {
   errorEvent,
+  providerRetryEvent,
   reasoningDeltaEvent,
   textDeltaEvent,
   toolCallEvent,
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index 0d4c59d..dba9d80 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -2821,4 +2821,539 @@ describe("runTurn", () => {
       expect(drainCallCount).toBe(MAX_STEPS - 1);
     });
   });
+
+  // ── Retry with backoff ──────────────────────────────────────────────────
+  //
+  // PURE tests: a fake `sleep` (records calls, resolves instantly, can abort
+  // on a chosen call) + a pure `delayFor` (the canonical schedule + 8h budget).
+  // A stub `ProviderContract` whose `stream` yields a retryable error N times
+  // then a finish. ZERO mocks of `@dispatch/*` modules — effects injected.
+
+  /** The canonical backoff schedule (matches the orchestrator's concrete strategy). */
+  const RETRY_SCHEDULE_MS = [5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000];
+  const RETRY_TAIL_MS = 1_800_000; // 30m
+  const RETRY_BUDGET_MS = 8 * 60 * 60 * 1000; // 8h
+
+  /** Cumulative scheduled sleep through `attempt` (sum of delay[0..attempt]). */
+  function cumulativeSleepMs(attempt: number): number {
+    let sum = 0;
+    for (let i = 0; i <= attempt; i++) {
+      sum += i < RETRY_SCHEDULE_MS.length ? RETRY_SCHEDULE_MS[i] : RETRY_TAIL_MS;
+    }
+    return sum;
+  }
+
+  /** Pure, deterministic delay decision (no I/O, no clock). */
+  function delayFor(attempt: number): number | undefined {
+    const delay = attempt < RETRY_SCHEDULE_MS.length ? RETRY_SCHEDULE_MS[attempt] : RETRY_TAIL_MS;
+    if (cumulativeSleepMs(attempt) > RETRY_BUDGET_MS) return undefined; // over budget → stop
+    return delay;
+  }
+
+  /** The full schedule delayFor would emit (until budget exhausted). */
+  function fullSchedule(): number[] {
+    const result: number[] = [];
+    let attempt = 0;
+    while (true) {
+      const delay = delayFor(attempt);
+      if (delay === undefined) break;
+      result.push(delay);
+      attempt++;
+    }
+    return result;
+  }
+
+  /**
+   * Fake, controllable `sleep`: records every call's delay, resolves
+   * instantly (no real waiting), and can abort the controller on a chosen
+   * 1-based call index to simulate "abort during sleep".
+   */
+  function createFakeSleep(controller: AbortController): {
+    sleep: (ms: number, signal: AbortSignal) => Promise<void>;
+    calls: number[];
+    abortOnCall: (n: number) => void;
+  } {
+    const calls: number[] = [];
+    let abortAt: number | undefined;
+    const sleep = async (ms: number, _signal: AbortSignal): Promise<void> => {
+      calls.push(ms);
+      if (abortAt !== undefined && calls.length === abortAt) {
+        controller.abort();
+        throw new Error("aborted");
+      }
+      // Otherwise resolve instantly (no real waiting).
+ }; + return { + sleep, + calls, + abortOnCall: (n: number) => { + abortAt = n; + }, + }; + } + + /** A provider that yields a retryable error `errorCount` times, then success. */ + function createRetryingProvider(opts: { + errorCount: number; + error?: { message: string; code?: string; retryable?: boolean }; + success?: ProviderEvent[]; + }): { provider: ProviderContract; streamCalls: { value: number } } { + const streamCalls = { value: 0 }; + const error: ProviderEvent = { + type: "error", + message: opts.error?.message ?? "overloaded", + ...(opts.error?.code !== undefined ? { code: opts.error.code } : {}), + ...(opts.error?.retryable !== undefined ? { retryable: opts.error.retryable } : {}), + }; + const success = opts.success ?? [ + { type: "text-delta", delta: "hi" }, + { type: "finish", reason: "stop" }, + ]; + const provider: ProviderContract = { + id: "fake", + stream() { + const idx = streamCalls.value++; + return (async function* () { + if (idx < opts.errorCount) { + yield error; + return; + } + for (const event of success) yield event; + })(); + }, + }; + return { provider, streamCalls }; + } + + describe("retry with backoff", () => { + it("retries a retryable emitted error on schedule then succeeds", async () => { + const { provider } = createRetryingProvider({ + errorCount: 3, + error: { message: "HTTP 429: overloaded", code: "429", retryable: true }, + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(result.finishReason).toBe("stop"); + // 3 retries: 5s, 10s, 30s. 
+ expect(fake.calls).toEqual([5_000, 10_000, 30_000]); + // 3 provider-retry events (one per sleep), then the successful text. + const retryEvents = events.filter((e) => e.type === "provider-retry"); + expect(retryEvents).toHaveLength(3); + if (retryEvents[0]?.type === "provider-retry") { + expect(retryEvents[0].attempt).toBe(0); + expect(retryEvents[0].delayMs).toBe(5_000); + expect(retryEvents[0].message).toBe("HTTP 429: overloaded"); + expect(retryEvents[0].code).toBe("429"); + expect(retryEvents[0].conversationId).toBe("conv-1"); + expect(retryEvents[0].turnId).toBe("turn-1"); + } + if (retryEvents[1]?.type === "provider-retry") { + expect(retryEvents[1].attempt).toBe(1); + expect(retryEvents[1].delayMs).toBe(10_000); + } + if (retryEvents[2]?.type === "provider-retry") { + expect(retryEvents[2].attempt).toBe(2); + expect(retryEvents[2].delayMs).toBe(30_000); + } + // The error was suppressed (no error event emitted — retry succeeded). + expect(events.filter((e) => e.type === "error")).toHaveLength(0); + // The successful content still streams. + const deltas = events.filter((e) => e.type === "text-delta"); + expect(deltas).toHaveLength(1); + }); + + it("sleep is called with the full schedule [5s,10s,30s,60s,5m,10m,15m,30m,30m…]", async () => { + // Provider errors forever → retries until budget exhausted → gives up. + const { provider } = createRetryingProvider({ + errorCount: Number.POSITIVE_INFINITY, + error: { message: "overloaded", code: "429", retryable: true }, + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + // Budget exhausted → give up → error. 
+ expect(result.finishReason).toBe("error"); + + // The sleep schedule matches the pure delayFor output exactly. + expect(fake.calls).toEqual(fullSchedule()); + + // Head of the schedule (the 8 stepped delays). + expect(fake.calls.slice(0, 8)).toEqual([ + 5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000, + ]); + // Tail repeats 30m. + expect(fake.calls[8]).toBe(1_800_000); + expect(fake.calls.at(-1)).toBe(1_800_000); + + // 8h cumulative budget cap: head (3705s) + 13×30m = ~7h31m, then stop. + // 21 retries (attempts 0..20), then delayFor(21) → undefined → give up. + expect(fake.calls).toHaveLength(21); + const totalSlept = fake.calls.reduce((a, b) => a + b, 0); + expect(totalSlept).toBeLessThanOrEqual(RETRY_BUDGET_MS); + expect(totalSlept).toBe(3_705_000 + 13 * 1_800_000); // 27_105_000 + + // One provider-retry per sleep, plus a final error (give-up). + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(21); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + const errEvt = events.find((e) => e.type === "error"); + if (errEvt?.type === "error") { + expect(errEvt.message).toBe("overloaded"); + expect(errEvt.code).toBe("429"); + } + }); + + it("does NOT retry after content was emitted (safety invariant)", async () => { + // Provider yields text (content) THEN a retryable error. Because content + // was emitted, retrying is unsafe (would duplicate partial output). 
+ let callCount = 0; + const provider: ProviderContract = { + id: "fake", + stream() { + callCount++; + return (async function* () { + yield { type: "text-delta", delta: "partial" } as ProviderEvent; + yield { + type: "error", + message: "overloaded", + code: "429", + retryable: true, + } as ProviderEvent; + })(); + }, + }; + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + // No retries: stream called exactly once. + expect(callCount).toBe(1); + expect(fake.calls).toHaveLength(0); + // The error is emitted (give-up) and partial content preserved. + expect(result.finishReason).toBe("error"); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0); + expect(events.filter((e) => e.type === "text-delta")).toHaveLength(1); + }); + + it("does NOT retry a non-retryable emitted error (retryable: false)", async () => { + const { provider, streamCalls } = createRetryingProvider({ + errorCount: 1, + error: { message: "bad request", code: "400", retryable: false }, + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(streamCalls.value).toBe(1); // no retry + expect(fake.calls).toHaveLength(0); + expect(result.finishReason).toBe("error"); + expect(events.filter((e) => 
e.type === "error")).toHaveLength(1); + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0); + }); + + it("does NOT retry a non-retryable emitted error (retryable absent)", async () => { + const { provider, streamCalls } = createRetryingProvider({ + errorCount: 1, + error: { message: "bad request", code: "400" }, // no retryable field + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(streamCalls.value).toBe(1); // no retry + expect(fake.calls).toHaveLength(0); + expect(result.finishReason).toBe("error"); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + }); + + it("give-up emits the final error when budget is exhausted", async () => { + // Custom delayFor that allows exactly 1 retry then stops. + const shortDelayFor = (attempt: number): number | undefined => + attempt === 0 ? 100 : undefined; + const { provider } = createRetryingProvider({ + errorCount: Number.POSITIVE_INFINITY, + error: { message: "overloaded", code: "429", retryable: true }, + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor: shortDelayFor, sleep: fake.sleep }, + }); + + expect(result.finishReason).toBe("error"); + expect(fake.calls).toEqual([100]); // one retry, then give up + // One provider-retry (attempt 0), then the final error. 
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(1); + const errs = events.filter((e) => e.type === "error"); + expect(errs).toHaveLength(1); + if (errs[0]?.type === "error") { + expect(errs[0].message).toBe("overloaded"); + expect(errs[0].code).toBe("429"); + } + }); + + it("abort during sleep seals the turn aborted", async () => { + const { provider } = createRetryingProvider({ + errorCount: Number.POSITIVE_INFINITY, + error: { message: "overloaded", code: "429", retryable: true }, + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + fake.abortOnCall(2); // abort on the 2nd sleep + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(result.finishReason).toBe("aborted"); + // Two sleeps attempted; the 2nd aborted. + expect(fake.calls).toHaveLength(2); + // No terminal error emitted (it was an abort, not a give-up). + expect(events.filter((e) => e.type === "error")).toHaveLength(0); + // Two provider-retry events (attempts 0 and 1): each is emitted BEFORE its + // sleep, so the event for the aborted 2nd sleep has already gone out. + const retries = events.filter((e) => e.type === "provider-retry"); + expect(retries).toHaveLength(2); + // The done event carries reason "aborted". + const done = events.find((e) => e.type === "done"); + if (done?.type === "done") { + expect(done.reason).toBe("aborted"); + } + }); + + it("omitting retry keeps the pre-retry behavior (backward-compatible)", async () => { + // A retryable error with no retry configured → ends the step as today.
+ const { provider, streamCalls } = createRetryingProvider({ + errorCount: 1, + error: { message: "overloaded", code: "429", retryable: true }, + }); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + // no retry field + }); + + expect(streamCalls.value).toBe(1); // no retry + expect(result.finishReason).toBe("error"); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0); + }); + + it("retries a THROWN error (retryable-by-default when pre-content)", async () => { + // A thrown error (no retryable flag) before content is retried. + let callCount = 0; + const provider: ProviderContract = { + id: "fake", + stream() { + callCount++; + return (async function* () { + if (callCount <= 2) { + throw new Error("network blip"); + } + yield { type: "text-delta", delta: "hi" } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(callCount).toBe(3); // 2 throws retried, 3rd succeeds + expect(fake.calls).toEqual([5_000, 10_000]); + expect(result.finishReason).toBe("stop"); + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(2); + // Thrown errors have no code. 
+ // events[0] is "turn-start", so look up the first provider-retry explicitly. + const firstRetry = events.find((e) => e.type === "provider-retry"); + if (firstRetry?.type === "provider-retry") { + expect(firstRetry.code).toBeUndefined(); + expect(firstRetry.message).toBe("network blip"); + } + expect(events.filter((e) => e.type === "error")).toHaveLength(0); + }); + + it("does NOT retry a thrown error after content was emitted", async () => { + let callCount = 0; + const provider: ProviderContract = { + id: "fake", + stream() { + callCount++; + return (async function* () { + yield { type: "text-delta", delta: "partial" } as ProviderEvent; + throw new Error("network blip"); + })(); + }, + }; + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(callCount).toBe(1); + expect(fake.calls).toHaveLength(0); + expect(result.finishReason).toBe("error"); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + expect(events.filter((e) => e.type === "text-delta")).toHaveLength(1); + }); + + it("provider-retry events interleave correctly: error → retry-event → sleep → retry", async () => { + // Verify ordering: each provider-retry event comes BEFORE its sleep, + // and the successful content comes only after the last retry.
+ const { provider } = createRetryingProvider({ + errorCount: 2, + error: { message: "overloaded", code: "429", retryable: true }, + success: [ + { type: "text-delta", delta: "ok" }, + { type: "finish", reason: "stop" }, + ], + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + const types = events.map((e) => e.type); + // turn-start, provider-retry(0), provider-retry(1), text-delta, step-complete, done + expect(types[0]).toBe("turn-start"); + const firstRetryIdx = types.indexOf("provider-retry"); + const textIdx = types.indexOf("text-delta"); + expect(firstRetryIdx).toBeGreaterThan(0); + expect(textIdx).toBeGreaterThan(firstRetryIdx); + // Both retries precede the text. 
+ const retryCount = types.filter((t) => t === "provider-retry").length; + expect(retryCount).toBe(2); + }); + }); }); diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index f5d80d3..08f8459 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -6,12 +6,18 @@ import type { ProviderStreamOptions, Usage, } from "../contracts/provider.js"; -import type { EventEmitter, RunTurnInput, RunTurnResult } from "../contracts/runtime.js"; +import type { + EventEmitter, + RetryStrategy, + RunTurnInput, + RunTurnResult, +} from "../contracts/runtime.js"; import type { ToolCall, ToolContract } from "../contracts/tool.js"; import { createStepDispatcher, type StepDispatcher } from "./dispatch.js"; import { doneEvent, errorEvent, + providerRetryEvent, reasoningDeltaEvent, stepCompleteEvent, textDeltaEvent, @@ -120,6 +126,8 @@ interface StepContext { readonly now: (() => number) | undefined; /** Per-turn provider options (model, systemPrompt, …) threaded to stream(). */ readonly providerOpts: ProviderStreamOptions | undefined; + /** Optional injected retry strategy (omit = no retry, backward-compatible). */ + readonly retry: RetryStrategy | undefined; } interface TimingState { @@ -249,12 +257,10 @@ function processEvent( case "finish": break; case "error": - if (event.code !== undefined) { - chunks.push({ type: "error", message: event.message, code: event.code }); - } else { - chunks.push({ type: "error", message: event.message }); - } - ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, event.message, event.code)); + // Handled by the retry loop in executeStep (not here): an error event + // is intercepted before processEvent so the step can decide whether to + // retry (suppressing the error) or give up (emit it). processEvent + // never receives an "error" event. break; } } @@ -314,34 +320,142 @@ async function executeStep(ctx: StepContext): Promise { // Swallow — D7. 
} - try { - const opts: ProviderStreamOptions = { - ...ctx.providerOpts, - ...(ctx.turnSpan !== undefined && stepSpan !== undefined ? { logger: stepSpan.log } : {}), - }; - const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts); - for await (const event of stream) { - if (ctx.signal.aborted) break; - processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing, toolDispatchTimes); - if (event.type === "usage") { - stepUsage = addUsage(stepUsage, event.usage); + // Retry loop: wrap provider.stream() consumption. Retries are ONLY + // attempted when no content was emitted yet this step (the safety + // invariant — never duplicate partial output). On a retryable error — + // either an EMITTED `error` ProviderEvent with `retryable === true`, OR a + // THROWN error (retryable-by-default when pre-content) — with !hadContent: + // ask retry.delayFor(attempt); if it returns a delay → emit a transient + // provider-retry AgentEvent, sleep via the injected retry.sleep (abortable), + // attempt++, re-call provider.stream(); if it returns undefined (budget + // exhausted) → give up. Non-retryable emitted errors (retryable === false or + // absent), errors after content, and the no-retry-configured case all fall + // through to "give up" — identical to the pre-retry behavior. + let hadContent = false; + let attempt = 0; + while (true) { + let errored = false; + let wasThrown = false; + let errorMessage: string | undefined; + let errorCode: string | undefined; + let errorRetryable: boolean | undefined; + let thrownErr: unknown; + + try { + const opts: ProviderStreamOptions = { + ...ctx.providerOpts, + ...(ctx.turnSpan !== undefined && stepSpan !== undefined ? 
{ logger: stepSpan.log } : {}), + }; + const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts); + for await (const event of stream) { + if (ctx.signal.aborted) break; + if (event.type === "error") { + // Intercept: hold for the retry decision — don't push a chunk + // or emit yet (a successful retry would leave a stale error). + errored = true; + errorMessage = event.message; + errorCode = event.code; + errorRetryable = event.retryable; + break; + } + if ( + event.type === "text-delta" || + event.type === "reasoning-delta" || + event.type === "tool-call" || + event.type === "usage" + ) { + hadContent = true; + } + processEvent( + event, + chunks, + toolCalls, + dispatcher, + ctx, + stepSpan, + timing, + toolDispatchTimes, + ); + if (event.type === "usage") { + stepUsage = addUsage(stepUsage, event.usage); + } + if (event.type === "finish") { + finishReason = event.reason; + } } - if (event.type === "finish") { - finishReason = event.reason; + } catch (err) { + errored = true; + wasThrown = true; + errorMessage = err instanceof Error ? err.message : String(err); + errorCode = undefined; + errorRetryable = undefined; + thrownErr = err; + } + + // Abort (during stream) → stop; the runTurn loop seals aborted. + if (ctx.signal.aborted) { + break; + } + + // No error → step succeeded. + if (!errored) { + break; + } + + // Retryable? A thrown error is retryable-by-default when pre-content; + // an emitted error is retryable ONLY when `retryable === true` (absent + // or false → not retried, per the contract). + const isRetryable = wasThrown ? true : errorRetryable === true; + if (ctx.retry !== undefined && !hadContent && isRetryable) { + const delay = ctx.retry.delayFor(attempt); + if (delay !== undefined) { + // Emit the transient provider-retry event BEFORE the sleep so the + // UI shows "⚠ retrying in Ns…" immediately. Not persisted as a + // chat message — it never pollutes the prompt. 
+ ctx.emit( + providerRetryEvent( + ctx.conversationId, + ctx.turnId, + attempt, + delay, + errorMessage ?? "", + errorCode, + ), + ); + // Abortable sleep. If the signal fires during sleep, the shell's + // sleep rejects — we catch it and break so the turn seals aborted. + try { + await ctx.retry.sleep(delay, ctx.signal); + } catch { + // Abort during sleep (or unexpected sleep failure). + } + if (ctx.signal.aborted) { + break; + } + attempt++; + continue; } + // delayFor returned undefined → budget exhausted → give up. + } + + // Give up: emit the suppressed error and end the step. This is the + // single emission point for a terminal provider error (non-retryable, + // post-content, budget-exhausted, or no-retry-configured). + const message = errorMessage ?? ""; + if (errorCode !== undefined) { + chunks.push({ type: "error", message, code: errorCode }); + } else { + chunks.push({ type: "error", message }); } - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - chunks.push({ type: "error", message }); - ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message)); + ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message, errorCode)); finishReason = "error"; - // Close step span with error try { - stepSpan?.end({ err }); + stepSpan?.end({ err: thrownErr ?? new Error(message) }); } catch { // Swallow — D7. 
} stepSpan = undefined; + break; } // Close timing spans: if no first token was seen, end ttft with firstToken: false @@ -524,6 +638,7 @@ export async function runTurn(input: RunTurnInput): Promise { cwd: input.cwd, now, providerOpts: input.providerOpts, + retry: input.retry, }); totalUsage = addUsage(totalUsage, stepResult.usage); diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts index fa8d9e9..aaafb76 100644 --- a/packages/session-orchestrator/src/index.ts +++ b/packages/session-orchestrator/src/index.ts @@ -12,6 +12,7 @@ export { conversationOpened, conversationStatusChanged, createCompactionService, + createRetryStrategy, createSessionOrchestrator, createWarmService, type EnqueueInput, @@ -34,8 +35,13 @@ export { } from "./orchestrator.js"; export { buildUserMessage, + cumulativeSleepMs, defaultDispatchPolicy, + delayFor, generateTurnId, + RETRY_BUDGET_MS, + RETRY_SCHEDULE_MS, + RETRY_TAIL_MS, resolveReasoningEffort, selectFirstProvider, } from "./pure.js"; diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index a533a16..b4d4b35 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -11,6 +11,7 @@ import type { ProviderEvent, ProviderStreamOptions, ReasoningEffort, + RetryStrategy, RunTurnInput, RunTurnResult, ToolContract, @@ -24,6 +25,7 @@ import { createMetricsAccumulator } from "./metrics.js"; import { buildUserMessage, defaultDispatchPolicy, + delayFor, generateTurnId, resolveModelName, resolveReasoningEffort, @@ -325,12 +327,45 @@ export interface SessionOrchestratorBundle { readonly activeConversations: ReadonlySet; } +/** + * The concrete retry strategy wired into every turn's `RunTurnInput.retry`. + * + * `delayFor` is the pure schedule (`5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m`, + * then repeat 30m until 8h cumulative scheduled sleep) — no I/O, no clock. 
+ * `sleep` is the abortable I/O effect: a `setTimeout`-based promise that + * rejects when the turn's abort signal fires (so a retry in flight seals the + * turn `aborted`). The kernel imports no timer; this is the shell-provided I/O. + */ +export function createRetryStrategy(): RetryStrategy { + const sleep = (ms: number, signal: AbortSignal): Promise<void> => { + return new Promise((resolve, reject) => { + if (signal.aborted) { + reject(new Error("aborted")); + return; + } + const timer = setTimeout(() => { + signal.removeEventListener("abort", onAbort); + resolve(); + }, ms); + const onAbort = () => { + clearTimeout(timer); + reject(new Error("aborted")); + }; + signal.addEventListener("abort", onAbort, { once: true }); + }); + }; + return { delayFor, sleep }; +} + export function createSessionOrchestrator( deps: SessionOrchestratorDeps, ): SessionOrchestratorBundle { const activeConversations = new Set(); const subscribers = new Map>(); const activeTurns = new Map(); + // One stateless retry strategy shared by every turn (delayFor is pure; sleep + // is a stateless setTimeout closure). Wired into each RunTurnInput.retry. + const retryStrategy = createRetryStrategy(); function emitToHub(conversationId: string, event: AgentEvent): void { const turn = activeTurns.get(conversationId); @@ -596,6 +631,7 @@ export function createSessionOrchestrator( turnId, signal: controller.signal, providerOpts, + retry: retryStrategy, ...(turnLogger !== undefined ? { logger: turnLogger } : {}), ...(effectiveCwd !== undefined ? { cwd: effectiveCwd } : {}), ...(deps.now !== undefined ? 
{ now: deps.now } : {}), diff --git a/packages/session-orchestrator/src/pure.test.ts b/packages/session-orchestrator/src/pure.test.ts index 9e5d3c4..2cbe15f 100644 --- a/packages/session-orchestrator/src/pure.test.ts +++ b/packages/session-orchestrator/src/pure.test.ts @@ -2,8 +2,13 @@ import type { ProviderContract } from "@dispatch/kernel"; import { describe, expect, it } from "vitest"; import { buildUserMessage, + cumulativeSleepMs, defaultDispatchPolicy, + delayFor, generateTurnId, + RETRY_BUDGET_MS, + RETRY_SCHEDULE_MS, + RETRY_TAIL_MS, resolveReasoningEffort, selectFirstProvider, } from "./pure.js"; @@ -100,3 +105,59 @@ describe("resolveReasoningEffort", () => { expect(resolveReasoningEffort(undefined, "max")).toBe("max"); }); }); + +describe("retry backoff schedule (delayFor)", () => { + it("emits the stepped head: 5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m", () => { + expect(delayFor(0)).toBe(5_000); + expect(delayFor(1)).toBe(10_000); + expect(delayFor(2)).toBe(30_000); + expect(delayFor(3)).toBe(60_000); + expect(delayFor(4)).toBe(300_000); + expect(delayFor(5)).toBe(600_000); + expect(delayFor(6)).toBe(900_000); + expect(delayFor(7)).toBe(1_800_000); + }); + + it("repeats 30m after the head", () => { + expect(delayFor(8)).toBe(RETRY_TAIL_MS); + expect(delayFor(9)).toBe(RETRY_TAIL_MS); + expect(delayFor(20)).toBe(RETRY_TAIL_MS); + }); + + it("gives up (returns undefined) once cumulative sleep exceeds 8h", () => { + // Head sums to 3,705,000 ms; +1,800,000 per extra step. 8h = 28,800,000. + // attempt 20 cumulative = 3,705,000 + 13*1,800,000 = 27,105,000 (< 8h) → retry. + expect(delayFor(20)).toBe(RETRY_TAIL_MS); + // attempt 21 cumulative = 27,105,000 + 1,800,000 = 28,905,000 (> 8h) → stop. 
+ expect(delayFor(21)).toBeUndefined(); + }); + + it("cumulativeSleepMs matches the sum of the schedule", () => { + expect(cumulativeSleepMs(0)).toBe(5_000); + expect(cumulativeSleepMs(1)).toBe(15_000); + expect(cumulativeSleepMs(7)).toBe(RETRY_SCHEDULE_MS.reduce((a, b) => a + b, 0)); + // 8h budget is 28,800,000 ms. + expect(RETRY_BUDGET_MS).toBe(8 * 60 * 60 * 1000); + // The last retry (attempt 20) keeps cumulative under budget. + expect(cumulativeSleepMs(20)).toBeLessThanOrEqual(RETRY_BUDGET_MS); + // The next (attempt 21) exceeds it. + expect(cumulativeSleepMs(21)).toBeGreaterThan(RETRY_BUDGET_MS); + }); + + it("the full schedule has 21 retries then stops", () => { + const schedule: number[] = []; + let attempt = 0; + while (true) { + const delay = delayFor(attempt); + if (delay === undefined) break; + schedule.push(delay); + attempt++; + } + expect(schedule).toHaveLength(21); + expect(schedule[0]).toBe(5_000); + expect(schedule.at(-1)).toBe(RETRY_TAIL_MS); + // 8 stepped head + 13 tail repeats. + expect(schedule.slice(0, 8)).toEqual([...RETRY_SCHEDULE_MS]); + expect(schedule.slice(8).every((d) => d === RETRY_TAIL_MS)).toBe(true); + }); +}); diff --git a/packages/session-orchestrator/src/pure.ts b/packages/session-orchestrator/src/pure.ts index 9a31e17..a028cbe 100644 --- a/packages/session-orchestrator/src/pure.ts +++ b/packages/session-orchestrator/src/pure.ts @@ -9,6 +9,53 @@ export function buildUserMessage(text: string): ChatMessage { return { role: "user", chunks: [{ type: "text", text }] }; } +// ── Provider-error retry backoff schedule ─────────────────────────────────── +// +// Pure, deterministic delay decision (no I/O, no clock) for retrying retryable +// provider errors (HTTP 429 / 5xx "overloaded"). The concrete `sleep` (I/O) +// is wired in the orchestrator; this owns only the policy. + +/** + * Stepped backoff schedule (ms): 5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m. + * After the head is exhausted, {@link RETRY_TAIL_MS} (30m) repeats. 
+ */ +export const RETRY_SCHEDULE_MS = [ + 5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000, +] as const; + +/** Tail delay (ms) repeated after the stepped head: 30 minutes. */ +export const RETRY_TAIL_MS = 1_800_000; + +/** Cumulative scheduled-sleep budget (ms) after which retrying gives up: 8h. */ +export const RETRY_BUDGET_MS = 8 * 60 * 60 * 1000; + +/** + * Cumulative scheduled sleep through `attempt` (sum of delay[0..attempt]). + * Pure — no I/O, no clock. + */ +export function cumulativeSleepMs(attempt: number): number { + let sum = 0; + for (let i = 0; i <= attempt; i++) { + sum += i < RETRY_SCHEDULE_MS.length ? (RETRY_SCHEDULE_MS[i] ?? RETRY_TAIL_MS) : RETRY_TAIL_MS; + } + return sum; +} + +/** + * Pure, deterministic delay decision for the retry strategy: given the + * 0-based attempt index, return the delay in ms to sleep before the next + * retry, or `undefined` to stop (cumulative budget exhausted). No I/O, no + * clock — fully testable. Matches the plan's schedule: + * `5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m`, then repeat 30m until 8h of + * cumulative scheduled sleep is reached, then give up. + */ +export function delayFor(attempt: number): number | undefined { + const scheduled = RETRY_SCHEDULE_MS[attempt]; + const delay = scheduled !== undefined ? scheduled : RETRY_TAIL_MS; + if (cumulativeSleepMs(attempt) > RETRY_BUDGET_MS) return undefined; // over budget → stop + return delay; +} + /** * Resolve the reasoning-effort level for a turn: * per-turn override → persisted per-conversation value → default `"high"`. 
diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts index bade977..eecd2f7 100644 --- a/packages/wire/src/index.ts +++ b/packages/wire/src/index.ts @@ -273,6 +273,7 @@ export type AgentEvent = | TurnUsageEvent | TurnStepCompleteEvent | TurnErrorEvent + | TurnProviderRetryEvent | TurnDoneEvent | TurnSealedEvent | TurnSteeringEvent; @@ -429,6 +430,31 @@ export interface TurnErrorEvent { readonly code?: string; } +/** + * A retryable provider error is being retried with backoff. Emitted once per + * scheduled retry, BEFORE the sleep, so the UI can show "⚠ Server overloaded — + * retrying in 5s…" immediately. TRANSIENT: emitted to the frontend but NOT + * persisted into the model's message history (it never pollutes the prompt). + * + * When the retry budget is exhausted, the existing `error` event is emitted and + * the turn seals — so the final failure is still a persisted error. `attempt` is + * 0-based (the Nth retry about to happen); `delayMs` is the scheduled sleep + * before that retry fires. + */ +export interface TurnProviderRetryEvent { + readonly type: "provider-retry"; + readonly conversationId: string; + readonly turnId: string; + /** 0-based: this is the Nth retry about to happen. */ + readonly attempt: number; + /** ms the client should expect to wait before the retry fires. */ + readonly delayMs: number; + /** The endpoint's error verbatim (e.g. "HTTP 429: {…overloaded_error…}"). */ + readonly message: string; + /** The HTTP code when known (e.g. "429"). */ + readonly code?: string; +} + /** The turn has completed (model finished generating). */ export interface TurnDoneEvent { readonly type: "done"; diff --git a/tasks.md b/tasks.md index 8a3343d..0a1c747 100644 --- a/tasks.md +++ b/tasks.md @@ -5,7 +5,43 @@ > Keep this lean and current; do not let it re-accrete a step-by-step changelog. ## Status (current) -`tsc -b` EXIT 0 · biome clean · **1537 vitest** green. +`tsc -b` EXIT 0 · biome clean · **1574 vitest** green. 
+ +## Retry with backoff on retryable provider errors (DONE) +When the upstream LLM API returns a retryable error (HTTP 429 / 5xx "overloaded"), +the kernel now retries `provider.stream()` with a stepped backoff, visibly, until +the 8h cumulative-sleep budget is exhausted — then emits the final error and +seals the turn. Retries fire ONLY when no content was emitted yet this step (the +safety invariant — never duplicate partial output). Plan: +`notes/retry-with-backoff-plan.md`; report: `reports/retry-with-backoff.md`. +- **Architecture (kernel hook + shell policy/I/O):** kernel provides the hook + (`RetryStrategy` contract + the retry loop in `runTurn`); the shell + (session-orchestrator) provides the policy (the schedule) + the I/O (an + abortable `setTimeout` sleep). Kernel imports no timer. `retry?` is optional + → omit = no retry (backward-compatible). +- **New transient `AgentEvent` variant** `provider-retry` (`@dispatch/wire`), + emitted once per scheduled retry BEFORE the sleep so the UI can show + "⚠ retrying in Ns…" immediately; NOT persisted to model history (never + pollutes the prompt). Final failure is still a persisted `error` + seal. +- **Schedule:** `5s,10s,30s,60s,5m,10m,15m,30m`, then repeat 30m until 8h of + cumulative scheduled sleep → ~21 retries then give up. Pure `delayFor(attempt)`. +- **Retry trigger:** emitted `error` with `retryable===true` → retry; + `retryable` false/absent → give up; a THROWN error → retryable-by-default + ONLY when pre-content. All gated on `!hadContent` (text/reasoning/tool-call/usage). +- [x] Verified: `tsc -b` EXIT 0, biome clean, **1574 vitest** pass (+16 new: 11 + kernel retry tests with an injected fake `sleep` + pure `delayFor` + stub + provider — zero `@dispatch/*` mocks; 5 pure schedule tests). Transports + unchanged — transport-ws forwards `AgentEvent` verbatim inside `chat.delta`; + transport-http is generic `JSON.stringify`. Unit-tested only — not yet + live-verified against a real 429. 
+- **Optional follow-up (roadmap):** the CLI renderer + (`packages/cli/src/render.ts` `renderEvent`) has no `default` case and silently + drops `provider-retry` — the yellow-warning/countdown target is the web + frontend, not the CLI, so non-blocking. Optional: render `provider-retry` in + the CLI as a stderr warning + `delayMs` countdown. +- **Frontend handoff (5d3f, separate repo `../dispatch-web`):** render + `provider-retry` as a yellow warning system-message bubble showing `message` + (+`code`) with the `delayMs` countdown. ## Per-edit LSP diagnostics auto-append (DONE) After a successful `edit_file`, the extension now calls LSP `getDiagnostics` on the -- cgit v1.2.3
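
Reviewer note on the optional CLI follow-up in `tasks.md`: rendering `provider-retry` as a stderr warning could look roughly like the sketch below. This is only an illustration under stated assumptions. `ProviderRetryLike` is a local mirror of the `TurnProviderRetryEvent` fields added in this patch, while `formatDelay` and `formatRetryWarning` are hypothetical helper names that do not exist in `packages/cli/src/render.ts`; the warning wording is also made up.

```typescript
// Local mirror of the TurnProviderRetryEvent fields from this patch
// (conversationId / turnId omitted because the formatter does not need them).
interface ProviderRetryLike {
  readonly type: "provider-retry";
  readonly attempt: number; // 0-based, as documented in @dispatch/wire
  readonly delayMs: number;
  readonly message: string;
  readonly code?: string;
}

// Hypothetical helper: humanize a scheduled delay (the schedule runs 5s to 30m).
function formatDelay(ms: number): string {
  const totalSeconds = Math.round(ms / 1000);
  if (totalSeconds < 60) return `${totalSeconds}s`;
  const minutes = Math.floor(totalSeconds / 60);
  const seconds = totalSeconds % 60;
  return seconds === 0 ? `${minutes}m` : `${minutes}m${seconds}s`;
}

// Hypothetical helper: build a one-line warning suitable for stderr.
// `attempt` is 0-based on the wire, so it is shown 1-based to the user.
function formatRetryWarning(event: ProviderRetryLike): string {
  const code = event.code !== undefined ? ` [${event.code}]` : "";
  return (
    `warning: provider error${code}: ${event.message}; ` +
    `retry ${event.attempt + 1} in ${formatDelay(event.delayMs)}`
  );
}
```

A `renderEvent` case for `"provider-retry"` could then write `formatRetryWarning(event)` to stderr. Keeping the formatter pure (string in, string out) matches the patch's split between pure policy and shell I/O, and makes it testable without capturing output.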