summaryrefslogtreecommitdiffhomepage
path: root/packages
diff options
context:
space:
mode:
Diffstat (limited to 'packages')
-rw-r--r--packages/cli/src/args.test.ts68
-rw-r--r--packages/cli/src/args.ts22
-rw-r--r--packages/cli/src/http.test.ts30
-rw-r--r--packages/cli/src/http.ts2
-rw-r--r--packages/cli/src/main.ts19
-rw-r--r--packages/kernel/src/contracts/events.ts1
-rw-r--r--packages/kernel/src/contracts/index.ts2
-rw-r--r--packages/kernel/src/contracts/runtime.ts44
-rw-r--r--packages/kernel/src/runtime/events.ts14
-rw-r--r--packages/kernel/src/runtime/index.ts1
-rw-r--r--packages/kernel/src/runtime/run-turn.test.ts535
-rw-r--r--packages/kernel/src/runtime/run-turn.ts167
-rw-r--r--packages/lsp/src/aggregate.test.ts141
-rw-r--r--packages/lsp/src/aggregate.ts80
-rw-r--r--packages/lsp/src/client.test.ts146
-rw-r--r--packages/lsp/src/client.ts153
-rw-r--r--packages/lsp/src/diagnostics.ts2
-rw-r--r--packages/lsp/src/extension.ts54
-rw-r--r--packages/lsp/src/manager.test.ts88
-rw-r--r--packages/lsp/src/manager.ts13
-rw-r--r--packages/lsp/src/rpc.test.ts26
-rw-r--r--packages/lsp/src/rpc.ts29
-rw-r--r--packages/lsp/src/tool.test.ts97
-rw-r--r--packages/lsp/src/tool.ts43
-rw-r--r--packages/session-orchestrator/src/index.ts6
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts36
-rw-r--r--packages/session-orchestrator/src/pure.test.ts61
-rw-r--r--packages/session-orchestrator/src/pure.ts47
-rw-r--r--packages/tool-edit-file/src/extension.ts5
-rw-r--r--packages/wire/src/index.ts26
30 files changed, 1854 insertions, 104 deletions
diff --git a/packages/cli/src/args.test.ts b/packages/cli/src/args.test.ts
index 3d07c96..e613f31 100644
--- a/packages/cli/src/args.test.ts
+++ b/packages/cli/src/args.test.ts
@@ -254,6 +254,41 @@ describe("parseArgs", () => {
});
});
+ it("parses 'list' with --workspace", () => {
+ expect(parseArgs(["list", "--workspace", "proj"], { defaultServer })).toEqual({
+ kind: "list",
+ server: "http://localhost:24203",
+ workspaceId: "proj",
+ all: false,
+ });
+ });
+
+ it("parses 'list' with -w shorthand", () => {
+ const result = parseArgs(["list", "-w", "ws"], { defaultServer });
+ expect(result.kind).toBe("list");
+ if (result.kind === "list") expect(result.workspaceId).toBe("ws");
+ });
+
+ it("parses 'list' with --workspace, --status, and a prefix together", () => {
+ const result = parseArgs(["list", "abc", "--status", "active", "--workspace", "proj"], {
+ defaultServer,
+ });
+ expect(result).toEqual({
+ kind: "list",
+ server: "http://localhost:24203",
+ query: "abc",
+ status: "active",
+ workspaceId: "proj",
+ all: false,
+ });
+ });
+
+ it("errors when --workspace has no value (list)", () => {
+ const result = parseArgs(["list", "--workspace"], { defaultServer });
+ expect(result.kind).toBe("error");
+ if (result.kind === "error") expect(result.message).toContain("--workspace requires a value");
+ });
+
it("parses 'list' with --all", () => {
expect(parseArgs(["list", "--all"], { defaultServer })).toEqual({
kind: "list",
@@ -320,11 +355,31 @@ describe("parseArgs", () => {
server: "http://localhost:24203",
conversationId: "deadbeef",
text: "hi",
+ file: undefined,
+ queue: false,
+ open: false,
+ });
+ });
+
+ it("parses 'send' with --file", () => {
+ expect(parseArgs(["send", "deadbeef", "--file", "foo.txt"], { defaultServer })).toEqual({
+ kind: "send",
+ server: "http://localhost:24203",
+ conversationId: "deadbeef",
+ text: undefined,
+ file: "foo.txt",
queue: false,
open: false,
});
});
+ it("parses 'send' with both --text and --file", () => {
+ const result = parseArgs(["send", "deadbeef", "--text", "hi", "--file", "f.txt"], {
+ defaultServer,
+ });
+ expect(result).toMatchObject({ kind: "send", text: "hi", file: "f.txt" });
+ });
+
it("parses 'send' with --queue", () => {
const result = parseArgs(["send", "deadbeef", "--text", "hi", "--queue"], {
defaultServer,
@@ -334,6 +389,7 @@ describe("parseArgs", () => {
server: "http://localhost:24203",
conversationId: "deadbeef",
text: "hi",
+ file: undefined,
queue: true,
open: false,
});
@@ -348,6 +404,7 @@ describe("parseArgs", () => {
server: "http://localhost:24203",
conversationId: "deadbeef",
text: "hi",
+ file: undefined,
queue: false,
open: true,
});
@@ -363,6 +420,7 @@ describe("parseArgs", () => {
server: "http://localhost:24203",
conversationId: "deadbeef",
text: "hi",
+ file: undefined,
queue: false,
open: false,
cwd: "/tmp",
@@ -370,10 +428,10 @@ describe("parseArgs", () => {
});
});
- it("requires --text", () => {
+ it("errors when --text and --file are both missing", () => {
const result = parseArgs(["send", "deadbeef"], { defaultServer });
expect(result.kind).toBe("error");
- if (result.kind === "error") expect(result.message).toContain("--text");
+ if (result.kind === "error") expect(result.message).toContain("--text or --file");
});
it("requires a conversation id", () => {
@@ -386,6 +444,12 @@ describe("parseArgs", () => {
const result = parseArgs(["send", "deadbeef", "--text"], { defaultServer });
expect(result.kind).toBe("error");
});
+
+ it("errors when --file has no value", () => {
+ const result = parseArgs(["send", "deadbeef", "--file"], { defaultServer });
+ expect(result.kind).toBe("error");
+ if (result.kind === "error") expect(result.message).toContain("--file requires a value");
+ });
});
describe("open", () => {
diff --git a/packages/cli/src/args.ts b/packages/cli/src/args.ts
index 8a63777..74cc56a 100644
--- a/packages/cli/src/args.ts
+++ b/packages/cli/src/args.ts
@@ -33,6 +33,7 @@ export type ParsedCommand =
readonly server: string;
readonly query?: string;
readonly status?: string;
+ readonly workspaceId?: string;
readonly all: boolean;
}
| { readonly kind: "compact"; readonly server: string; readonly conversationId: string }
@@ -42,7 +43,8 @@ export type ParsedCommand =
readonly kind: "send";
readonly server: string;
readonly conversationId: string;
- readonly text: string;
+ readonly text?: string | undefined;
+ readonly file?: string | undefined;
readonly queue: boolean;
readonly open: boolean;
readonly cwd?: string;
@@ -84,6 +86,7 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma
let server = opts.defaultServer;
let query: string | undefined;
let status: string | undefined;
+ let workspaceId: string | undefined;
let all = false;
for (let i = 1; i < argv.length; i++) {
const arg = argv[i] as string;
@@ -93,6 +96,9 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma
} else if (arg === "--status") {
if (i + 1 >= argv.length) return { kind: "error", message: "--status requires a value" };
status = argv[++i];
+ } else if (arg === "--workspace" || arg === "-w") {
+ if (i + 1 >= argv.length) return { kind: "error", message: "--workspace requires a value" };
+ workspaceId = argv[++i];
} else if (arg === "--all") {
all = true;
} else if (arg.startsWith("--")) {
@@ -108,6 +114,7 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma
server,
...(query !== undefined && { query }),
...(status !== undefined && { status }),
+ ...(workspaceId !== undefined && { workspaceId }),
all,
};
}
@@ -204,6 +211,7 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma
let server = opts.defaultServer;
let conversationId: string | undefined;
let text: string | undefined;
+ let file: string | undefined;
let queue = false;
let open = false;
let cwd: string | undefined;
@@ -221,6 +229,10 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma
if (i + 1 >= argv.length) return { kind: "error", message: "--text requires a value" };
text = argv[++i];
break;
+ case "--file":
+ if (i + 1 >= argv.length) return { kind: "error", message: "--file requires a value" };
+ file = argv[++i];
+ break;
case "--queue":
queue = true;
break;
@@ -263,8 +275,11 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma
if (conversationId === undefined) {
return { kind: "error", message: "'send' requires a conversation id" };
}
- if (text === undefined) {
- return { kind: "error", message: "'send' requires --text" };
+ if (!text && !file) {
+ return {
+ kind: "error",
+ message: "At least one of --text or --file is required for 'send'",
+ };
}
return {
@@ -272,6 +287,7 @@ export function parseArgs(argv: readonly string[], opts: ParseOpts): ParsedComma
server,
conversationId,
text,
+ file,
queue,
open,
...(cwd !== undefined && { cwd }),
diff --git a/packages/cli/src/http.test.ts b/packages/cli/src/http.test.ts
index 2aa61e9..ab39813 100644
--- a/packages/cli/src/http.test.ts
+++ b/packages/cli/src/http.test.ts
@@ -289,6 +289,36 @@ describe("fetchConversations", () => {
expect(calledUrl).toBe("http://localhost:24203/conversations?q=abc+def");
});
+ it("appends ?workspaceId=<value> when a workspaceId is given", async () => {
+ let calledUrl: string | undefined;
+ const fakeFetch = (async (url: string | URL | Request): Promise<Response> => {
+ calledUrl = String(url);
+ return new Response(JSON.stringify({ conversations: [] }), { status: 200 });
+ }) as unknown as typeof fetch;
+
+ await fetchConversations(
+ { fetchImpl: fakeFetch },
+ { server: "http://localhost:24203", workspaceId: "proj" },
+ );
+ expect(calledUrl).toBe("http://localhost:24203/conversations?workspaceId=proj");
+ });
+
+ it("combines ?status= and ?workspaceId= when both are given", async () => {
+ let calledUrl: string | undefined;
+ const fakeFetch = (async (url: string | URL | Request): Promise<Response> => {
+ calledUrl = String(url);
+ return new Response(JSON.stringify({ conversations: [] }), { status: 200 });
+ }) as unknown as typeof fetch;
+
+ await fetchConversations(
+ { fetchImpl: fakeFetch },
+ { server: "http://localhost:24203", status: "active,idle", workspaceId: "proj" },
+ );
+ expect(calledUrl).toBe(
+ "http://localhost:24203/conversations?status=active%2Cidle&workspaceId=proj",
+ );
+ });
+
it("throws on non-OK status", async () => {
const fakeFetch = (async (): Promise<Response> =>
new Response("boom", { status: 500 })) as unknown as typeof fetch;
diff --git a/packages/cli/src/http.ts b/packages/cli/src/http.ts
index 42fcfec..e13842a 100644
--- a/packages/cli/src/http.ts
+++ b/packages/cli/src/http.ts
@@ -98,6 +98,7 @@ interface FetchConversationsOpts {
readonly server: string;
readonly query?: string;
readonly status?: string;
+ readonly workspaceId?: string;
}
export async function fetchConversations(
@@ -107,6 +108,7 @@ export async function fetchConversations(
const params = new URLSearchParams();
if (opts.query !== undefined) params.set("q", opts.query);
if (opts.status !== undefined) params.set("status", opts.status);
+ if (opts.workspaceId !== undefined) params.set("workspaceId", opts.workspaceId);
const qs = params.toString();
const url = qs.length > 0 ? `${opts.server}/conversations?${qs}` : `${opts.server}/conversations`;
const res = await deps.fetchImpl(url);
diff --git a/packages/cli/src/main.ts b/packages/cli/src/main.ts
index 9dfc317..cba0de7 100644
--- a/packages/cli/src/main.ts
+++ b/packages/cli/src/main.ts
@@ -24,12 +24,12 @@ import { extractLastText, formatConversationList, renderEvent } from "./render.j
const USAGE = `Usage:
dispatch models [--server <url>]
- dispatch list [<prefix>] [--status <active|idle|closed>] [--all] [--server <url>]
+ dispatch list [<prefix>] [--status <active|idle|closed>] [--workspace <id>] [--all] [--server <url>]
dispatch stop <conversationId> [--server <url>]
dispatch compact <conversationId> [--server <url>]
dispatch read <conversationId> [--server <url>]
dispatch open <conversationId> [--server <url>]
- dispatch send <conversationId> --text "..." [--queue] [--open] [--cwd <dir>] [--effort <level>] [--workspace <id>] [--server <url>]
+ dispatch send <conversationId> --text "..." [--file <path>] [--queue] [--open] [--cwd <dir>] [--effort <level>] [--workspace <id>] [--server <url>]
dispatch <modelName> --text "..." [--file <path>] [--cwd <dir>] [--conversation <id>] [--effort <level>] [--workspace <id>] [--server <url>] [--show-reasoning] [--open]
dispatch --help
@@ -61,6 +61,7 @@ async function main(): Promise<void> {
server: parsed.server,
...(parsed.query !== undefined && { query: parsed.query }),
...(status !== undefined && { status }),
+ ...(parsed.workspaceId !== undefined && { workspaceId: parsed.workspaceId }),
},
);
const table = formatConversationList(result.conversations, Date.now());
@@ -156,10 +157,20 @@ async function main(): Promise<void> {
process.stdout.write(`Signaled frontend to open ${conversationId}\n`);
}
+ let fileContent: string | undefined;
+ if (parsed.file) {
+ fileContent = await readFile(parsed.file, "utf-8");
+ }
+ const message = composeMessage({
+ ...(parsed.text !== undefined && { text: parsed.text }),
+ ...(parsed.file !== undefined && { file: parsed.file }),
+ ...(fileContent !== undefined && { fileContent }),
+ });
+
if (parsed.queue) {
const queued = await enqueueMessage(
{ fetchImpl: globalThis.fetch },
- { server: parsed.server, conversationId, text: parsed.text },
+ { server: parsed.server, conversationId, text: message },
);
const line = queued.startedTurn
? `Started turn for ${conversationId}`
@@ -168,7 +179,7 @@ async function main(): Promise<void> {
} else {
const request = {
conversationId,
- message: parsed.text,
+ message,
...(parsed.cwd !== undefined && { cwd: parsed.cwd }),
...(parsed.reasoningEffort !== undefined && { reasoningEffort: parsed.reasoningEffort }),
...(parsed.workspaceId !== undefined && { workspaceId: parsed.workspaceId }),
diff --git a/packages/kernel/src/contracts/events.ts b/packages/kernel/src/contracts/events.ts
index 6c9652d..dca34c2 100644
--- a/packages/kernel/src/contracts/events.ts
+++ b/packages/kernel/src/contracts/events.ts
@@ -11,6 +11,7 @@ export type {
TurnDoneEvent,
TurnErrorEvent,
TurnInputEvent,
+ TurnProviderRetryEvent,
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts
index c67607b..f3e5bca 100644
--- a/packages/kernel/src/contracts/index.ts
+++ b/packages/kernel/src/contracts/index.ts
@@ -40,6 +40,7 @@ export type {
TurnDoneEvent,
TurnErrorEvent,
TurnInputEvent,
+ TurnProviderRetryEvent,
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
@@ -109,6 +110,7 @@ export type {
export type {
EventEmitter,
FinishReason,
+ RetryStrategy,
RunTurnInput,
RunTurnResult,
} from "./runtime.js";
diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts
index 03e62c2..dc74c84 100644
--- a/packages/kernel/src/contracts/runtime.ts
+++ b/packages/kernel/src/contracts/runtime.ts
@@ -140,6 +140,22 @@ export interface RunTurnInput {
* double-persist them.
*/
readonly onStepComplete?: (messages: readonly ChatMessage[]) => Promise<void> | void;
+
+ /**
+ * Optional injected retry strategy for retryable provider errors (e.g. HTTP
+ * 429 / 5xx "overloaded"). When omitted, a retryable error ends the step
+ * exactly as before (backward-compatible). When provided, the runtime wraps
+ * `provider.stream()` consumption in a retry loop: on a retryable error
+ * (an emitted `error` ProviderEvent with `retryable === true`, OR a thrown
+ * error) — ONLY when no content was emitted yet this step (the safety
+ * invariant — never duplicate partial output) — it asks `retry.delayFor`
+ * for a delay, emits a transient `provider-retry` AgentEvent, sleeps via the
+ * injected `retry.sleep` (abortable), and re-calls `provider.stream()`.
+ *
+ * Injected (not ambient): the kernel imports no timer and owns no schedule.
+ * Mirrors the `now`/`logger` injection pattern — optional + backward-compatible.
+ */
+ readonly retry?: RetryStrategy;
}
/**
@@ -156,3 +172,31 @@ export interface RunTurnResult {
/** Why the turn ended. */
readonly finishReason: FinishReason;
}
+
+/**
+ * Injected retry strategy for retryable provider errors (e.g. HTTP 429 / 5xx).
+ *
+ * The kernel provides the HOOK (this contract + the retry loop in `runTurn`);
+ * the shell (session-orchestrator) provides the POLICY (the concrete schedule)
+ * and the I/O (the actual sleep). The kernel imports no timer — `sleep` is an
+ * injected effect so the runtime stays pure and deterministic in tests.
+ *
+ * Retries are ONLY attempted when NO content was emitted yet this step (the
+ * safety invariant — never duplicate partial output). When omitted on
+ * `RunTurnInput`, no retry happens (backward-compatible: a retryable error ends
+ * the step exactly as before).
+ */
+export interface RetryStrategy {
+ /**
+ * Pure, deterministic decision: given the 0-based attempt index, return the
+ * delay in ms to sleep before the next retry, or `undefined` to stop (budget
+ * exhausted). No I/O, no clock — fully testable.
+ */
+ readonly delayFor: (attempt: number) => number | undefined;
+ /**
+ * Injected effect: actually sleep for the given ms. Must honor the abort
+ * signal — reject when aborted so the turn seals `aborted`. The kernel
+ * imports no timer; the shell provides a `setTimeout`-based implementation.
+ */
+ readonly sleep: (ms: number, signal: AbortSignal) => Promise<void>;
+}
diff --git a/packages/kernel/src/runtime/events.ts b/packages/kernel/src/runtime/events.ts
index b194577..5805e28 100644
--- a/packages/kernel/src/runtime/events.ts
+++ b/packages/kernel/src/runtime/events.ts
@@ -164,3 +164,17 @@ export function errorEvent(
}
return { type: "error", conversationId, turnId, message };
}
+
+export function providerRetryEvent(
+ conversationId: string,
+ turnId: string,
+ attempt: number,
+ delayMs: number,
+ message: string,
+ code?: string,
+): AgentEvent {
+ if (code !== undefined) {
+ return { type: "provider-retry", conversationId, turnId, attempt, delayMs, message, code };
+ }
+ return { type: "provider-retry", conversationId, turnId, attempt, delayMs, message };
+}
diff --git a/packages/kernel/src/runtime/index.ts b/packages/kernel/src/runtime/index.ts
index e1156e3..e0dd656 100644
--- a/packages/kernel/src/runtime/index.ts
+++ b/packages/kernel/src/runtime/index.ts
@@ -2,6 +2,7 @@ export type { StepDispatcher } from "./dispatch.js";
export { createStepDispatcher, executeToolCall } from "./dispatch.js";
export {
errorEvent,
+ providerRetryEvent,
reasoningDeltaEvent,
textDeltaEvent,
toolCallEvent,
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index 08d3055..a9fc3d9 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -2886,4 +2886,539 @@ describe("runTurn", () => {
expect(drainCallCount).toBe(MAX_STEPS - 1);
});
});
+
+ // ── Retry with backoff ──────────────────────────────────────────────────
+ //
+ // PURE tests: a fake `sleep` (records calls, resolves instantly, can abort
+ // on a chosen call) + a pure `delayFor` (the canonical schedule + 8h budget).
+ // A stub `ProviderContract` whose `stream` yields a retryable error N times
+ // then a finish. ZERO mocks of `@dispatch/*` modules — effects injected.
+
+ /** The canonical backoff schedule (matches the orchestrator's concrete strategy). */
+ const RETRY_SCHEDULE_MS = [5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000];
+ const RETRY_TAIL_MS = 1_800_000; // 30m
+ const RETRY_BUDGET_MS = 8 * 60 * 60 * 1000; // 8h
+
+ /** Cumulative scheduled sleep through `attempt` (sum of delay[0..attempt]). */
+ function cumulativeSleepMs(attempt: number): number {
+ let sum = 0;
+ for (let i = 0; i <= attempt; i++) {
+ sum += i < RETRY_SCHEDULE_MS.length ? RETRY_SCHEDULE_MS[i] : RETRY_TAIL_MS;
+ }
+ return sum;
+ }
+
+ /** Pure, deterministic delay decision (no I/O, no clock). */
+ function delayFor(attempt: number): number | undefined {
+ const delay = attempt < RETRY_SCHEDULE_MS.length ? RETRY_SCHEDULE_MS[attempt] : RETRY_TAIL_MS;
+ if (cumulativeSleepMs(attempt) > RETRY_BUDGET_MS) return undefined; // over budget → stop
+ return delay;
+ }
+
+ /** The full schedule delayFor would emit (until budget exhausted). */
+ function fullSchedule(): number[] {
+ const result: number[] = [];
+ let attempt = 0;
+ while (true) {
+ const delay = delayFor(attempt);
+ if (delay === undefined) break;
+ result.push(delay);
+ attempt++;
+ }
+ return result;
+ }
+
+ /**
+ * Fake, controllable `sleep`: records every call's delay, resolves
+ * instantly (no real waiting), and can abort the controller on a chosen
+ * 1-based call index to simulate "abort during sleep".
+ */
+ function createFakeSleep(controller: AbortController): {
+ sleep: (ms: number, signal: AbortSignal) => Promise<void>;
+ calls: number[];
+ abortOnCall: (n: number) => void;
+ } {
+ const calls: number[] = [];
+ let abortAt: number | undefined;
+ const sleep = async (ms: number, _signal: AbortSignal): Promise<void> => {
+ calls.push(ms);
+ if (abortAt !== undefined && calls.length === abortAt) {
+ controller.abort();
+ throw new Error("aborted");
+ }
+ // Otherwise resolve instantly (no real waiting).
+ };
+ return {
+ sleep,
+ calls,
+ abortOnCall: (n: number) => {
+ abortAt = n;
+ },
+ };
+ }
+
+ /** A provider that yields a retryable error `errorCount` times, then success. */
+ function createRetryingProvider(opts: {
+ errorCount: number;
+ error?: { message: string; code?: string; retryable?: boolean };
+ success?: ProviderEvent[];
+ }): { provider: ProviderContract; streamCalls: { value: number } } {
+ const streamCalls = { value: 0 };
+ const error: ProviderEvent = {
+ type: "error",
+ message: opts.error?.message ?? "overloaded",
+ ...(opts.error?.code !== undefined ? { code: opts.error.code } : {}),
+ ...(opts.error?.retryable !== undefined ? { retryable: opts.error.retryable } : {}),
+ };
+ const success = opts.success ?? [
+ { type: "text-delta", delta: "hi" },
+ { type: "finish", reason: "stop" },
+ ];
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ const idx = streamCalls.value++;
+ return (async function* () {
+ if (idx < opts.errorCount) {
+ yield error;
+ return;
+ }
+ for (const event of success) yield event;
+ })();
+ },
+ };
+ return { provider, streamCalls };
+ }
+
+ describe("retry with backoff", () => {
+ it("retries a retryable emitted error on schedule then succeeds", async () => {
+ const { provider } = createRetryingProvider({
+ errorCount: 3,
+ error: { message: "HTTP 429: overloaded", code: "429", retryable: true },
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(result.finishReason).toBe("stop");
+ // 3 retries: 5s, 10s, 30s.
+ expect(fake.calls).toEqual([5_000, 10_000, 30_000]);
+ // 3 provider-retry events (one per sleep), then the successful text.
+ const retryEvents = events.filter((e) => e.type === "provider-retry");
+ expect(retryEvents).toHaveLength(3);
+ if (retryEvents[0]?.type === "provider-retry") {
+ expect(retryEvents[0].attempt).toBe(0);
+ expect(retryEvents[0].delayMs).toBe(5_000);
+ expect(retryEvents[0].message).toBe("HTTP 429: overloaded");
+ expect(retryEvents[0].code).toBe("429");
+ expect(retryEvents[0].conversationId).toBe("conv-1");
+ expect(retryEvents[0].turnId).toBe("turn-1");
+ }
+ if (retryEvents[1]?.type === "provider-retry") {
+ expect(retryEvents[1].attempt).toBe(1);
+ expect(retryEvents[1].delayMs).toBe(10_000);
+ }
+ if (retryEvents[2]?.type === "provider-retry") {
+ expect(retryEvents[2].attempt).toBe(2);
+ expect(retryEvents[2].delayMs).toBe(30_000);
+ }
+ // The error was suppressed (no error event emitted — retry succeeded).
+ expect(events.filter((e) => e.type === "error")).toHaveLength(0);
+ // The successful content still streams.
+ const deltas = events.filter((e) => e.type === "text-delta");
+ expect(deltas).toHaveLength(1);
+ });
+
+ it("sleep is called with the full schedule [5s,10s,30s,60s,5m,10m,15m,30m,30m…]", async () => {
+ // Provider errors forever → retries until budget exhausted → gives up.
+ const { provider } = createRetryingProvider({
+ errorCount: Number.POSITIVE_INFINITY,
+ error: { message: "overloaded", code: "429", retryable: true },
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ // Budget exhausted → give up → error.
+ expect(result.finishReason).toBe("error");
+
+ // The sleep schedule matches the pure delayFor output exactly.
+ expect(fake.calls).toEqual(fullSchedule());
+
+ // Head of the schedule (the 8 stepped delays).
+ expect(fake.calls.slice(0, 8)).toEqual([
+ 5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000,
+ ]);
+ // Tail repeats 30m.
+ expect(fake.calls[8]).toBe(1_800_000);
+ expect(fake.calls.at(-1)).toBe(1_800_000);
+
+ // 8h cumulative budget cap: head (3705s) + 13×30m = ~7h31m, then stop.
+ // 21 retries (attempts 0..20), then delayFor(21) → undefined → give up.
+ expect(fake.calls).toHaveLength(21);
+ const totalSlept = fake.calls.reduce((a, b) => a + b, 0);
+ expect(totalSlept).toBeLessThanOrEqual(RETRY_BUDGET_MS);
+ expect(totalSlept).toBe(3_705_000 + 13 * 1_800_000); // 27_105_000
+
+ // One provider-retry per sleep, plus a final error (give-up).
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(21);
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ const errEvt = events.find((e) => e.type === "error");
+ if (errEvt?.type === "error") {
+ expect(errEvt.message).toBe("overloaded");
+ expect(errEvt.code).toBe("429");
+ }
+ });
+
+ it("does NOT retry after content was emitted (safety invariant)", async () => {
+ // Provider yields text (content) THEN a retryable error. Because content
+ // was emitted, retrying is unsafe (would duplicate partial output).
+ let callCount = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ callCount++;
+ return (async function* () {
+ yield { type: "text-delta", delta: "partial" } as ProviderEvent;
+ yield {
+ type: "error",
+ message: "overloaded",
+ code: "429",
+ retryable: true,
+ } as ProviderEvent;
+ })();
+ },
+ };
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ // No retries: stream called exactly once.
+ expect(callCount).toBe(1);
+ expect(fake.calls).toHaveLength(0);
+ // The error is emitted (give-up) and partial content preserved.
+ expect(result.finishReason).toBe("error");
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0);
+ expect(events.filter((e) => e.type === "text-delta")).toHaveLength(1);
+ });
+
+ it("does NOT retry a non-retryable emitted error (retryable: false)", async () => {
+ const { provider, streamCalls } = createRetryingProvider({
+ errorCount: 1,
+ error: { message: "bad request", code: "400", retryable: false },
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(streamCalls.value).toBe(1); // no retry
+ expect(fake.calls).toHaveLength(0);
+ expect(result.finishReason).toBe("error");
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0);
+ });
+
+ it("does NOT retry a non-retryable emitted error (retryable absent)", async () => {
+ const { provider, streamCalls } = createRetryingProvider({
+ errorCount: 1,
+ error: { message: "bad request", code: "400" }, // no retryable field
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(streamCalls.value).toBe(1); // no retry
+ expect(fake.calls).toHaveLength(0);
+ expect(result.finishReason).toBe("error");
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ });
+
+ it("give-up emits the final error when budget is exhausted", async () => {
+ // Custom delayFor that allows exactly 1 retry then stops.
+ const shortDelayFor = (attempt: number): number | undefined =>
+ attempt === 0 ? 100 : undefined;
+ const { provider } = createRetryingProvider({
+ errorCount: Number.POSITIVE_INFINITY,
+ error: { message: "overloaded", code: "429", retryable: true },
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor: shortDelayFor, sleep: fake.sleep },
+ });
+
+ expect(result.finishReason).toBe("error");
+ expect(fake.calls).toEqual([100]); // one retry, then give up
+ // One provider-retry (attempt 0), then the final error.
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(1);
+ const errs = events.filter((e) => e.type === "error");
+ expect(errs).toHaveLength(1);
+ if (errs[0]?.type === "error") {
+ expect(errs[0].message).toBe("overloaded");
+ expect(errs[0].code).toBe("429");
+ }
+ });
+
+ it("abort during sleep seals the turn aborted", async () => {
+ const { provider } = createRetryingProvider({
+ errorCount: Number.POSITIVE_INFINITY,
+ error: { message: "overloaded", code: "429", retryable: true },
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+ fake.abortOnCall(2); // abort on the 2nd sleep
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(result.finishReason).toBe("aborted");
+ // Two sleeps attempted; the 2nd aborted.
+ expect(fake.calls).toHaveLength(2);
+ // No terminal error emitted (it was an abort, not a give-up).
+ expect(events.filter((e) => e.type === "error")).toHaveLength(0);
+ // One provider-retry before the aborted sleep (attempt 0).
+ const retries = events.filter((e) => e.type === "provider-retry");
+ expect(retries).toHaveLength(2);
+ // The done event carries reason "aborted".
+ const done = events.find((e) => e.type === "done");
+ if (done?.type === "done") {
+ expect(done.reason).toBe("aborted");
+ }
+ });
+
+ it("omitting retry keeps the pre-retry behavior (backward-compatible)", async () => {
+ // A retryable error with no retry configured → ends the step as today.
+ const { provider, streamCalls } = createRetryingProvider({
+ errorCount: 1,
+ error: { message: "overloaded", code: "429", retryable: true },
+ });
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ // no retry field
+ });
+
+ expect(streamCalls.value).toBe(1); // no retry
+ expect(result.finishReason).toBe("error");
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0);
+ });
+
+ it("retries a THROWN error (retryable-by-default when pre-content)", async () => {
+ // A thrown error (no retryable flag) before content is retried.
+ let callCount = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ callCount++;
+ return (async function* () {
+ if (callCount <= 2) {
+ throw new Error("network blip");
+ }
+ yield { type: "text-delta", delta: "hi" } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ })();
+ },
+ };
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(callCount).toBe(3); // 2 throws retried, 3rd succeeds
+ expect(fake.calls).toEqual([5_000, 10_000]);
+ expect(result.finishReason).toBe("stop");
+ expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(2);
+ // Thrown errors have no code.
+ if (events[0]?.type === "provider-retry") {
+ expect(events[0].code).toBeUndefined();
+ expect(events[0].message).toBe("network blip");
+ }
+ expect(events.filter((e) => e.type === "error")).toHaveLength(0);
+ });
+
+ it("does NOT retry a thrown error after content was emitted", async () => {
+ let callCount = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ callCount++;
+ return (async function* () {
+ yield { type: "text-delta", delta: "partial" } as ProviderEvent;
+ throw new Error("network blip");
+ })();
+ },
+ };
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ expect(callCount).toBe(1);
+ expect(fake.calls).toHaveLength(0);
+ expect(result.finishReason).toBe("error");
+ expect(events.filter((e) => e.type === "error")).toHaveLength(1);
+ expect(events.filter((e) => e.type === "text-delta")).toHaveLength(1);
+ });
+
+ it("provider-retry events interleave correctly: error → retry-event → sleep → retry", async () => {
+ // Verify ordering: each provider-retry event comes BEFORE its sleep,
+ // and the successful content comes only after the last retry.
+ const { provider } = createRetryingProvider({
+ errorCount: 2,
+ error: { message: "overloaded", code: "429", retryable: true },
+ success: [
+ { type: "text-delta", delta: "ok" },
+ { type: "finish", reason: "stop" },
+ ],
+ });
+ const controller = new AbortController();
+ const fake = createFakeSleep(controller);
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: controller.signal,
+ retry: { delayFor, sleep: fake.sleep },
+ });
+
+ const types = events.map((e) => e.type);
+ // turn-start, provider-retry(0), provider-retry(1), text-delta, step-complete, done
+ expect(types[0]).toBe("turn-start");
+ const firstRetryIdx = types.indexOf("provider-retry");
+ const textIdx = types.indexOf("text-delta");
+ expect(firstRetryIdx).toBeGreaterThan(0);
+ expect(textIdx).toBeGreaterThan(firstRetryIdx);
+ // Both retries precede the text.
+ const retryCount = types.filter((t) => t === "provider-retry").length;
+ expect(retryCount).toBe(2);
+ });
+ });
});
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index 940c77f..ac87a1f 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -6,12 +6,18 @@ import type {
ProviderStreamOptions,
Usage,
} from "../contracts/provider.js";
-import type { EventEmitter, RunTurnInput, RunTurnResult } from "../contracts/runtime.js";
+import type {
+ EventEmitter,
+ RetryStrategy,
+ RunTurnInput,
+ RunTurnResult,
+} from "../contracts/runtime.js";
import type { ToolCall, ToolContract } from "../contracts/tool.js";
import { createStepDispatcher, type StepDispatcher } from "./dispatch.js";
import {
doneEvent,
errorEvent,
+ providerRetryEvent,
reasoningDeltaEvent,
stepCompleteEvent,
textDeltaEvent,
@@ -121,6 +127,8 @@ interface StepContext {
readonly now: (() => number) | undefined;
/** Per-turn provider options (model, systemPrompt, …) threaded to stream(). */
readonly providerOpts: ProviderStreamOptions | undefined;
+ /** Optional injected retry strategy (omit = no retry, backward-compatible). */
+ readonly retry: RetryStrategy | undefined;
}
interface TimingState {
@@ -250,12 +258,10 @@ function processEvent(
case "finish":
break;
case "error":
- if (event.code !== undefined) {
- chunks.push({ type: "error", message: event.message, code: event.code });
- } else {
- chunks.push({ type: "error", message: event.message });
- }
- ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, event.message, event.code));
+ // Handled by the retry loop in executeStep (not here): an error event
+ // is intercepted before processEvent so the step can decide whether to
+ // retry (suppressing the error) or give up (emit it). processEvent
+ // never receives an "error" event.
break;
}
}
@@ -316,34 +322,142 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
// Swallow — D7.
}
- try {
- const opts: ProviderStreamOptions = {
- ...ctx.providerOpts,
- ...(ctx.turnSpan !== undefined && stepSpan !== undefined ? { logger: stepSpan.log } : {}),
- };
- const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts);
- for await (const event of stream) {
- if (ctx.signal.aborted) break;
- processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing, toolDispatchTimes);
- if (event.type === "usage") {
- stepUsage = addUsage(stepUsage, event.usage);
+ // Retry loop: wrap provider.stream() consumption. Retries are ONLY
+ // attempted when no content was emitted yet this step (the safety
+ // invariant — never duplicate partial output). On a retryable error —
+ // either an EMITTED `error` ProviderEvent with `retryable === true`, OR a
+ // THROWN error (retryable-by-default when pre-content) — with !hadContent:
+ // ask retry.delayFor(attempt); if it returns a delay → emit a transient
+ // provider-retry AgentEvent, sleep via the injected retry.sleep (abortable),
+ // attempt++, re-call provider.stream(); if it returns undefined (budget
+ // exhausted) → give up. Non-retryable emitted errors (retryable === false or
+ // absent), errors after content, and the no-retry-configured case all fall
+ // through to "give up" — identical to the pre-retry behavior.
+ let hadContent = false;
+ let attempt = 0;
+ while (true) {
+ let errored = false;
+ let wasThrown = false;
+ let errorMessage: string | undefined;
+ let errorCode: string | undefined;
+ let errorRetryable: boolean | undefined;
+ let thrownErr: unknown;
+
+ try {
+ const opts: ProviderStreamOptions = {
+ ...ctx.providerOpts,
+ ...(ctx.turnSpan !== undefined && stepSpan !== undefined ? { logger: stepSpan.log } : {}),
+ };
+ const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts);
+ for await (const event of stream) {
+ if (ctx.signal.aborted) break;
+ if (event.type === "error") {
+ // Intercept: hold for the retry decision — don't push a chunk
+ // or emit yet (a successful retry would leave a stale error).
+ errored = true;
+ errorMessage = event.message;
+ errorCode = event.code;
+ errorRetryable = event.retryable;
+ break;
+ }
+ if (
+ event.type === "text-delta" ||
+ event.type === "reasoning-delta" ||
+ event.type === "tool-call" ||
+ event.type === "usage"
+ ) {
+ hadContent = true;
+ }
+ processEvent(
+ event,
+ chunks,
+ toolCalls,
+ dispatcher,
+ ctx,
+ stepSpan,
+ timing,
+ toolDispatchTimes,
+ );
+ if (event.type === "usage") {
+ stepUsage = addUsage(stepUsage, event.usage);
+ }
+ if (event.type === "finish") {
+ finishReason = event.reason;
+ }
}
- if (event.type === "finish") {
- finishReason = event.reason;
+ } catch (err) {
+ errored = true;
+ wasThrown = true;
+ errorMessage = err instanceof Error ? err.message : String(err);
+ errorCode = undefined;
+ errorRetryable = undefined;
+ thrownErr = err;
+ }
+
+ // Abort (during stream) → stop; the runTurn loop seals aborted.
+ if (ctx.signal.aborted) {
+ break;
+ }
+
+ // No error → step succeeded.
+ if (!errored) {
+ break;
+ }
+
+ // Retryable? A thrown error is retryable-by-default when pre-content;
+ // an emitted error is retryable ONLY when `retryable === true` (absent
+ // or false → not retried, per the contract).
+ const isRetryable = wasThrown ? true : errorRetryable === true;
+ if (ctx.retry !== undefined && !hadContent && isRetryable) {
+ const delay = ctx.retry.delayFor(attempt);
+ if (delay !== undefined) {
+ // Emit the transient provider-retry event BEFORE the sleep so the
+ // UI shows "⚠ retrying in Ns…" immediately. Not persisted as a
+ // chat message — it never pollutes the prompt.
+ ctx.emit(
+ providerRetryEvent(
+ ctx.conversationId,
+ ctx.turnId,
+ attempt,
+ delay,
+ errorMessage ?? "",
+ errorCode,
+ ),
+ );
+ // Abortable sleep. If the signal fires during sleep, the shell's
+ // sleep rejects — we catch it and break so the turn seals aborted.
+ try {
+ await ctx.retry.sleep(delay, ctx.signal);
+ } catch {
+ // Abort during sleep (or unexpected sleep failure).
+ }
+ if (ctx.signal.aborted) {
+ break;
+ }
+ attempt++;
+ continue;
}
+ // delayFor returned undefined → budget exhausted → give up.
+ }
+
+ // Give up: emit the suppressed error and end the step. This is the
+ // single emission point for a terminal provider error (non-retryable,
+ // post-content, budget-exhausted, or no-retry-configured).
+ const message = errorMessage ?? "";
+ if (errorCode !== undefined) {
+ chunks.push({ type: "error", message, code: errorCode });
+ } else {
+ chunks.push({ type: "error", message });
}
- } catch (err) {
- const message = err instanceof Error ? err.message : String(err);
- chunks.push({ type: "error", message });
- ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message));
+ ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message, errorCode));
finishReason = "error";
- // Close step span with error
try {
- stepSpan?.end({ err });
+ stepSpan?.end({ err: thrownErr ?? new Error(message) });
} catch {
// Swallow — D7.
}
stepSpan = undefined;
+ break;
}
// Close timing spans: if no first token was seen, end ttft with firstToken: false
@@ -527,6 +641,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
computerId: input.computerId,
now,
providerOpts: input.providerOpts,
+ retry: input.retry,
});
totalUsage = addUsage(totalUsage, stepResult.usage);
diff --git a/packages/lsp/src/aggregate.test.ts b/packages/lsp/src/aggregate.test.ts
new file mode 100644
index 0000000..4579a0a
--- /dev/null
+++ b/packages/lsp/src/aggregate.test.ts
@@ -0,0 +1,141 @@
+import { describe, expect, it } from "vitest";
+import { type AggregateServer, aggregateDiagnostics } from "./aggregate.js";
+import type { LanguageServerClient } from "./client.js";
+
+/**
+ * A minimal fake client: only `waitForDiagnostics` is exercised by
+ * aggregateDiagnostics, so we stub just that. Cast to the real type (mirrors
+ * tool.test.ts) — no real process, no internal mocks of our own modules.
+ */
+function fakeClient(
+ waitForDiagnostics: LanguageServerClient["waitForDiagnostics"],
+): LanguageServerClient {
+ return { waitForDiagnostics } as unknown as LanguageServerClient;
+}
+
+const SERVER_A: AggregateServer = { id: "a", name: "Ruby-LSP", root: "/p" };
+const SERVER_B: AggregateServer = { id: "b", name: "Steep", root: "/p" };
+
+describe("aggregateDiagnostics", () => {
+ it("returns merged diagnostics from all responding servers, tagged by source", async () => {
+ const clients = new Map<string, LanguageServerClient>([
+ [
+ "a",
+ fakeClient(async () => ({ formatted: "ERROR L1:1: boom", slow: false, timedOut: false })),
+ ],
+ [
+ "b",
+ fakeClient(async () => ({ formatted: "WARNING L2:3: meh", slow: false, timedOut: false })),
+ ],
+ ]);
+
+ const result = await aggregateDiagnostics(
+ (id) => clients.get(id),
+ [SERVER_A, SERVER_B],
+ "/p/x.rb",
+ 10_000,
+ {},
+ );
+
+ expect(result.timedOut).toBe(false);
+ expect(result.formatted).toContain("[Ruby-LSP]");
+ expect(result.formatted).toContain("boom");
+ expect(result.formatted).toContain("[Steep]");
+ expect(result.formatted).toContain("meh");
+ });
+
+ it("skips a server that times out with a raise-to-user notice, and still returns the fast server's result", async () => {
+ // Steep never resolves within the cap → timedOut; ruby-lsp answers fast.
+ const clients = new Map<string, LanguageServerClient>([
+ ["a", fakeClient(async () => ({ formatted: "", slow: false, timedOut: false }))],
+ ["b", fakeClient(async () => ({ formatted: "", slow: false, timedOut: true }))],
+ ]);
+
+ const result = await aggregateDiagnostics(
+ (id) => clients.get(id),
+ [SERVER_A, SERVER_B],
+ "/p/x.rb",
+ 10_000,
+ {},
+ );
+
+ expect(result.timedOut).toBe(true);
+ // The skip notice names the offending server and the cap.
+ expect(result.formatted).toContain("[Steep]");
+ expect(result.formatted).toContain("took too long");
+ expect(result.formatted).toContain(">10s");
+ expect(result.formatted).toContain("raise this to the user");
+ // ruby-lsp answered cleanly (empty diagnostics) → no line for it.
+ expect(result.formatted).not.toContain("[Ruby-LSP]");
+ });
+
+ it("runs servers concurrently: a slow server does not delay a fast one's contribution order", async () => {
+ const callOrder: string[] = [];
+ const clients = new Map<string, LanguageServerClient>([
+ [
+ "a",
+ fakeClient(async () => {
+ callOrder.push("a-start");
+ await new Promise((r) => setTimeout(r, 5));
+ callOrder.push("a-end");
+ return { formatted: "from-a", slow: false, timedOut: false };
+ }),
+ ],
+ [
+ "b",
+ fakeClient(async () => {
+ callOrder.push("b-start");
+ await new Promise((r) => setTimeout(r, 30));
+ callOrder.push("b-end");
+ return { formatted: "from-b", slow: false, timedOut: false };
+ }),
+ ],
+ ]);
+
+ const result = await aggregateDiagnostics(
+ (id) => clients.get(id),
+ [SERVER_A, SERVER_B],
+ "/p/x.rb",
+ 10_000,
+ {},
+ );
+
+ // Both started before either ended → concurrent, not sequential.
+ expect(callOrder.slice(0, 2).sort()).toEqual(["a-start", "b-start"]);
+ expect(result.formatted).toContain("from-a");
+ expect(result.formatted).toContain("from-b");
+ });
+
+ it("a missing client (dead/excluded) contributes nothing and never rejects", async () => {
+ const result = await aggregateDiagnostics(() => undefined, [SERVER_A], "/p/x.rb", 10_000, {});
+ expect(result.formatted).toBe("");
+ expect(result.timedOut).toBe(false);
+ });
+
+ it("forwards text + minSeverity to each client's waitForDiagnostics", async () => {
+ const seen: Array<{ text?: string; minSeverity?: number; timeoutMs: number }> = [];
+ const clients = new Map<string, LanguageServerClient>([
+ [
+ "a",
+ fakeClient(async (_path, opts) => {
+ seen.push({
+ text: opts?.text,
+ minSeverity: opts?.minSeverity,
+ timeoutMs: opts?.timeoutMs ?? -1,
+ });
+ return { formatted: "", slow: false, timedOut: false };
+ }),
+ ],
+ ]);
+
+ await aggregateDiagnostics((id) => clients.get(id), [SERVER_A], "/p/x.rb", 7000, {
+ text: "post-edit buffer",
+ minSeverity: 2,
+ });
+
+ expect(seen).toHaveLength(1);
+ expect(seen[0]?.text).toBe("post-edit buffer");
+ expect(seen[0]?.minSeverity).toBe(2);
+ expect(seen[0]?.timeoutMs).toBe(7000);
+ });
+});
diff --git a/packages/lsp/src/aggregate.ts b/packages/lsp/src/aggregate.ts
new file mode 100644
index 0000000..b044e21
--- /dev/null
+++ b/packages/lsp/src/aggregate.ts
@@ -0,0 +1,80 @@
+/**
+ * Concurrent multi-server diagnostics aggregation.
+ *
+ * Queries every matching language server AT ONCE (not one-at-a-time), each
+ * capped at `timeoutMs`. A server that doesn't push diagnostics within the cap
+ * is SKIPPED with a per-server notice rather than blocking the others — so one
+ * slow/dead server (e.g. a corrupted Steep) can't hold up the fast one's
+ * (ruby-lsp) results for the full timeout on every edit.
+ *
+ * The only I/O here is `client.waitForDiagnostics` (injected via `getClient`),
+ * so this is unit-testable with a fake client and no real process.
+ */
+
+import type { LanguageServerClient } from "./client.js";
+
+export interface AggregateServer {
+ readonly id: string;
+ readonly name: string;
+ readonly root: string;
+}
+
+export interface AggregateOpts {
+ /** Post-edit buffer; when omitted the server reads from disk. */
+ readonly text?: string | undefined;
+ /** Only include diagnostics with severity ≤ this (1=Error, 2=Warning). */
+ readonly minSeverity?: number | undefined;
+}
+
+export interface AggregateResult {
+ /** Merged diagnostics tagged by source + a per-skipped-server notice. */
+ readonly formatted: string;
+ /** True if at least one server was skipped for exceeding the cap. */
+ readonly timedOut: boolean;
+}
+
+/**
+ * Query `servers` concurrently, each capped at `timeoutMs`. Returns merged
+ * diagnostics tagged by source (`[name]\n…`) and, for any server that did not
+ * respond in time, a `⚠️ [name] LSP took too long (>Ns), skipped — please raise
+ * this to the user.` notice. Never rejects: a client error yields an empty
+ * contribution for that server.
+ */
+export async function aggregateDiagnostics(
+ getClient: (id: string, root: string) => LanguageServerClient | undefined,
+ servers: readonly AggregateServer[],
+ absolutePath: string,
+ timeoutMs: number,
+ opts: AggregateOpts,
+): Promise<AggregateResult> {
+ const entries = await Promise.all(
+ servers.map(async (server) => {
+ const client = getClient(server.id, server.root);
+ if (!client) return null;
+ const waitOpts: { text?: string; timeoutMs: number; minSeverity?: number } = { timeoutMs };
+ if (opts.text !== undefined) waitOpts.text = opts.text;
+ if (opts.minSeverity !== undefined) waitOpts.minSeverity = opts.minSeverity;
+ const result = await client.waitForDiagnostics(absolutePath, waitOpts);
+ return { server, result };
+ }),
+ );
+
+ const parts: string[] = [];
+ let timedOut = false;
+ const capSeconds = Math.round(timeoutMs / 1000);
+
+ for (const entry of entries) {
+ if (!entry) continue;
+ const { server, result } = entry;
+ if (result.timedOut) {
+ timedOut = true;
+ parts.push(
+ `⚠️ [${server.name}] LSP took too long (>${capSeconds}s), diagnostics skipped — please raise this to the user.`,
+ );
+ } else if (result.formatted) {
+ parts.push(`[${server.name}]\n${result.formatted}`);
+ }
+ }
+
+ return { formatted: parts.join("\n\n"), timedOut };
+}
diff --git a/packages/lsp/src/client.test.ts b/packages/lsp/src/client.test.ts
index 681860f..338ef0b 100644
--- a/packages/lsp/src/client.test.ts
+++ b/packages/lsp/src/client.test.ts
@@ -3,6 +3,7 @@ import {
type FileWatcher,
type FsAccess,
LanguageServerClient,
+ type ProcessExitHandler,
type SpawnProcess,
} from "./client.js";
import { encode } from "./framing.js";
@@ -288,4 +289,149 @@ describe("client", () => {
client.shutdown();
expect(state.killed).toBe(true);
});
+
+ it("onExit marks the client broken (error) so callers stop querying a corpse", async () => {
+ const state = { killed: false };
+ let exitCb: ProcessExitHandler | null = null;
+ const stdoutHolder: { cb: ((data: Uint8Array) => void) | null } = { cb: null };
+
+ const spawnWithExit: SpawnProcess = () => ({
+ stdin: { write: () => {} },
+ stdout: {
+ on: (_event: string, cb: (data: Uint8Array) => void) => {
+ stdoutHolder.cb = cb;
+ },
+ },
+ pid: 999,
+ kill: () => {
+ state.killed = true;
+ },
+ onExit: (handler) => {
+ exitCb = handler;
+ },
+ });
+
+ const { client } = makeClient({ spawn: spawnWithExit });
+ const startPromise = client.start();
+ await new Promise((r) => setTimeout(r, 50));
+ stdoutHolder.cb?.(
+ encode(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } })),
+ );
+ await startPromise;
+ expect(client.getState()).toBe("connected");
+
+ // Simulate the process dying (user kill / crash).
+ exitCb?.({ code: 1 });
+
+ expect(client.getState()).toBe("error");
+ expect(client.getStateError()).toMatch(/process exited/);
+ // The (still-alive-in-test) process was killed to avoid a zombie.
+ expect(state.killed).toBe(true);
+ });
+
+ it("a dead client is skipped by waitForDiagnostics callers (state !== connected)", async () => {
+ // Build a client, connect, kill via onExit, then assert a diagnostics
+ // query would not block: getState() is "error" so the matching filter
+ // (state === "connected") excludes it. We assert the state guard.
+ const stdoutHolder: { cb: ((data: Uint8Array) => void) | null } = { cb: null };
+ let exitCb: ProcessExitHandler | null = null;
+ const spawnWithExit: SpawnProcess = () => ({
+ stdin: { write: () => {} },
+ stdout: {
+ on: (_e, cb) => {
+ stdoutHolder.cb = cb;
+ },
+ },
+ pid: 1,
+ kill: () => {},
+ onExit: (handler) => {
+ exitCb = handler;
+ },
+ });
+
+ const { client } = makeClient({ spawn: spawnWithExit });
+ const startPromise = client.start();
+ await new Promise((r) => setTimeout(r, 50));
+ stdoutHolder.cb?.(
+ encode(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } })),
+ );
+ await startPromise;
+
+ exitCb?.({ code: null });
+ expect(client.getState()).toBe("error");
+ // The aggregate / getDiagnostics matching filter requires "connected".
+ expect(client.getState() === "connected").toBe(false);
+ });
+
+ it("corruption detector marks the client broken after repeated identical diagnostics despite text changes", async () => {
+ // A healthy server would change diagnostics as the file changes; a
+ // corrupted one re-emits the SAME non-empty set. Drive 5 edits with
+ // different text but identical diagnostics → client flips to error.
+ const { client, serverResponses } = makeClient();
+ const startPromise = client.start();
+ await new Promise((r) => setTimeout(r, 50));
+ serverResponses(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } }));
+ await startPromise;
+
+ const phantom = JSON.stringify({
+ jsonrpc: "2.0",
+ method: "textDocument/publishDiagnostics",
+ params: {
+ uri: "file:///project/game.rb",
+ diagnostics: [
+ {
+ range: { start: { line: 0, character: 27 }, end: { line: 0, character: 28 } },
+ severity: 1,
+ message: "SyntaxError: unexpected token",
+ },
+ ],
+ },
+ });
+
+ const path = "/project/game.rb";
+ // The first call establishes the baseline snapshot (no increment).
+ // Each subsequent call with identical diagnostics + changed text
+ // increments; the 6th call (5th increment) trips the threshold.
+ for (let i = 1; i <= 5; i++) {
+ const p = client.waitForDiagnostics(path, { text: `buf-v${i}`, timeoutMs: 2000 });
+ // Push the identical phantom diagnostics so the poll resolves.
+ await new Promise((r) => setTimeout(r, 30));
+ serverResponses(phantom);
+ await p;
+ expect(client.getState()).toBe("connected");
+ }
+ // 6th identical-across-changed-text repeat trips the threshold.
+ const p6 = client.waitForDiagnostics(path, { text: "buf-v6", timeoutMs: 2000 });
+ await new Promise((r) => setTimeout(r, 30));
+ serverResponses(phantom);
+ await p6;
+
+ expect(client.getState()).toBe("error");
+ expect(client.getStateError()).toMatch(/repeated stale diagnostics/i);
+ });
+
+ it("corruption detector does NOT trip on a clean file (empty diagnostics stay identical)", async () => {
+ const { client, serverResponses } = makeClient();
+ const startPromise = client.start();
+ await new Promise((r) => setTimeout(r, 50));
+ serverResponses(JSON.stringify({ jsonrpc: "2.0", id: 1, result: { capabilities: {} } }));
+ await startPromise;
+
+ const clean = JSON.stringify({
+ jsonrpc: "2.0",
+ method: "textDocument/publishDiagnostics",
+ params: { uri: "file:///project/game.rb", diagnostics: [] },
+ });
+ const path = "/project/game.rb";
+
+ for (let i = 1; i <= 6; i++) {
+ const p = client.waitForDiagnostics(path, { text: `clean-v${i}`, timeoutMs: 2000 });
+ await new Promise((r) => setTimeout(r, 30));
+ serverResponses(clean);
+ await p;
+ }
+ // Empty diagnostics never count as "stale" — a clean file staying clean
+ // is normal, not corruption.
+ expect(client.getState()).toBe("connected");
+ });
});
diff --git a/packages/lsp/src/client.ts b/packages/lsp/src/client.ts
index 677a22a..ac7d025 100644
--- a/packages/lsp/src/client.ts
+++ b/packages/lsp/src/client.ts
@@ -4,13 +4,22 @@
* FileWatcher, and forwards matching disk changes.
*/
-import { DiagnosticsStore, type PublishDiagnosticsParams } from "./diagnostics.js";
+import { DiagnosticsStore, diagnosticKey, type PublishDiagnosticsParams } from "./diagnostics.js";
import { computeChangeRange } from "./diff.js";
import { FrameDecoder } from "./framing.js";
import { languageId as resolveLanguageId } from "./language.js";
import { JsonRpcConnection, type WriteFn } from "./rpc.js";
import { FileChangeType, WatchedFilesRegistry } from "./watched-files.js";
+/** Info delivered to an `onExit` handler when the child process terminates. */
+export interface ProcessExitInfo {
+ readonly code: number | null;
+ readonly signal?: string;
+}
+
+/** A handler registered to be called when the child process exits. */
+export type ProcessExitHandler = (info: ProcessExitInfo) => void;
+
export interface SpawnedProcess {
readonly stdin: { readonly write: (bytes: Uint8Array) => void };
readonly stdout:
@@ -22,6 +31,13 @@ export interface SpawnedProcess {
| undefined;
readonly pid: number | undefined;
readonly kill: () => void;
+ /**
+ * Register a handler fired when the child process exits (code|signal).
+ * Optional: when absent, death is detected via stdout-end instead. Wires
+ * Bun's `proc.exited` in production; tests invoke it directly to simulate
+ * a crash. Lets the client stop querying a dead server (no per-edit hang).
+ */
+ readonly onExit?: ((handler: ProcessExitHandler) => void) | undefined;
}
export type SpawnProcess = (
@@ -102,6 +118,18 @@ export class LanguageServerClient {
private openDocuments = new Map<string, { version: number; text: string }>();
/** Sync mode captured from the server's initialize capabilities: 1=Full, 2=Incremental. */
private textDocumentChange: 1 | 2 = 1;
+ /**
+ * Corruption detection: the last diagnostic-key set + synced text per URI.
+ * A healthy server's diagnostics change when the file changes; a corrupted
+ * one (e.g. Steep's ~3h phantom-SyntaxError drift) re-emits the identical
+ * non-empty set across edits. `staleRepeat` counts consecutive such repeats
+ * across URIs; at the threshold the client is marked broken (→ respawn).
+ */
+ private lastDiagSnapshot = new Map<string, { keys: Set<string>; text: string }>();
+ private staleRepeat = 0;
+ private static readonly STALE_REPEAT_THRESHOLD = 5;
+ /** Default timeout for outbound requests (hover/definition/references). */
+ private static readonly REQUEST_TIMEOUT_MS = 10_000;
constructor(deps: ClientDeps) {
this.deps = deps;
@@ -128,6 +156,12 @@ export class LanguageServerClient {
}
const proc = this.deps.spawn(this.deps.command as string[], spawnOpts);
this.process = proc;
+ // Detect process death so we stop querying a corpse (fixes the
+ // per-edit hang after a server is killed/crashes). onExit is the
+ // primary signal; stdout-end is the defence-in-depth fallback.
+ if (proc.onExit) {
+ proc.onExit((info) => this.handleExit(info));
+ }
const writeFn: WriteFn = (bytes) => proc.stdin.write(bytes);
const rpc = new JsonRpcConnection(writeFn);
@@ -158,8 +192,11 @@ export class LanguageServerClient {
for await (const chunk of source) {
this.handleBytes(chunk);
}
+ // stdout closed — the process is gone (defence-in-depth alongside onExit,
+ // which some edges never call). Idempotent via handleExit's guard.
+ this.handleExit({ code: null });
} catch {
- // process exited
+ this.handleExit({ code: null });
}
})();
}
@@ -172,6 +209,65 @@ export class LanguageServerClient {
});
}
+ /**
+ * The server process exited (onExit or stdout-end). Transition to a broken
+ * state so callers skip it and the manager re-spawns after backoff — instead
+ * of polling a corpse for the full timeout on every edit. Idempotent.
+ */
+ private handleExit(info: ProcessExitInfo): void {
+ if (this.state === "error" || this.state === "not-started") return;
+ const detail = info.signal !== undefined ? `signal ${info.signal}` : `code ${info.code ?? "?"}`;
+ this.markBroken(`language server process exited (${detail})`);
+ }
+
+ /**
+ * Mark this client permanently broken: kill the process if still alive
+ * (corruption case), dispose the rpc (rejects pending requests), and drop
+ * edge handles. The manager's status() observes state:"error" and re-spawns
+ * after the bounded backoff. Called on process death AND on corruption.
+ */
+ private markBroken(reason: string): void {
+ if (this.state === "error") return;
+ this.state = "error";
+ this.stateError = reason;
+ this.fileWatcherHandle?.close();
+ this.fileWatcherHandle = null;
+ this.process?.kill();
+ this.process = null;
+ this.rpc?.dispose();
+ this.rpc = null;
+ }
+
+ /**
+ * Detect a server stuck re-emitting identical non-empty diagnostics
+ * despite the file content changing between calls — the signature of a
+ * corrupted parse/type-check state (e.g. Steep's ~3h phantom-SyntaxError
+ * drift, where a fresh CLI reports green on the same project). After
+ * STALE_REPEAT_THRESHOLD consecutive such repeats, mark the client broken
+ * so it is skipped + re-spawned. A clean file (empty diagnostics) or a
+ * genuinely changing diagnostic set resets the counter. Note the
+ * tradeoff: a real, unfixed error on an untouched line also "stays the
+ * same across edits", so this can false-positive on a healthy server —
+ * the threshold is set conservatively and the CLI type-check gate remains
+ * authoritative either way.
+ */
+ private detectStaleDiagnostics(uri: string, text: string): void {
+ const merged = this.diagnostics.getMerged(uri);
+ const keys = new Set(merged.map((d) => diagnosticKey(d)));
+ const prev = this.lastDiagSnapshot.get(uri);
+ if (prev && keys.size > 0 && setsEqual(keys, prev.keys) && text !== prev.text) {
+ this.staleRepeat++;
+ } else {
+ this.staleRepeat = 0;
+ }
+ this.lastDiagSnapshot.set(uri, { keys, text });
+ if (this.staleRepeat >= LanguageServerClient.STALE_REPEAT_THRESHOLD) {
+ this.markBroken(
+ "language server emitting repeated stale diagnostics despite file changes — likely corrupted; restarting",
+ );
+ }
+ }
+
private handleBytes(chunk: Uint8Array): void {
const messages = this.decoder.decode(chunk);
for (const msg of messages) {
@@ -403,26 +499,37 @@ export class LanguageServerClient {
await this.open(filePath);
}
- const slowThreshold = 10_000;
const start = Date.now();
- // Poll until the server pushes diagnostics (even empty = done) or timeout.
- return new Promise((resolve) => {
+ // Poll until the server pushes diagnostics (even empty = done) or the
+ // per-server cap elapses (then we skip it — see aggregateDiagnostics).
+ const received = await new Promise<boolean>((resolve) => {
const check = () => {
const elapsed = Date.now() - start;
- const received = this.diagnostics.hasReceivedPush(uri);
- if (received || elapsed >= timeoutMs) {
- resolve({
- formatted: this.diagnostics.formatFiltered(uri, opts?.minSeverity),
- slow: elapsed > slowThreshold,
- timedOut: !received,
- });
+ const got = this.diagnostics.hasReceivedPush(uri);
+ if (got || elapsed >= timeoutMs) {
+ resolve(got);
return;
}
setTimeout(check, 100);
};
check();
});
+
+ // Only a server that actually pushed can be corruption-checked.
+ if (received) {
+ this.detectStaleDiagnostics(uri, opts?.text ?? "");
+ }
+
+ // `slow` is structurally false now: the per-server cap is 10s, so
+ // elapsed can never exceed the old "unusually long" threshold. That
+ // warning is superseded by the timeout→skip notice produced in
+ // aggregateDiagnostics. The field is kept for contract compatibility.
+ return {
+ formatted: this.diagnostics.formatFiltered(uri, opts?.minSeverity),
+ slow: false,
+ timedOut: !received,
+ };
}
getWatchedFilesRegistry(): WatchedFilesRegistry {
@@ -433,11 +540,21 @@ export class LanguageServerClient {
return this.diagnostics;
}
- async request(method: string, params?: unknown): Promise<unknown> {
+ /**
+ * Send a request (hover/definition/references/documentSymbol). Capped at
+ * REQUEST_TIMEOUT_MS so a dead/slow server can't hang the turn — the
+ * initialize handshake bypasses this (it calls rpc.sendRequest directly
+ * with its own 45s race).
+ */
+ async request(
+ method: string,
+ params?: unknown,
+ timeoutMs: number = LanguageServerClient.REQUEST_TIMEOUT_MS,
+ ): Promise<unknown> {
if (!this.rpc || this.state !== "connected") {
throw new Error("Client not connected");
}
- return this.rpc.sendRequest(method, params);
+ return this.rpc.sendRequest(method, params, timeoutMs);
}
shutdown(): void {
@@ -450,3 +567,11 @@ export class LanguageServerClient {
this.state = "not-started";
}
}
+
+function setsEqual<T>(a: Set<T>, b: Set<T>): boolean {
+ if (a.size !== b.size) return false;
+ for (const v of a) {
+ if (!b.has(v)) return false;
+ }
+ return true;
+}
diff --git a/packages/lsp/src/diagnostics.ts b/packages/lsp/src/diagnostics.ts
index bc7ac0a..50beca9 100644
--- a/packages/lsp/src/diagnostics.ts
+++ b/packages/lsp/src/diagnostics.ts
@@ -91,7 +91,7 @@ export class DiagnosticsStore {
}
}
-function diagnosticKey(d: Diagnostic): string {
+export function diagnosticKey(d: Diagnostic): string {
const r = d.range;
return `${r.start.line}:${r.start.character}-${r.end.line}:${r.end.character}:${d.severity ?? 0}:${d.message}`;
}
diff --git a/packages/lsp/src/extension.ts b/packages/lsp/src/extension.ts
index 8e3178a..c0fee44 100644
--- a/packages/lsp/src/extension.ts
+++ b/packages/lsp/src/extension.ts
@@ -8,6 +8,7 @@
import { extname, join } from "node:path";
import type { Extension, HostAPI, ServiceHandle } from "@dispatch/kernel";
import { defineService } from "@dispatch/kernel";
+import { aggregateDiagnostics } from "./aggregate.js";
import type { SpawnedProcess } from "./client.js";
import { LspManager } from "./manager.js";
import { createLspTool } from "./tool.js";
@@ -43,6 +44,14 @@ function realSpawn(
stderr: proc.stderr,
pid: proc.pid,
kill: () => proc.kill(),
+ // Surface process exit so the client can stop querying a dead server
+ // and self-heal (respawn). Bun's Subprocess.exited resolves with the
+ // exit code (or rejects if killed by signal — treat as code:null).
+ onExit: (handler) => {
+ (proc as { exited: Promise<number | null> }).exited
+ .then((code) => handler({ code }))
+ .catch(() => handler({ code: null }));
+ },
};
}
@@ -108,14 +117,20 @@ export const extension: Extension = {
return manager.status(cwd);
},
async getDiagnostics(opts: GetDiagnosticsOpts): Promise<DiagnosticsResult> {
- const timeoutMs = opts.timeoutMs ?? 60_000;
- const slowThreshold = 10_000;
+ // 10s hard ceiling per server, regardless of what the caller
+ // passes (the edit hook still passes 60_000 — clamped here, so
+ // no other-unit edit is needed). A server that doesn't respond
+ // in 10s is skipped with a notice instead of waited out.
+ const PER_SERVER_CAP_MS = 10_000;
+ const timeoutMs = Math.min(opts.timeoutMs ?? PER_SERVER_CAP_MS, PER_SERVER_CAP_MS);
const fileExt = extname(opts.filePath).toLowerCase();
const absolutePath = opts.filePath.startsWith("/")
? opts.filePath
: join(opts.cwd, opts.filePath);
// Get all connected servers matching this file's extension.
+ // A dead/corrupted server has state:"error" and is excluded —
+ // no per-edit hang on a corpse.
const statuses = await manager.status(opts.cwd);
const matching = statuses.filter(
(s) => s.state === "connected" && s.extensions.some((ext) => ext === fileExt),
@@ -125,34 +140,15 @@ export const extension: Extension = {
return { formatted: "", slow: false, timedOut: false };
}
- const parts: string[] = [];
- let anySlow = false;
- let anyTimedOut = false;
- const start = Date.now();
-
- for (const s of matching) {
- const client = manager.getClient(s.id, s.root);
- if (!client) continue;
- const waitOpts: { text?: string; timeoutMs?: number; minSeverity?: number } = {
- timeoutMs,
- };
- if (opts.text !== undefined) waitOpts.text = opts.text;
- if (opts.minSeverity !== undefined) waitOpts.minSeverity = opts.minSeverity;
- const result = await client.waitForDiagnostics(absolutePath, waitOpts);
- if (result.slow) anySlow = true;
- if (result.timedOut) anyTimedOut = true;
- if (result.formatted) {
- parts.push(`[${s.name}]\n${result.formatted}`);
- }
- }
-
- const elapsed = Date.now() - start;
+ const agg = await aggregateDiagnostics(
+ (id, root) => manager.getClient(id, root),
+ matching,
+ absolutePath,
+ timeoutMs,
+ { text: opts.text, minSeverity: opts.minSeverity },
+ );
- return {
- formatted: parts.join("\n\n"),
- slow: anySlow || elapsed > slowThreshold,
- timedOut: anyTimedOut,
- };
+ return { formatted: agg.formatted, slow: false, timedOut: agg.timedOut };
},
};
host.provideService(lspServiceHandle, service);
diff --git a/packages/lsp/src/manager.test.ts b/packages/lsp/src/manager.test.ts
index 1649111..d8cba4f 100644
--- a/packages/lsp/src/manager.test.ts
+++ b/packages/lsp/src/manager.test.ts
@@ -361,4 +361,92 @@ describe("manager", () => {
expect(s[0]?.error).toContain("[from .dispatch/lsp.json]");
expect(s[0]?.error).toContain("spawn failed");
});
+
+ it("a client that dies after connecting is skipped + re-spawned after backoff (no storm, no eternal hang)", async () => {
+ // A spawn that completes the initialize handshake AND lets the test
+ // simulate process death via the captured onExit handler.
+ const exitHandlers: Array<(info: { code: number | null; signal?: string }) => void> = [];
+ let spawnCount = 0;
+ const spawn: SpawnProcess = () => {
+ spawnCount++;
+ let messageHandler: ((data: Uint8Array) => void) | null = null;
+ const proc: SpawnedProcess = {
+ stdin: {
+ write: (bytes: Uint8Array) => {
+ const decoded = new TextDecoder().decode(bytes);
+ const headerEnd = decoded.indexOf("\r\n\r\n");
+ if (headerEnd === -1) return;
+ const json = decoded.slice(headerEnd + 4);
+ try {
+ const msg = JSON.parse(json);
+ if (msg.method === "initialize") {
+ setTimeout(() => {
+ const response = JSON.stringify({
+ jsonrpc: "2.0",
+ id: msg.id,
+ result: { capabilities: {} },
+ });
+ messageHandler?.(encode(response));
+ }, 1);
+ }
+ } catch {
+ // ignore
+ }
+ },
+ },
+ stdout: {
+ on: (_event: string, cb: (data: Uint8Array) => void) => {
+ messageHandler = cb;
+ },
+ },
+ pid: 1000 + spawnCount,
+ kill: () => {},
+ onExit: (handler) => {
+ exitHandlers.push(handler);
+ },
+ };
+ return proc;
+ };
+
+ const clock = { now: 0 };
+ const manager = new LspManager({
+ spawn,
+ fileWatcher: noopFileWatcher(),
+ fs: fakeFs({
+ "/project/.dispatch/lsp.json": JSON.stringify({
+ servers: {
+ steep: {
+ command: ["steep", "--stdio"],
+ extensions: [".rb"],
+ rootMarkers: [],
+ },
+ },
+ }),
+ }),
+ now: () => clock.now,
+ });
+
+ // 1) Connects.
+ const s1 = await manager.status("/project");
+ expect(s1[0]?.state).toBe("connected");
+ expect(spawnCount).toBe(1);
+
+ // 2) Simulate the process dying (user kill / crash) via onExit.
+ exitHandlers[0]?.({ code: 1 });
+ const clientAfterDeath = manager.getClient("steep", "/project");
+ expect(clientAfterDeath?.getState()).toBe("error");
+
+ // 3) status() now reports error (and seeds a broken entry for backoff).
+ // Backoff not elapsed yet (clock frozen at 0) → NOT re-spawned.
+ const s2 = await manager.status("/project");
+ expect(s2[0]?.state).toBe("error");
+ expect(s2[0]?.error).toMatch(/process exited/);
+ expect(spawnCount).toBe(1); // no retry storm before backoff
+
+ // 4) After the backoff elapses, status() re-spawns a fresh server.
+ clock.now = 31_000;
+ const s3 = await manager.status("/project");
+ expect(s3[0]?.state).toBe("connected");
+ expect(spawnCount).toBe(2); // re-spawned exactly once
+ });
});
diff --git a/packages/lsp/src/manager.ts b/packages/lsp/src/manager.ts
index 7153956..bc84479 100644
--- a/packages/lsp/src/manager.ts
+++ b/packages/lsp/src/manager.ts
@@ -134,6 +134,19 @@ export class LspManager {
if (existing) {
const state = existing.client.getState();
const stateError = existing.client.getStateError();
+ // A client that died or corrupted AFTER connecting flipped its
+ // own state to "error" (client.ts handleExit/markBroken). Spawn
+ // succeeded so there's no broken entry yet — seed one so the
+ // bounded-backoff path above re-spawns it, instead of reporting
+ // error forever (and so getDiagnostics' "connected" filter skips
+ // it, avoiding a per-edit hang on the corpse).
+ if (state === "error" && !this.broken.has(key)) {
+ this.broken.set(key, {
+ configFingerprint: configFingerprint(server),
+ brokenAt: this.now(),
+ error: enrichError(server, stateError ?? "server unavailable"),
+ });
+ }
const status: LspServerStatus = {
id: server.id,
name: server.name,
diff --git a/packages/lsp/src/rpc.test.ts b/packages/lsp/src/rpc.test.ts
index 05ce924..7b22ec5 100644
--- a/packages/lsp/src/rpc.test.ts
+++ b/packages/lsp/src/rpc.test.ts
@@ -84,3 +84,29 @@ it("handleMessage does not throw on malformed JSON", async () => {
await expect(conn.handleMessage("")).resolves.toBeUndefined();
await expect(conn.handleMessage("not json at all")).resolves.toBeUndefined();
});
+
+describe("sendRequest timeout", () => {
+ it("rejects with a timeout error when no response arrives within timeoutMs", async () => {
+ const { conn } = makeConnection();
+ const promise = conn.sendRequest("textDocument/hover", {}, 50);
+ await expect(promise).rejects.toThrow(/LSP request timed out after 50ms: textDocument\/hover/);
+ });
+
+ it("clears the timer on a normal response (no unhandled rejection)", async () => {
+ const { conn } = makeConnection();
+ const promise = conn.sendRequest("textDocument/hover", {}, 5000);
+ conn.handleMessage(frameResponse(1, { ok: true }));
+ await expect(promise).resolves.toEqual({ ok: true });
+ // Give the (now-cleared) timer window ample time to prove it never fires.
+ await new Promise((r) => setTimeout(r, 80));
+ });
+
+ it("does not time out when no timeoutMs is given (initialize handshake path)", async () => {
+ const { conn } = makeConnection();
+ const promise = conn.sendRequest("initialize", {});
+ // A late response well past any plausible default still resolves.
+ await new Promise((r) => setTimeout(r, 60));
+ conn.handleMessage(frameResponse(1, { capabilities: {} }));
+ await expect(promise).resolves.toEqual({ capabilities: {} });
+ });
+});
diff --git a/packages/lsp/src/rpc.ts b/packages/lsp/src/rpc.ts
index 6b82624..95157de 100644
--- a/packages/lsp/src/rpc.ts
+++ b/packages/lsp/src/rpc.ts
@@ -39,11 +39,36 @@ export class JsonRpcConnection {
this.write = write;
}
- sendRequest(method: string, params?: unknown): Promise<unknown> {
+ /**
+ * Send a request and await the correlated response. If `timeoutMs` is given,
+ * the promise rejects with a timeout error after that long — so a dead/slow
+ * server can't hang the caller forever (hover/definition/references).
+ * No `timeoutMs` = wait indefinitely (used by the initialize handshake, which
+ * has its own 45s race).
+ */
+ sendRequest(method: string, params?: unknown, timeoutMs?: number): Promise<unknown> {
const id = this.nextId++;
const msg: JsonRpcMessage = { jsonrpc: "2.0", id, method, params };
return new Promise((resolve, reject) => {
- this.pending.set(id, { resolve, reject });
+ let timer: ReturnType<typeof setTimeout> | undefined;
+ // Wrap resolve/reject so the timer is cleared on a normal response
+ // (or on dispose) — no dangling timer after completion.
+ const finish = (fn: () => void): void => {
+ if (timer) clearTimeout(timer);
+ fn();
+ };
+ const entry: PendingRequest = {
+ resolve: (value: unknown) => finish(() => resolve(value)),
+ reject: (reason: unknown) => finish(() => reject(reason)),
+ };
+ if (timeoutMs !== undefined) {
+ timer = setTimeout(() => {
+ if (this.pending.delete(id)) {
+ reject(new Error(`LSP request timed out after ${timeoutMs}ms: ${method}`));
+ }
+ }, timeoutMs);
+ }
+ this.pending.set(id, entry);
this.sendMessage(msg);
});
}
diff --git a/packages/lsp/src/tool.test.ts b/packages/lsp/src/tool.test.ts
index 03787ae..efd1514 100644
--- a/packages/lsp/src/tool.test.ts
+++ b/packages/lsp/src/tool.test.ts
@@ -176,4 +176,101 @@ describe("tool", () => {
expect(result.isError).toBe(true);
expect(result.content).toContain("requires both");
});
+
+ it("diagnostics op: a server that times out is skipped with a raise-to-user notice", async () => {
+ const mockClient = {
+ getState: () => "connected" as const,
+ getStateError: () => undefined,
+ request: async () => null,
+ waitForDiagnostics: async () => ({ formatted: "", slow: false, timedOut: true }),
+ };
+
+ const tool = createLspTool(
+ stubManager({
+ status: async () => [
+ {
+ id: "steep",
+ name: "Steep",
+ root: "/project",
+ extensions: [".rb"],
+ state: "connected",
+ },
+ ],
+ getClient: () => mockClient as never,
+ }),
+ );
+
+ const result = await tool.execute(
+ { operation: "diagnostics", path: "game.rb" },
+ {
+ toolCallId: "test",
+ onOutput: () => {},
+ signal: AbortSignal.timeout(5000),
+ log: {
+ debug: () => {},
+ info: () => {},
+ warn: () => {},
+ error: () => {},
+ child: () => ({}) as never,
+ span: () => ({}) as never,
+ },
+ cwd: "/project",
+ },
+ );
+
+ expect(result.isError).not.toBe(true);
+ expect(result.content).toContain("[Steep]");
+ expect(result.content).toContain("took too long");
+ expect(result.content).toContain(">10s");
+ expect(result.content).toContain("raise this to the user");
+ });
+
+ it("diagnostics op: responding servers' diagnostics are merged, tagged by source", async () => {
+ const mockClient = {
+ getState: () => "connected" as const,
+ getStateError: () => undefined,
+ request: async () => null,
+ waitForDiagnostics: async () => ({
+ formatted: "ERROR L1:1: boom",
+ slow: false,
+ timedOut: false,
+ }),
+ };
+
+ const tool = createLspTool(
+ stubManager({
+ status: async () => [
+ {
+ id: "steep",
+ name: "Steep",
+ root: "/project",
+ extensions: [".rb"],
+ state: "connected",
+ },
+ ],
+ getClient: () => mockClient as never,
+ }),
+ );
+
+ const result = await tool.execute(
+ { operation: "diagnostics", path: "game.rb" },
+ {
+ toolCallId: "test",
+ onOutput: () => {},
+ signal: AbortSignal.timeout(5000),
+ log: {
+ debug: () => {},
+ info: () => {},
+ warn: () => {},
+ error: () => {},
+ child: () => ({}) as never,
+ span: () => ({}) as never,
+ },
+ cwd: "/project",
+ },
+ );
+
+ expect(result.content).toContain("[Steep]");
+ expect(result.content).toContain("boom");
+ });
});
diff --git a/packages/lsp/src/tool.ts b/packages/lsp/src/tool.ts
index 8d282ec..be0d269 100644
--- a/packages/lsp/src/tool.ts
+++ b/packages/lsp/src/tool.ts
@@ -6,6 +6,7 @@
import { extname, resolve } from "node:path";
import type { ToolContract, ToolExecuteContext, ToolResult } from "@dispatch/kernel";
+import { aggregateDiagnostics } from "./aggregate.js";
import type { LspManager } from "./manager.js";
type Operation = "diagnostics" | "hover" | "definition" | "references" | "documentSymbol";
@@ -157,6 +158,8 @@ export function createLspTool(manager: LspManager): ToolContract {
switch (operation) {
case "diagnostics": {
+ // 10s hard ceiling per server (same policy as the edit path).
+ const DIAGNOSTICS_TIMEOUT_MS = 10_000;
// Query ALL connected servers whose extensions match this file.
const matching = statuses.filter(
(s) => s.state === "connected" && s.extensions.some((ext) => ext === fileExt),
@@ -179,31 +182,27 @@ export function createLspTool(manager: LspManager): ToolContract {
if (!client) {
return { content: "Language server client not available.", isError: true };
}
- const result = await client.waitForDiagnostics(absolutePath);
+ const result = await client.waitForDiagnostics(absolutePath, {
+ timeoutMs: DIAGNOSTICS_TIMEOUT_MS,
+ });
+ if (result.timedOut) {
+ return {
+ content: `⚠️ [${connected.name}] LSP took too long (>10s), diagnostics skipped — please raise this to the user.`,
+ };
+ }
return { content: result.formatted || "No diagnostics found." };
}
- // Query each matching server and merge results, tagged by source.
- const parts: string[] = [];
- let anyTimedOut = false;
- for (const s of matching) {
- const client = manager.getClient(s.id, s.root);
- if (!client) continue;
- const result = await client.waitForDiagnostics(absolutePath, { timeoutMs: 60_000 });
- if (result.timedOut) anyTimedOut = true;
- if (result.slow) {
- parts.push(
- `⚠️ LSP is taking unusually long. If this happens more than once, raise it to the user.`,
- );
- }
- if (result.formatted) {
- parts.push(`[${s.name}]\n${result.formatted}`);
- }
- }
- if (anyTimedOut && parts.length === 0) {
- parts.push("Diagnostics timed out (server may still be indexing).");
- }
- return { content: parts.length > 0 ? parts.join("\n\n") : "No diagnostics found." };
+ // Query matching servers concurrently, each capped at 10s;
+ // a non-responding server is skipped with a notice.
+ const agg = await aggregateDiagnostics(
+ (id, root) => manager.getClient(id, root),
+ matching,
+ absolutePath,
+ DIAGNOSTICS_TIMEOUT_MS,
+ {},
+ );
+ return { content: agg.formatted || "No diagnostics found." };
}
case "hover": {
const client = await getFirstMatchingClient(manager, statuses, fileExt);
diff --git a/packages/session-orchestrator/src/index.ts b/packages/session-orchestrator/src/index.ts
index fa8d9e9..aaafb76 100644
--- a/packages/session-orchestrator/src/index.ts
+++ b/packages/session-orchestrator/src/index.ts
@@ -12,6 +12,7 @@ export {
conversationOpened,
conversationStatusChanged,
createCompactionService,
+ createRetryStrategy,
createSessionOrchestrator,
createWarmService,
type EnqueueInput,
@@ -34,8 +35,13 @@ export {
} from "./orchestrator.js";
export {
buildUserMessage,
+ cumulativeSleepMs,
defaultDispatchPolicy,
+ delayFor,
generateTurnId,
+ RETRY_BUDGET_MS,
+ RETRY_SCHEDULE_MS,
+ RETRY_TAIL_MS,
resolveReasoningEffort,
selectFirstProvider,
} from "./pure.js";
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index ae27e59..a1401d6 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -11,6 +11,7 @@ import type {
ProviderEvent,
ProviderStreamOptions,
ReasoningEffort,
+ RetryStrategy,
RunTurnInput,
RunTurnResult,
ToolContract,
@@ -24,6 +25,7 @@ import { createMetricsAccumulator } from "./metrics.js";
import {
buildUserMessage,
defaultDispatchPolicy,
+ delayFor,
generateTurnId,
resolveModelName,
resolveReasoningEffort,
@@ -342,12 +344,45 @@ export interface SessionOrchestratorBundle {
readonly activeConversations: ReadonlySet<string>;
}
+/**
+ * The concrete retry strategy wired into every turn's `RunTurnInput.retry`.
+ *
+ * `delayFor` is the pure schedule (`5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m`,
+ * then repeat 30m until 8h cumulative scheduled sleep) — no I/O, no clock.
+ * `sleep` is the abortable I/O effect: a `setTimeout`-based promise that
+ * rejects when the turn's abort signal fires (so a retry in flight seals the
+ * turn `aborted`). The kernel imports no timer; this is the shell-provided I/O.
+ */
+export function createRetryStrategy(): RetryStrategy {
+ const sleep = (ms: number, signal: AbortSignal): Promise<void> => {
+ return new Promise((resolve, reject) => {
+ if (signal.aborted) {
+ reject(new Error("aborted"));
+ return;
+ }
+ const timer = setTimeout(() => {
+ signal.removeEventListener("abort", onAbort);
+ resolve();
+ }, ms);
+ const onAbort = () => {
+ clearTimeout(timer);
+ reject(new Error("aborted"));
+ };
+ signal.addEventListener("abort", onAbort, { once: true });
+ });
+ };
+ return { delayFor, sleep };
+}
+
export function createSessionOrchestrator(
deps: SessionOrchestratorDeps,
): SessionOrchestratorBundle {
const activeConversations = new Set<string>();
const subscribers = new Map<string, Set<TurnEventListener>>();
const activeTurns = new Map<string, ActiveTurn>();
+ // One stateless retry strategy shared by every turn (delayFor is pure; sleep
+ // is a stateless setTimeout closure). Wired into each RunTurnInput.retry.
+ const retryStrategy = createRetryStrategy();
function emitToHub(conversationId: string, event: AgentEvent): void {
const turn = activeTurns.get(conversationId);
@@ -640,6 +675,7 @@ export function createSessionOrchestrator(
turnId,
signal: controller.signal,
providerOpts,
+ retry: retryStrategy,
...(turnLogger !== undefined ? { logger: turnLogger } : {}),
...(effectiveCwd !== undefined ? { cwd: effectiveCwd } : {}),
...(effectiveComputerId !== undefined ? { computerId: effectiveComputerId } : {}),
diff --git a/packages/session-orchestrator/src/pure.test.ts b/packages/session-orchestrator/src/pure.test.ts
index 9e5d3c4..2cbe15f 100644
--- a/packages/session-orchestrator/src/pure.test.ts
+++ b/packages/session-orchestrator/src/pure.test.ts
@@ -2,8 +2,13 @@ import type { ProviderContract } from "@dispatch/kernel";
import { describe, expect, it } from "vitest";
import {
buildUserMessage,
+ cumulativeSleepMs,
defaultDispatchPolicy,
+ delayFor,
generateTurnId,
+ RETRY_BUDGET_MS,
+ RETRY_SCHEDULE_MS,
+ RETRY_TAIL_MS,
resolveReasoningEffort,
selectFirstProvider,
} from "./pure.js";
@@ -100,3 +105,59 @@ describe("resolveReasoningEffort", () => {
expect(resolveReasoningEffort(undefined, "max")).toBe("max");
});
});
+
+describe("retry backoff schedule (delayFor)", () => {
+ it("emits the stepped head: 5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m", () => {
+ expect(delayFor(0)).toBe(5_000);
+ expect(delayFor(1)).toBe(10_000);
+ expect(delayFor(2)).toBe(30_000);
+ expect(delayFor(3)).toBe(60_000);
+ expect(delayFor(4)).toBe(300_000);
+ expect(delayFor(5)).toBe(600_000);
+ expect(delayFor(6)).toBe(900_000);
+ expect(delayFor(7)).toBe(1_800_000);
+ });
+
+ it("repeats 30m after the head", () => {
+ expect(delayFor(8)).toBe(RETRY_TAIL_MS);
+ expect(delayFor(9)).toBe(RETRY_TAIL_MS);
+ expect(delayFor(20)).toBe(RETRY_TAIL_MS);
+ });
+
+ it("gives up (returns undefined) once cumulative sleep exceeds 8h", () => {
+ // Head sums to 3,705,000 ms; +1,800,000 per extra step. 8h = 28,800,000.
+ // attempt 20 cumulative = 3,705,000 + 13*1,800,000 = 27,105,000 (< 8h) → retry.
+ expect(delayFor(20)).toBe(RETRY_TAIL_MS);
+ // attempt 21 cumulative = 27,105,000 + 1,800,000 = 28,905,000 (> 8h) → stop.
+ expect(delayFor(21)).toBeUndefined();
+ });
+
+ it("cumulativeSleepMs matches the sum of the schedule", () => {
+ expect(cumulativeSleepMs(0)).toBe(5_000);
+ expect(cumulativeSleepMs(1)).toBe(15_000);
+ expect(cumulativeSleepMs(7)).toBe(RETRY_SCHEDULE_MS.reduce((a, b) => a + b, 0));
+ // 8h budget is 28,800,000 ms.
+ expect(RETRY_BUDGET_MS).toBe(8 * 60 * 60 * 1000);
+ // The last retry (attempt 20) keeps cumulative under budget.
+ expect(cumulativeSleepMs(20)).toBeLessThanOrEqual(RETRY_BUDGET_MS);
+ // The next (attempt 21) exceeds it.
+ expect(cumulativeSleepMs(21)).toBeGreaterThan(RETRY_BUDGET_MS);
+ });
+
+ it("the full schedule has 21 retries then stops", () => {
+ const schedule: number[] = [];
+ let attempt = 0;
+ while (true) {
+ const delay = delayFor(attempt);
+ if (delay === undefined) break;
+ schedule.push(delay);
+ attempt++;
+ }
+ expect(schedule).toHaveLength(21);
+ expect(schedule[0]).toBe(5_000);
+ expect(schedule.at(-1)).toBe(RETRY_TAIL_MS);
+ // 8 stepped head + 13 tail repeats.
+ expect(schedule.slice(0, 8)).toEqual([...RETRY_SCHEDULE_MS]);
+ expect(schedule.slice(8).every((d) => d === RETRY_TAIL_MS)).toBe(true);
+ });
+});
diff --git a/packages/session-orchestrator/src/pure.ts b/packages/session-orchestrator/src/pure.ts
index 9a31e17..a028cbe 100644
--- a/packages/session-orchestrator/src/pure.ts
+++ b/packages/session-orchestrator/src/pure.ts
@@ -9,6 +9,53 @@ export function buildUserMessage(text: string): ChatMessage {
return { role: "user", chunks: [{ type: "text", text }] };
}
+// ── Provider-error retry backoff schedule ───────────────────────────────────
+//
+// Pure, deterministic delay decision (no I/O, no clock) for retrying retryable
+// provider errors (HTTP 429 / 5xx "overloaded"). The concrete `sleep` (I/O)
+// is wired in the orchestrator; this owns only the policy.
+
+/**
+ * Stepped backoff schedule (ms): 5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m.
+ * After the head is exhausted, {@link RETRY_TAIL_MS} (30m) repeats.
+ */
+export const RETRY_SCHEDULE_MS = [
+ 5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000,
+] as const;
+
+/** Tail delay (ms) repeated after the stepped head: 30 minutes. */
+export const RETRY_TAIL_MS = 1_800_000;
+
+/** Cumulative scheduled-sleep budget (ms) after which retrying gives up: 8h. */
+export const RETRY_BUDGET_MS = 8 * 60 * 60 * 1000;
+
+/**
+ * Cumulative scheduled sleep through `attempt` (sum of delay[0..attempt]).
+ * Pure — no I/O, no clock.
+ */
+export function cumulativeSleepMs(attempt: number): number {
+ let sum = 0;
+ for (let i = 0; i <= attempt; i++) {
+ sum += i < RETRY_SCHEDULE_MS.length ? (RETRY_SCHEDULE_MS[i] ?? RETRY_TAIL_MS) : RETRY_TAIL_MS;
+ }
+ return sum;
+}
+
+/**
+ * Pure, deterministic delay decision for the retry strategy: given the
+ * 0-based attempt index, return the delay in ms to sleep before the next
+ * retry, or `undefined` to stop (cumulative budget exhausted). No I/O, no
+ * clock — fully testable. Matches the plan's schedule:
+ * `5s, 10s, 30s, 60s, 5m, 10m, 15m, 30m`, then repeat 30m until 8h of
+ * cumulative scheduled sleep is reached, then give up.
+ */
+export function delayFor(attempt: number): number | undefined {
+ const scheduled = RETRY_SCHEDULE_MS[attempt];
+ const delay = scheduled !== undefined ? scheduled : RETRY_TAIL_MS;
+ if (cumulativeSleepMs(attempt) > RETRY_BUDGET_MS) return undefined; // over budget → stop
+ return delay;
+}
+
/**
* Resolve the reasoning-effort level for a turn:
* per-turn override → persisted per-conversation value → default `"high"`.
diff --git a/packages/tool-edit-file/src/extension.ts b/packages/tool-edit-file/src/extension.ts
index 2eaa0e9..9dbebda 100644
--- a/packages/tool-edit-file/src/extension.ts
+++ b/packages/tool-edit-file/src/extension.ts
@@ -41,7 +41,10 @@ export const extension: Extension = {
filePath: opts.filePath,
text: opts.text,
cwd: opts.cwd,
- timeoutMs: 60_000,
+ // 10s matches the LSP service's per-server cap (see packages/lsp).
+ // The service clamps this anyway; stated explicitly so the call
+ // site is honest about the effective live-diagnostics budget.
+ timeoutMs: 10_000,
minSeverity: 2, // errors + warnings only
});
};
diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts
index f6a95cf..8dc3a72 100644
--- a/packages/wire/src/index.ts
+++ b/packages/wire/src/index.ts
@@ -273,6 +273,7 @@ export type AgentEvent =
| TurnUsageEvent
| TurnStepCompleteEvent
| TurnErrorEvent
+ | TurnProviderRetryEvent
| TurnDoneEvent
| TurnSealedEvent
| TurnSteeringEvent;
@@ -429,6 +430,31 @@ export interface TurnErrorEvent {
readonly code?: string;
}
+/**
+ * A retryable provider error is being retried with backoff. Emitted once per
+ * scheduled retry, BEFORE the sleep, so the UI can show "⚠ Server overloaded —
+ * retrying in 5s…" immediately. TRANSIENT: emitted to the frontend but NOT
+ * persisted into the model's message history (it never pollutes the prompt).
+ *
+ * When the retry budget is exhausted, the existing `error` event is emitted and
+ * the turn seals — so the final failure is still a persisted error. `attempt` is
+ * 0-based (the Nth retry about to happen); `delayMs` is the scheduled sleep
+ * before that retry fires.
+ */
+export interface TurnProviderRetryEvent {
+ readonly type: "provider-retry";
+ readonly conversationId: string;
+ readonly turnId: string;
+ /** 0-based: this is the Nth retry about to happen. */
+ readonly attempt: number;
+ /** ms the client should expect to wait before the retry fires. */
+ readonly delayMs: number;
+ /** The endpoint's error verbatim (e.g. "HTTP 429: {…overloaded_error…}"). */
+ readonly message: string;
+ /** The HTTP code when known (e.g. "429"). */
+ readonly code?: string;
+}
+
/** The turn has completed (model finished generating). */
export interface TurnDoneEvent {
readonly type: "done";