summaryrefslogtreecommitdiffhomepage
path: root/packages/kernel
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-25 18:36:08 +0900
committerAdam Malczewski <[email protected]>2026-06-25 18:36:08 +0900
commitde022cee7ac66c95d7ed6a35d4e00f8e2d92cbbc (patch)
tree041dcb1017e544a405526443cb578baa974bec0e /packages/kernel
parentfc1c3a54c3075990ec0dd0f97901bd46fe142923 (diff)
parent649fc4f66f40f7743683546f81d3320e7394e597 (diff)
downloaddispatch-de022cee7ac66c95d7ed6a35d4e00f8e2d92cbbc.tar.gz
dispatch-de022cee7ac66c95d7ed6a35d4e00f8e2d92cbbc.zip
Merge branch 'dev' into feature/ssh-support
Brings dev's retry-with-backoff (the transient `provider-retry` AgentEvent the web frontend consumes) + the LSP-dead-server per-edit-hang fix into the SSH feature branch, alongside the SSH waves 0-5c. All code files auto-merged cleanly (run-turn.ts, orchestrator.ts, runtime.ts, wire/index.ts, tool-edit-file/extension.ts, run-turn.test.ts — both computerId threading and retry-with-backoff coexist). Only tasks.md conflicted (status section — orchestrator-resolved; both feature sections kept). Verified post-merge: tsc -b EXIT 0, biome clean (391 files), 1730 vitest pass +6 sshd-integration skipped (was 1690; +40 from dev's retry/LSP tests). Wire dist rebuilt so the FE can re-sync the pinned @dispatch/wire dep and pick up BOTH provider-retry AND the SSH Computer/defaultComputerId types. No merge or push (into dev or otherwise).
Diffstat (limited to 'packages/kernel')
-rw-r--r--packages/kernel/src/contracts/events.ts1
-rw-r--r--packages/kernel/src/contracts/index.ts2
-rw-r--r--packages/kernel/src/contracts/runtime.ts44
-rw-r--r--packages/kernel/src/runtime/events.ts14
-rw-r--r--packages/kernel/src/runtime/index.ts1
-rw-r--r--packages/kernel/src/runtime/run-turn.test.ts535
-rw-r--r--packages/kernel/src/runtime/run-turn.ts167
7 files changed, 738 insertions, 26 deletions
diff --git a/packages/kernel/src/contracts/events.ts b/packages/kernel/src/contracts/events.ts
index 6c9652d..dca34c2 100644
--- a/packages/kernel/src/contracts/events.ts
+++ b/packages/kernel/src/contracts/events.ts
@@ -11,6 +11,7 @@ export type {
TurnDoneEvent,
TurnErrorEvent,
TurnInputEvent,
+ TurnProviderRetryEvent,
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts
index c67607b..f3e5bca 100644
--- a/packages/kernel/src/contracts/index.ts
+++ b/packages/kernel/src/contracts/index.ts
@@ -40,6 +40,7 @@ export type {
TurnDoneEvent,
TurnErrorEvent,
TurnInputEvent,
+ TurnProviderRetryEvent,
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
@@ -109,6 +110,7 @@ export type {
export type {
EventEmitter,
FinishReason,
+ RetryStrategy,
RunTurnInput,
RunTurnResult,
} from "./runtime.js";
diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts
index 03e62c2..dc74c84 100644
--- a/packages/kernel/src/contracts/runtime.ts
+++ b/packages/kernel/src/contracts/runtime.ts
@@ -140,6 +140,22 @@ export interface RunTurnInput {
* double-persist them.
*/
readonly onStepComplete?: (messages: readonly ChatMessage[]) => Promise<void> | void;
+
+ /**
+ * Optional injected retry strategy for retryable provider errors (e.g. HTTP
+ * 429 / 5xx "overloaded"). When omitted, a retryable error ends the step
+ * exactly as before (backward-compatible). When provided, the runtime wraps
+ * `provider.stream()` consumption in a retry loop: on a retryable error
+ * (an emitted `error` ProviderEvent with `retryable === true`, OR a thrown
+ * error) — ONLY when no content was emitted yet this step (the safety
+ * invariant — never duplicate partial output) — it asks `retry.delayFor`
+ * for a delay, emits a transient `provider-retry` AgentEvent, sleeps via the
+ * injected `retry.sleep` (abortable), and re-calls `provider.stream()`.
+ *
+ * Injected (not ambient): the kernel imports no timer and owns no schedule.
+ * Mirrors the `now`/`logger` injection pattern — optional + backward-compatible.
+ */
+ readonly retry?: RetryStrategy;
}
/**
@@ -156,3 +172,31 @@ export interface RunTurnResult {
/** Why the turn ended. */
readonly finishReason: FinishReason;
}
+
+/**
+ * Injected retry strategy for retryable provider errors (e.g. HTTP 429 / 5xx).
+ *
+ * The kernel provides the HOOK (this contract + the retry loop in `runTurn`);
+ * the shell (session-orchestrator) provides the POLICY (the concrete schedule)
+ * and the I/O (the actual sleep). The kernel imports no timer — `sleep` is an
+ * injected effect so the runtime stays pure and deterministic in tests.
+ *
+ * Retries are ONLY attempted when NO content was emitted yet this step (the
+ * safety invariant — never duplicate partial output). When omitted on
+ * `RunTurnInput`, no retry happens (backward-compatible: a retryable error ends
+ * the step exactly as before).
+ */
+export interface RetryStrategy {
+ /**
+ * Pure, deterministic decision: given the 0-based attempt index, return the
+ * delay in ms to sleep before the next retry, or `undefined` to stop (budget
+ * exhausted). No I/O, no clock — fully testable.
+ */
+ readonly delayFor: (attempt: number) => number | undefined;
+ /**
+ * Injected effect: actually sleep for the given ms. Must honor the abort
+ * signal — reject when aborted so the turn seals `aborted`. The kernel
+ * imports no timer; the shell provides a `setTimeout`-based implementation.
+ */
+ readonly sleep: (ms: number, signal: AbortSignal) => Promise<void>;
+}
diff --git a/packages/kernel/src/runtime/events.ts b/packages/kernel/src/runtime/events.ts
index b194577..5805e28 100644
--- a/packages/kernel/src/runtime/events.ts
+++ b/packages/kernel/src/runtime/events.ts
@@ -164,3 +164,17 @@ export function errorEvent(
}
return { type: "error", conversationId, turnId, message };
}
+
+export function providerRetryEvent(
+ conversationId: string,
+ turnId: string,
+ attempt: number,
+ delayMs: number,
+ message: string,
+ code?: string,
+): AgentEvent {
+ if (code !== undefined) {
+ return { type: "provider-retry", conversationId, turnId, attempt, delayMs, message, code };
+ }
+ return { type: "provider-retry", conversationId, turnId, attempt, delayMs, message };
+}
diff --git a/packages/kernel/src/runtime/index.ts b/packages/kernel/src/runtime/index.ts
index e1156e3..e0dd656 100644
--- a/packages/kernel/src/runtime/index.ts
+++ b/packages/kernel/src/runtime/index.ts
@@ -2,6 +2,7 @@ export type { StepDispatcher } from "./dispatch.js";
export { createStepDispatcher, executeToolCall } from "./dispatch.js";
export {
errorEvent,
+ providerRetryEvent,
reasoningDeltaEvent,
textDeltaEvent,
toolCallEvent,
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index 08d3055..a9fc3d9 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -2886,4 +2886,539 @@ describe("runTurn", () => {
expect(drainCallCount).toBe(MAX_STEPS - 1);
});
});
+
+ // ── Retry with backoff ──────────────────────────────────────────────────
+ //
+ // PURE tests: a fake `sleep` (records calls, resolves instantly, can abort
+ // on a chosen call) + a pure `delayFor` (the canonical schedule + 8h budget).
+ // A stub `ProviderContract` whose `stream` yields a retryable error N times
+ // then a finish. ZERO mocks of `@dispatch/*` modules — effects injected.
+
+ /** The canonical backoff schedule (matches the orchestrator's concrete strategy). */
+ const RETRY_SCHEDULE_MS = [5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000];
+ const RETRY_TAIL_MS = 1_800_000; // 30m
+ const RETRY_BUDGET_MS = 8 * 60 * 60 * 1000; // 8h
+
+ /** Cumulative scheduled sleep through `attempt` (sum of delay[0..attempt]). */
+ function cumulativeSleepMs(attempt: number): number {
+ let sum = 0;
+ for (let i = 0; i <= attempt; i++) {
+ sum += i < RETRY_SCHEDULE_MS.length ? RETRY_SCHEDULE_MS[i] : RETRY_TAIL_MS;
+ }
+ return sum;
+ }
+
+ /** Pure, deterministic delay decision (no I/O, no clock). */
+ function delayFor(attempt: number): number | undefined {
+ const delay = attempt < RETRY_SCHEDULE_MS.length ? RETRY_SCHEDULE_MS[attempt] : RETRY_TAIL_MS;
+ if (cumulativeSleepMs(attempt) > RETRY_BUDGET_MS) return undefined; // over budget → stop
+ return delay;
+ }
+
+ /** The full schedule delayFor would emit (until budget exhausted). */
+ function fullSchedule(): number[] {
+ const result: number[] = [];
+ let attempt = 0;
+ while (true) {
+ const delay = delayFor(attempt);
+ if (delay === undefined) break;
+ result.push(delay);
+ attempt++;
+ }
+ return result;
+ }
+
+ /**
+ * Fake, controllable `sleep`: records every call's delay, resolves
+ * instantly (no real waiting), and can abort the controller on a chosen
+ * 1-based call index to simulate "abort during sleep".
+ */
+ function createFakeSleep(controller: AbortController): {
+ sleep: (ms: number, signal: AbortSignal) => Promise<void>;
+ calls: number[];
+ abortOnCall: (n: number) => void;
+ } {
+ const calls: number[] = [];
+ let abortAt: number | undefined;
+ const sleep = async (ms: number, _signal: AbortSignal): Promise<void> => {
+ calls.push(ms);
+ if (abortAt !== undefined && calls.length === abortAt) {
+ controller.abort();
+ throw new Error("aborted");
+ }
+ // Otherwise resolve instantly (no real waiting).
+ };
+ return {
+ sleep,
+ calls,
+ abortOnCall: (n: number) => {
+ abortAt = n;
+ },
+ };
+ }
+
+ /** A provider that yields a retryable error `errorCount` times, then success. */
+ function createRetryingProvider(opts: {
+ errorCount: number;
+ error?: { message: string; code?: string; retryable?: boolean };
+ success?: ProviderEvent[];
+ }): { provider: ProviderContract; streamCalls: { value: number } } {
+ const streamCalls = { value: 0 };
+ const error: ProviderEvent = {
+ type: "error",
+ message: opts.error?.message ?? "overloaded",
+ ...(opts.error?.code !== undefined ? { code: opts.error.code } : {}),
+ ...(opts.error?.retryable !== undefined ? { retryable: opts.error.retryable } : {}),
+ };
+ const success = opts.success ?? [
+ { type: "text-delta", delta: "hi" },
+ { type: "finish", reason: "stop" },
+ ];
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ const idx = streamCalls.value++;
+ return (async function* () {
+ if (idx < opts.errorCount) {
+ yield error;
+ return;
+ }
+ for (const event of success) yield event;
+ })();
+ },
+ };
+ return { provider, streamCalls };
+ }
+
+ describe("retry with backoff", () => {
+ it("retries a retryable emitted error on schedule then succeeds", async () => {
+ const { provider } = createRetryingProvider({
+ errorCount: 3,
+ error: { message: "HTTP 429: overloaded", code: "429", retryable: true },
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(result.finishReason).toBe("stop");
+ // 3 retries: 5s, 10s, 30s.
+ expect(fake.calls).toEqual([5_000, 10_000, 30_000]);
+ // 3 provider-retry events (one per sleep), then the successful text.
+ const retryEvents = events.filter((e) => e.type === "provider-retry");
+ expect(retryEvents).toHaveLength(3);
+ if (retryEvents[0]?.type === "provider-retry") {
+ expect(retryEvents[0].attempt).toBe(0);
+ expect(retryEvents[0].delayMs).toBe(5_000);
+ expect(retryEvents[0].message).toBe("HTTP 429: overloaded");
+ expect(retryEvents[0].code).toBe("429");
+ expect(retryEvents[0].conversationId).toBe("conv-1");
+ expect(retryEvents[0].turnId).toBe("turn-1");
+ }
+ if (retryEvents[1]?.type === "provider-retry") {
+ expect(retryEvents[1].attempt).toBe(1);
+ expect(retryEvents[1].delayMs).toBe(10_000);
+ }
+ if (retryEvents[2]?.type === "provider-retry") {
+ expect(retryEvents[2].attempt).toBe(2);
+ expect(retryEvents[2].delayMs).toBe(30_000);
+ }
+ // The error was suppressed (no error event emitted — retry succeeded).
+ expect(events.filter((e) => e.type === "error")).toHaveLength(0);
+ // The successful content still streams.
+ const deltas = events.filter((e) => e.type === "text-delta");
+ expect(deltas).toHaveLength(1);
+ });
+
+ it("sleep is called with the full schedule [5s,10s,30s,60s,5m,10m,15m,30m,30m…]", async () => {
+ // Provider errors forever → retries until budget exhausted → gives up.
+ const { provider } = createRetryingProvider({
+ errorCount: Number.POSITIVE_INFINITY,
+ error: { message: "overloaded", code: "429", retryable: true },
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ // Budget exhausted → give up → error.
+ expect(result.finishReason).toBe("error");
+
+ // The sleep schedule matches the pure delayFor output exactly.
+ expect(fake.calls).toEqual(fullSchedule());
+
+ // Head of the schedule (the 8 stepped delays).
+ expect(fake.calls.slice(0, 8)).toEqual([
+ 5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000,
+ ]);
+ // Tail repeats 30m.
+ expect(fake.calls[8]).toBe(1_800_000);
+ expect(fake.calls.at(-1)).toBe(1_800_000);
+
+ // 8h cumulative budget cap: head (3705s) + 13×30m = ~7h31m, then stop.
+ // 21 retries (attempts 0..20), then delayFor(21) → undefined → give up.
+ expect(fake.calls).toHaveLength(21);
+ const totalSlept = fake.calls.reduce((a, b) => a + b, 0);
+ expect(totalSlept).toBeLessThanOrEqual(RETRY_BUDGET_MS);
+ expect(totalSlept).toBe(3_705_000 + 13 * 1_800_000); // 27_105_000
+
+ // One provider-retry per sleep, plus a final error (give-up).
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(21);
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ const errEvt = events.find((e) => e.type === "error");
+ if (errEvt?.type === "error") {
+ expect(errEvt.message).toBe("overloaded");
+ expect(errEvt.code).toBe("429");
+ }
+ });
+
+ it("does NOT retry after content was emitted (safety invariant)", async () => {
+ // Provider yields text (content) THEN a retryable error. Because content
+ // was emitted, retrying is unsafe (would duplicate partial output).
+ let callCount = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ callCount++;
+ return (async function* () {
+ yield { type: "text-delta", delta: "partial" } as ProviderEvent;
+ yield {
+ type: "error",
+ message: "overloaded",
+ code: "429",
+ retryable: true,
+ } as ProviderEvent;
+ })();
+ },
+ };
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ // No retries: stream called exactly once.
+ expect(callCount).toBe(1);
+ expect(fake.calls).toHaveLength(0);
+ // The error is emitted (give-up) and partial content preserved.
+ expect(result.finishReason).toBe("error");
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0);
+ expect(events.filter((e) => e.type === "text-delta")).toHaveLength(1);
+ });
+
+ it("does NOT retry a non-retryable emitted error (retryable: false)", async () => {
+ const { provider, streamCalls } = createRetryingProvider({
+ errorCount: 1,
+ error: { message: "bad request", code: "400", retryable: false },
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(streamCalls.value).toBe(1); // no retry
+ expect(fake.calls).toHaveLength(0);
+ expect(result.finishReason).toBe("error");
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0);
+ });
+
+ it("does NOT retry a non-retryable emitted error (retryable absent)", async () => {
+ const { provider, streamCalls } = createRetryingProvider({
+ errorCount: 1,
+ error: { message: "bad request", code: "400" }, // no retryable field
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(streamCalls.value).toBe(1); // no retry
+ expect(fake.calls).toHaveLength(0);
+ expect(result.finishReason).toBe("error");
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ });
+
+ it("give-up emits the final error when budget is exhausted", async () => {
+ // Custom delayFor that allows exactly 1 retry then stops.
+ const shortDelayFor = (attempt: number): number | undefined =>
+ attempt === 0 ? 100 : undefined;
+ const { provider } = createRetryingProvider({
+ errorCount: Number.POSITIVE_INFINITY,
+ error: { message: "overloaded", code: "429", retryable: true },
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor: shortDelayFor, sleep: fake.sleep },
+ });
+
+ expect(result.finishReason).toBe("error");
+ expect(fake.calls).toEqual([100]); // one retry, then give up
+ // One provider-retry (attempt 0), then the final error.
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(1);
+ const errs = events.filter((e) => e.type === "error");
+ expect(errs).toHaveLength(1);
+ if (errs[0]?.type === "error") {
+ expect(errs[0].message).toBe("overloaded");
+ expect(errs[0].code).toBe("429");
+ }
+ });
+
+ it("abort during sleep seals the turn aborted", async () => {
+ const { provider } = createRetryingProvider({
+ errorCount: Number.POSITIVE_INFINITY,
+ error: { message: "overloaded", code: "429", retryable: true },
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+ fake.abortOnCall(2); // abort on the 2nd sleep
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(result.finishReason).toBe("aborted");
+ // Two sleeps attempted; the 2nd aborted.
+ expect(fake.calls).toHaveLength(2);
+ // No terminal error emitted (it was an abort, not a give-up).
+ expect(events.filter((e) => e.type === "error")).toHaveLength(0);
+ // One provider-retry before the aborted sleep (attempt 0).
+ const retries = events.filter((e) => e.type === "provider-retry");
+ expect(retries).toHaveLength(2);
+ // The done event carries reason "aborted".
+ const done = events.find((e) => e.type === "done");
+ if (done?.type === "done") {
+ expect(done.reason).toBe("aborted");
+ }
+ });
+
+ it("omitting retry keeps the pre-retry behavior (backward-compatible)", async () => {
+ // A retryable error with no retry configured → ends the step as today.
+ const { provider, streamCalls } = createRetryingProvider({
+ errorCount: 1,
+ error: { message: "overloaded", code: "429", retryable: true },
+ });
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ // no retry field
+ });
+
+ expect(streamCalls.value).toBe(1); // no retry
+ expect(result.finishReason).toBe("error");
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0);
+ });
+
+ it("retries a THROWN error (retryable-by-default when pre-content)", async () => {
+ // A thrown error (no retryable flag) before content is retried.
+ let callCount = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ callCount++;
+ return (async function* () {
+ if (callCount <= 2) {
+ throw new Error("network blip");
+ }
+ yield { type: "text-delta", delta: "hi" } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ })();
+ },
+ };
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(callCount).toBe(3); // 2 throws retried, 3rd succeeds
+ expect(fake.calls).toEqual([5_000, 10_000]);
+ expect(result.finishReason).toBe("stop");
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(2);
+ // Thrown errors have no code.
+ if (events[0]?.type === "provider-retry") {
+ expect(events[0].code).toBeUndefined();
+ expect(events[0].message).toBe("network blip");
+ }
+ expect(events.filter((e) => e.type === "error")).toHaveLength(0);
+ });
+
+ it("does NOT retry a thrown error after content was emitted", async () => {
+ let callCount = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ callCount++;
+ return (async function* () {
+ yield { type: "text-delta", delta: "partial" } as ProviderEvent;
+ throw new Error("network blip");
+ })();
+ },
+ };
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(callCount).toBe(1);
+ expect(fake.calls).toHaveLength(0);
+ expect(result.finishReason).toBe("error");
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ expect(events.filter((e) => e.type === "text-delta")).toHaveLength(1);
+ });
+
+ it("provider-retry events interleave correctly: error → retry-event → sleep → retry", async () => {
+ // Verify ordering: each provider-retry event comes BEFORE its sleep,
+ // and the successful content comes only after the last retry.
+ const { provider } = createRetryingProvider({
+ errorCount: 2,
+ error: { message: "overloaded", code: "429", retryable: true },
+ success: [
+ { type: "text-delta", delta: "ok" },
+ { type: "finish", reason: "stop" },
+ ],
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ const types = events.map((e) => e.type);
+ // turn-start, provider-retry(0), provider-retry(1), text-delta, step-complete, done
+ expect(types[0]).toBe("turn-start");
+ const firstRetryIdx = types.indexOf("provider-retry");
+ const textIdx = types.indexOf("text-delta");
+ expect(firstRetryIdx).toBeGreaterThan(0);
+ expect(textIdx).toBeGreaterThan(firstRetryIdx);
+ // Both retries precede the text.
+ const retryCount = types.filter((t) => t === "provider-retry").length;
+ expect(retryCount).toBe(2);
+ });
+ });
});
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index 940c77f..ac87a1f 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -6,12 +6,18 @@ import type {
ProviderStreamOptions,
Usage,
} from "../contracts/provider.js";
-import type { EventEmitter, RunTurnInput, RunTurnResult } from "../contracts/runtime.js";
+import type {
+ EventEmitter,
+ RetryStrategy,
+ RunTurnInput,
+ RunTurnResult,
+} from "../contracts/runtime.js";
import type { ToolCall, ToolContract } from "../contracts/tool.js";
import { createStepDispatcher, type StepDispatcher } from "./dispatch.js";
import {
doneEvent,
errorEvent,
+ providerRetryEvent,
reasoningDeltaEvent,
stepCompleteEvent,
textDeltaEvent,
@@ -121,6 +127,8 @@ interface StepContext {
readonly now: (() => number) | undefined;
/** Per-turn provider options (model, systemPrompt, …) threaded to stream(). */
readonly providerOpts: ProviderStreamOptions | undefined;
+ /** Optional injected retry strategy (omit = no retry, backward-compatible). */
+ readonly retry: RetryStrategy | undefined;
}
interface TimingState {
@@ -250,12 +258,10 @@ function processEvent(
case "finish":
break;
case "error":
- if (event.code !== undefined) {
- chunks.push({ type: "error", message: event.message, code: event.code });
- } else {
- chunks.push({ type: "error", message: event.message });
- }
- ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, event.message, event.code));
+ // Handled by the retry loop in executeStep (not here): an error event
+ // is intercepted before processEvent so the step can decide whether to
+ // retry (suppressing the error) or give up (emit it). processEvent
+ // never receives an "error" event.
break;
}
}
@@ -316,34 +322,142 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
// Swallow — D7.
}
- try {
- const opts: ProviderStreamOptions = {
- ...ctx.providerOpts,
- ...(ctx.turnSpan !== undefined && stepSpan !== undefined ? { logger: stepSpan.log } : {}),
- };
- const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts);
- for await (const event of stream) {
- if (ctx.signal.aborted) break;
- processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing, toolDispatchTimes);
- if (event.type === "usage") {
- stepUsage = addUsage(stepUsage, event.usage);
+ // Retry loop: wrap provider.stream() consumption. Retries are ONLY
+ // attempted when no content was emitted yet this step (the safety
+ // invariant — never duplicate partial output). On a retryable error —
+ // either an EMITTED `error` ProviderEvent with `retryable === true`, OR a
+ // THROWN error (retryable-by-default when pre-content) — with !hadContent:
+ // ask retry.delayFor(attempt); if it returns a delay → emit a transient
+ // provider-retry AgentEvent, sleep via the injected retry.sleep (abortable),
+ // attempt++, re-call provider.stream(); if it returns undefined (budget
+ // exhausted) → give up. Non-retryable emitted errors (retryable === false or
+ // absent), errors after content, and the no-retry-configured case all fall
+ // through to "give up" — identical to the pre-retry behavior.
+ let hadContent = false;
+ let attempt = 0;
+ while (true) {
+ let errored = false;
+ let wasThrown = false;
+ let errorMessage: string | undefined;
+ let errorCode: string | undefined;
+ let errorRetryable: boolean | undefined;
+ let thrownErr: unknown;
+
+ try {
+ const opts: ProviderStreamOptions = {
+ ...ctx.providerOpts,
+ ...(ctx.turnSpan !== undefined && stepSpan !== undefined ? { logger: stepSpan.log } : {}),
+ };
+ const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts);
+ for await (const event of stream) {
+ if (ctx.signal.aborted) break;
+ if (event.type === "error") {
+ // Intercept: hold for the retry decision — don't push a chunk
+ // or emit yet (a successful retry would leave a stale error).
+ errored = true;
+ errorMessage = event.message;
+ errorCode = event.code;
+ errorRetryable = event.retryable;
+ break;
+ }
+ if (
+ event.type === "text-delta" ||
+ event.type === "reasoning-delta" ||
+ event.type === "tool-call" ||
+ event.type === "usage"
+ ) {
+ hadContent = true;
+ }
+ processEvent(
+ event,
+ chunks,
+ toolCalls,
+ dispatcher,
+ ctx,
+ stepSpan,
+ timing,
+ toolDispatchTimes,
+ );
+ if (event.type === "usage") {
+ stepUsage = addUsage(stepUsage, event.usage);
+ }
+ if (event.type === "finish") {
+ finishReason = event.reason;
+ }
}
- if (event.type === "finish") {
- finishReason = event.reason;
+ } catch (err) {
+ errored = true;
+ wasThrown = true;
+ errorMessage = err instanceof Error ? err.message : String(err);
+ errorCode = undefined;
+ errorRetryable = undefined;
+ thrownErr = err;
+ }
+
+ // Abort (during stream) → stop; the runTurn loop seals aborted.
+ if (ctx.signal.aborted) {
+ break;
+ }
+
+ // No error → step succeeded.
+ if (!errored) {
+ break;
+ }
+
+ // Retryable? A thrown error is retryable-by-default when pre-content;
+ // an emitted error is retryable ONLY when `retryable === true` (absent
+ // or false → not retried, per the contract).
+ const isRetryable = wasThrown ? true : errorRetryable === true;
+ if (ctx.retry !== undefined && !hadContent && isRetryable) {
+ const delay = ctx.retry.delayFor(attempt);
+ if (delay !== undefined) {
+ // Emit the transient provider-retry event BEFORE the sleep so the
+ // UI shows "⚠ retrying in Ns…" immediately. Not persisted as a
+ // chat message — it never pollutes the prompt.
+ ctx.emit(
+ providerRetryEvent(
+ ctx.conversationId,
+ ctx.turnId,
+ attempt,
+ delay,
+ errorMessage ?? "",
+ errorCode,
+ ),
+ );
+ // Abortable sleep. If the signal fires during sleep, the shell's
+ // sleep rejects — we catch it and break so the turn seals aborted.
+ try {
+ await ctx.retry.sleep(delay, ctx.signal);
+ } catch {
+ // Abort during sleep (or unexpected sleep failure).
+ }
+ if (ctx.signal.aborted) {
+ break;
+ }
+ attempt++;
+ continue;
}
+ // delayFor returned undefined → budget exhausted → give up.
+ }
+
+ // Give up: emit the suppressed error and end the step. This is the
+ // single emission point for a terminal provider error (non-retryable,
+ // post-content, budget-exhausted, or no-retry-configured).
+ const message = errorMessage ?? "";
+ if (errorCode !== undefined) {
+ chunks.push({ type: "error", message, code: errorCode });
+ } else {
+ chunks.push({ type: "error", message });
}
- } catch (err) {
- const message = err instanceof Error ? err.message : String(err);
- chunks.push({ type: "error", message });
- ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message));
+ ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message, errorCode));
finishReason = "error";
- // Close step span with error
try {
- stepSpan?.end({ err });
+ stepSpan?.end({ err: thrownErr ?? new Error(message) });
} catch {
// Swallow — D7.
}
stepSpan = undefined;
+ break;
}
// Close timing spans: if no first token was seen, end ttft with firstToken: false
@@ -527,6 +641,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
computerId: input.computerId,
now,
providerOpts: input.providerOpts,
+ retry: input.retry,
});
totalUsage = addUsage(totalUsage, stepResult.usage);