summaryrefslogtreecommitdiffhomepage
path: root/packages
diff options
context:
space:
mode:
authorKit Langton <[email protected]>2026-03-28 12:09:47 -0400
committerGitHub <[email protected]>2026-03-28 12:09:47 -0400
commit860531c275cf845f80ccf26bba5bad745fe98398 (patch)
tree10a16b2a34a1b0a2b33771c7177e0df4b5a090b4 /packages
parent2b86b36c8cd7d13a64d1e85296554cf5414d532b (diff)
downloadopencode-860531c275cf845f80ccf26bba5bad745fe98398.tar.gz
opencode-860531c275cf845f80ccf26bba5bad745fe98398.zip
refactor(session): effectify session processor (#19485)
Diffstat (limited to 'packages')
-rw-r--r--packages/opencode/src/effect/run-service.ts2
-rw-r--r--packages/opencode/src/session/compaction.ts113
-rw-r--r--packages/opencode/src/session/llm.ts31
-rw-r--r--packages/opencode/src/session/overflow.ts22
-rw-r--r--packages/opencode/src/session/processor.ts902
-rw-r--r--packages/opencode/src/session/prompt.ts2
-rw-r--r--packages/opencode/src/session/retry.ts77
-rw-r--r--packages/opencode/test/session/compaction.test.ts485
-rw-r--r--packages/opencode/test/session/processor-effect.test.ts838
-rw-r--r--packages/opencode/test/session/prompt.test.ts230
-rw-r--r--packages/opencode/test/session/retry.test.ts68
11 files changed, 2168 insertions, 602 deletions
diff --git a/packages/opencode/src/effect/run-service.ts b/packages/opencode/src/effect/run-service.ts
index 76248ca88..2daa29fde 100644
--- a/packages/opencode/src/effect/run-service.ts
+++ b/packages/opencode/src/effect/run-service.ts
@@ -9,6 +9,8 @@ export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: L
return {
runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(service.use(fn)),
+ runPromiseExit: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
+ getRuntime().runPromiseExit(service.use(fn), options),
runPromise: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
getRuntime().runPromise(service.use(fn), options),
runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(service.use(fn)),
diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts
index f9ee56549..223e71639 100644
--- a/packages/opencode/src/session/compaction.ts
+++ b/packages/opencode/src/session/compaction.ts
@@ -14,10 +14,10 @@ import { Agent } from "@/agent/agent"
import { Plugin } from "@/plugin"
import { Config } from "@/config/config"
import { NotFoundError } from "@/storage/db"
-import { ProviderTransform } from "@/provider/transform"
import { ModelID, ProviderID } from "@/provider/schema"
-import { Effect, Layer, ServiceMap } from "effect"
+import { Cause, Effect, Exit, Layer, ServiceMap } from "effect"
import { makeRuntime } from "@/effect/run-service"
+import { isOverflow as overflow } from "./overflow"
export namespace SessionCompaction {
const log = Log.create({ service: "session.compaction" })
@@ -31,7 +31,6 @@ export namespace SessionCompaction {
),
}
- const COMPACTION_BUFFER = 20_000
export const PRUNE_MINIMUM = 20_000
export const PRUNE_PROTECT = 40_000
const PRUNE_PROTECTED_TOOLS = ["skill"]
@@ -64,7 +63,7 @@ export namespace SessionCompaction {
export const layer: Layer.Layer<
Service,
never,
- Bus.Service | Config.Service | Session.Service | Agent.Service | Plugin.Service
+ Bus.Service | Config.Service | Session.Service | Agent.Service | Plugin.Service | SessionProcessor.Service
> = Layer.effect(
Service,
Effect.gen(function* () {
@@ -73,26 +72,13 @@ export namespace SessionCompaction {
const session = yield* Session.Service
const agents = yield* Agent.Service
const plugin = yield* Plugin.Service
+ const processors = yield* SessionProcessor.Service
const isOverflow = Effect.fn("SessionCompaction.isOverflow")(function* (input: {
tokens: MessageV2.Assistant["tokens"]
model: Provider.Model
}) {
- const cfg = yield* config.get()
- if (cfg.compaction?.auto === false) return false
- const context = input.model.limit.context
- if (context === 0) return false
-
- const count =
- input.tokens.total ||
- input.tokens.input + input.tokens.output + input.tokens.cache.read + input.tokens.cache.write
-
- const reserved =
- cfg.compaction?.reserved ?? Math.min(COMPACTION_BUFFER, ProviderTransform.maxOutputTokens(input.model))
- const usable = input.model.limit.input
- ? input.model.limit.input - reserved
- : context - ProviderTransform.maxOutputTokens(input.model)
- return count >= usable
+ return overflow({ cfg: yield* config.get(), tokens: input.tokens, model: input.model })
})
// goes backwards through parts until there are PRUNE_PROTECT tokens worth of tool
@@ -181,38 +167,6 @@ export namespace SessionCompaction {
? Provider.getModel(agent.model.providerID, agent.model.modelID)
: Provider.getModel(userMessage.model.providerID, userMessage.model.modelID),
)
- const msg = (yield* session.updateMessage({
- id: MessageID.ascending(),
- role: "assistant",
- parentID: input.parentID,
- sessionID: input.sessionID,
- mode: "compaction",
- agent: "compaction",
- variant: userMessage.variant,
- summary: true,
- path: {
- cwd: Instance.directory,
- root: Instance.worktree,
- },
- cost: 0,
- tokens: {
- output: 0,
- input: 0,
- reasoning: 0,
- cache: { read: 0, write: 0 },
- },
- modelID: model.id,
- providerID: model.providerID,
- time: {
- created: Date.now(),
- },
- })) as MessageV2.Assistant
- const processor = SessionProcessor.create({
- assistantMessage: msg,
- sessionID: input.sessionID,
- model,
- abort: input.abort,
- })
// Allow plugins to inject context or replace compaction prompt.
const compacting = yield* plugin.trigger(
"experimental.session.compacting",
@@ -251,8 +205,47 @@ When constructing the summary, try to stick to this template:
const msgs = structuredClone(messages)
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
- const result = yield* Effect.promise(() =>
- processor.process({
+ const msg = (yield* session.updateMessage({
+ id: MessageID.ascending(),
+ role: "assistant",
+ parentID: input.parentID,
+ sessionID: input.sessionID,
+ mode: "compaction",
+ agent: "compaction",
+ variant: userMessage.variant,
+ summary: true,
+ path: {
+ cwd: Instance.directory,
+ root: Instance.worktree,
+ },
+ cost: 0,
+ tokens: {
+ output: 0,
+ input: 0,
+ reasoning: 0,
+ cache: { read: 0, write: 0 },
+ },
+ modelID: model.id,
+ providerID: model.providerID,
+ time: {
+ created: Date.now(),
+ },
+ })) as MessageV2.Assistant
+ const processor = yield* processors.create({
+ assistantMessage: msg,
+ sessionID: input.sessionID,
+ model,
+ abort: input.abort,
+ })
+ const cancel = Effect.fn("SessionCompaction.cancel")(function* () {
+ if (!input.abort.aborted || msg.time.completed) return
+ msg.error = msg.error ?? new MessageV2.AbortedError({ message: "Aborted" }).toObject()
+ msg.finish = msg.finish ?? "error"
+ msg.time.completed = Date.now()
+ yield* session.updateMessage(msg)
+ })
+ const result = yield* processor
+ .process({
user: userMessage,
agent,
abort: input.abort,
@@ -267,8 +260,8 @@ When constructing the summary, try to stick to this template:
},
],
model,
- }),
- )
+ })
+ .pipe(Effect.ensuring(cancel()))
if (result === "compact") {
processor.message.error = new MessageV2.ContextOverflowError({
@@ -383,6 +376,7 @@ When constructing the summary, try to stick to this template:
Effect.sync(() =>
layer.pipe(
Layer.provide(Session.defaultLayer),
+ Layer.provide(SessionProcessor.defaultLayer),
Layer.provide(Agent.defaultLayer),
Layer.provide(Plugin.defaultLayer),
Layer.provide(Bus.layer),
@@ -391,7 +385,7 @@ When constructing the summary, try to stick to this template:
),
)
- const { runPromise } = makeRuntime(Service, defaultLayer)
+ const { runPromise, runPromiseExit } = makeRuntime(Service, defaultLayer)
export async function isOverflow(input: { tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
return runPromise((svc) => svc.isOverflow(input))
@@ -409,7 +403,12 @@ When constructing the summary, try to stick to this template:
auto: boolean
overflow?: boolean
}) {
- return runPromise((svc) => svc.process(input))
+ const exit = await runPromiseExit((svc) => svc.process(input), { signal: input.abort })
+ if (Exit.isFailure(exit)) {
+ if (Cause.hasInterrupts(exit.cause) && input.abort.aborted) return "stop"
+ throw Cause.squash(exit.cause)
+ }
+ return exit.value
}
export const create = fn(
diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts
index ed82ebc59..7c67c1b3f 100644
--- a/packages/opencode/src/session/llm.ts
+++ b/packages/opencode/src/session/llm.ts
@@ -1,5 +1,7 @@
import { Provider } from "@/provider/provider"
import { Log } from "@/util/log"
+import { Effect, Layer, ServiceMap } from "effect"
+import * as Stream from "effect/Stream"
import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai"
import { mergeDeep, pipe } from "remeda"
import { GitLabWorkflowLanguageModel } from "gitlab-ai-provider"
@@ -34,6 +36,35 @@ export namespace LLM {
toolChoice?: "auto" | "required" | "none"
}
+ export type Event = Awaited<ReturnType<typeof stream>>["fullStream"] extends AsyncIterable<infer T> ? T : never
+
+ export interface Interface {
+ readonly stream: (input: StreamInput) => Stream.Stream<Event, unknown>
+ }
+
+ export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/LLM") {}
+
+ export const layer = Layer.effect(
+ Service,
+ Effect.gen(function* () {
+ return Service.of({
+ stream(input) {
+ return Stream.unwrap(
+ Effect.promise(() => LLM.stream(input)).pipe(
+ Effect.map((result) =>
+ Stream.fromAsyncIterable(result.fullStream, (err) => err).pipe(
+ Stream.mapEffect((event) => Effect.succeed(event)),
+ ),
+ ),
+ ),
+ )
+ },
+ })
+ }),
+ )
+
+ export const defaultLayer = layer
+
export async function stream(input: StreamInput) {
const l = log
.clone()
diff --git a/packages/opencode/src/session/overflow.ts b/packages/opencode/src/session/overflow.ts
new file mode 100644
index 000000000..f0e52565d
--- /dev/null
+++ b/packages/opencode/src/session/overflow.ts
@@ -0,0 +1,22 @@
+import type { Config } from "@/config/config"
+import type { Provider } from "@/provider/provider"
+import { ProviderTransform } from "@/provider/transform"
+import type { MessageV2 } from "./message-v2"
+
+const COMPACTION_BUFFER = 20_000
+
+export function isOverflow(input: { cfg: Config.Info; tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
+ if (input.cfg.compaction?.auto === false) return false
+ const context = input.model.limit.context
+ if (context === 0) return false
+
+ const count =
+ input.tokens.total || input.tokens.input + input.tokens.output + input.tokens.cache.read + input.tokens.cache.write
+
+ const reserved =
+ input.cfg.compaction?.reserved ?? Math.min(COMPACTION_BUFFER, ProviderTransform.maxOutputTokens(input.model))
+ const usable = input.model.limit.input
+ ? input.model.limit.input - reserved
+ : context - ProviderTransform.maxOutputTokens(input.model)
+ return count >= usable
+}
diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts
index 84ea76656..d2459cd8b 100644
--- a/packages/opencode/src/session/processor.ts
+++ b/packages/opencode/src/session/processor.ts
@@ -1,430 +1,554 @@
-import { MessageV2 } from "./message-v2"
-import { Log } from "@/util/log"
-import { Session } from "."
+import { Cause, Effect, Exit, Layer, ServiceMap } from "effect"
+import * as Stream from "effect/Stream"
import { Agent } from "@/agent/agent"
-import { Snapshot } from "@/snapshot"
-import { SessionSummary } from "./summary"
import { Bus } from "@/bus"
+import { makeRuntime } from "@/effect/run-service"
+import { Config } from "@/config/config"
+import { Permission } from "@/permission"
+import { Plugin } from "@/plugin"
+import { Snapshot } from "@/snapshot"
+import { Log } from "@/util/log"
+import { Session } from "."
+import { LLM } from "./llm"
+import { MessageV2 } from "./message-v2"
+import { isOverflow } from "./overflow"
+import { PartID } from "./schema"
+import type { SessionID } from "./schema"
import { SessionRetry } from "./retry"
import { SessionStatus } from "./status"
-import { Plugin } from "@/plugin"
+import { SessionSummary } from "./summary"
import type { Provider } from "@/provider/provider"
-import { LLM } from "./llm"
-import { Config } from "@/config/config"
-import { SessionCompaction } from "./compaction"
-import { Permission } from "@/permission"
import { Question } from "@/question"
-import { PartID } from "./schema"
-import type { SessionID, MessageID } from "./schema"
export namespace SessionProcessor {
const DOOM_LOOP_THRESHOLD = 3
const log = Log.create({ service: "session.processor" })
- export type Info = Awaited<ReturnType<typeof create>>
- export type Result = Awaited<ReturnType<Info["process"]>>
+ export type Result = "compact" | "stop" | "continue"
+
+ export type Event = LLM.Event
+
+ export interface Handle {
+ readonly message: MessageV2.Assistant
+ readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined
+ readonly abort: () => Effect.Effect<void>
+ readonly process: (streamInput: LLM.StreamInput) => Effect.Effect<Result>
+ }
+
+ export interface Info {
+ readonly message: MessageV2.Assistant
+ readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined
+ readonly process: (streamInput: LLM.StreamInput) => Promise<Result>
+ }
- export function create(input: {
+ type Input = {
assistantMessage: MessageV2.Assistant
sessionID: SessionID
model: Provider.Model
abort: AbortSignal
- }) {
- const toolcalls: Record<string, MessageV2.ToolPart> = {}
- let snapshot: string | undefined
- let blocked = false
- let attempt = 0
- let needsCompaction = false
-
- const result = {
- get message() {
- return input.assistantMessage
- },
- partFromToolCall(toolCallID: string) {
- return toolcalls[toolCallID]
- },
- async process(streamInput: LLM.StreamInput) {
- log.info("process")
- needsCompaction = false
- const shouldBreak = (await Config.get()).experimental?.continue_loop_on_deny !== true
- while (true) {
- try {
- let currentText: MessageV2.TextPart | undefined
- let reasoningMap: Record<string, MessageV2.ReasoningPart> = {}
- const stream = await LLM.stream(streamInput)
-
- for await (const value of stream.fullStream) {
- input.abort.throwIfAborted()
- switch (value.type) {
- case "start":
- await SessionStatus.set(input.sessionID, { type: "busy" })
- break
-
- case "reasoning-start":
- if (value.id in reasoningMap) {
- continue
- }
- const reasoningPart = {
- id: PartID.ascending(),
- messageID: input.assistantMessage.id,
- sessionID: input.assistantMessage.sessionID,
- type: "reasoning" as const,
- text: "",
- time: {
- start: Date.now(),
- },
- metadata: value.providerMetadata,
- }
- reasoningMap[value.id] = reasoningPart
- await Session.updatePart(reasoningPart)
- break
-
- case "reasoning-delta":
- if (value.id in reasoningMap) {
- const part = reasoningMap[value.id]
- part.text += value.text
- if (value.providerMetadata) part.metadata = value.providerMetadata
- await Session.updatePartDelta({
- sessionID: part.sessionID,
- messageID: part.messageID,
- partID: part.id,
- field: "text",
- delta: value.text,
- })
- }
- break
-
- case "reasoning-end":
- if (value.id in reasoningMap) {
- const part = reasoningMap[value.id]
- part.text = part.text.trimEnd()
-
- part.time = {
- ...part.time,
- end: Date.now(),
- }
- if (value.providerMetadata) part.metadata = value.providerMetadata
- await Session.updatePart(part)
- delete reasoningMap[value.id]
- }
- break
-
- case "tool-input-start":
- const part = await Session.updatePart({
- id: toolcalls[value.id]?.id ?? PartID.ascending(),
- messageID: input.assistantMessage.id,
- sessionID: input.assistantMessage.sessionID,
- type: "tool",
- tool: value.toolName,
- callID: value.id,
- state: {
- status: "pending",
- input: {},
- raw: "",
- },
- })
- toolcalls[value.id] = part as MessageV2.ToolPart
- break
-
- case "tool-input-delta":
- break
-
- case "tool-input-end":
- break
-
- case "tool-call": {
- const match = toolcalls[value.toolCallId]
- if (match) {
- const part = await Session.updatePart({
- ...match,
- tool: value.toolName,
- state: {
- status: "running",
- input: value.input,
- time: {
- start: Date.now(),
- },
- },
- metadata: value.providerMetadata,
- })
- toolcalls[value.toolCallId] = part as MessageV2.ToolPart
-
- const parts = await MessageV2.parts(input.assistantMessage.id)
- const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD)
-
- if (
- lastThree.length === DOOM_LOOP_THRESHOLD &&
- lastThree.every(
- (p) =>
- p.type === "tool" &&
- p.tool === value.toolName &&
- p.state.status !== "pending" &&
- JSON.stringify(p.state.input) === JSON.stringify(value.input),
- )
- ) {
- const agent = await Agent.get(input.assistantMessage.agent)
- await Permission.ask({
- permission: "doom_loop",
- patterns: [value.toolName],
- sessionID: input.assistantMessage.sessionID,
- metadata: {
- tool: value.toolName,
- input: value.input,
- },
- always: [value.toolName],
- ruleset: agent.permission,
- })
- }
- }
- break
- }
- case "tool-result": {
- const match = toolcalls[value.toolCallId]
- if (match && match.state.status === "running") {
- await Session.updatePart({
- ...match,
- state: {
- status: "completed",
- input: value.input ?? match.state.input,
- output: value.output.output,
- metadata: value.output.metadata,
- title: value.output.title,
- time: {
- start: match.state.time.start,
- end: Date.now(),
- },
- attachments: value.output.attachments,
- },
- })
-
- delete toolcalls[value.toolCallId]
- }
- break
- }
+ }
- case "tool-error": {
- const match = toolcalls[value.toolCallId]
- if (match && match.state.status === "running") {
- await Session.updatePart({
- ...match,
- state: {
- status: "error",
- input: value.input ?? match.state.input,
- error: value.error instanceof Error ? value.error.message : String(value.error),
- time: {
- start: match.state.time.start,
- end: Date.now(),
- },
- },
- })
-
- if (
- value.error instanceof Permission.RejectedError ||
- value.error instanceof Question.RejectedError
- ) {
- blocked = shouldBreak
- }
- delete toolcalls[value.toolCallId]
- }
- break
- }
- case "error":
- throw value.error
+ export interface Interface {
+ readonly create: (input: Input) => Effect.Effect<Handle>
+ }
- case "start-step":
- snapshot = await Snapshot.track()
- await Session.updatePart({
- id: PartID.ascending(),
- messageID: input.assistantMessage.id,
- sessionID: input.sessionID,
- snapshot,
- type: "step-start",
- })
- break
+ interface ProcessorContext extends Input {
+ toolcalls: Record<string, MessageV2.ToolPart>
+ shouldBreak: boolean
+ snapshot: string | undefined
+ blocked: boolean
+ needsCompaction: boolean
+ currentText: MessageV2.TextPart | undefined
+ reasoningMap: Record<string, MessageV2.ReasoningPart>
+ }
- case "finish-step":
- const usage = Session.getUsage({
- model: input.model,
- usage: value.usage,
- metadata: value.providerMetadata,
- })
- input.assistantMessage.finish = value.finishReason
- input.assistantMessage.cost += usage.cost
- input.assistantMessage.tokens = usage.tokens
- await Session.updatePart({
- id: PartID.ascending(),
- reason: value.finishReason,
- snapshot: await Snapshot.track(),
- messageID: input.assistantMessage.id,
- sessionID: input.assistantMessage.sessionID,
- type: "step-finish",
- tokens: usage.tokens,
- cost: usage.cost,
- })
- await Session.updateMessage(input.assistantMessage)
- if (snapshot) {
- const patch = await Snapshot.patch(snapshot)
- if (patch.files.length) {
- await Session.updatePart({
- id: PartID.ascending(),
- messageID: input.assistantMessage.id,
- sessionID: input.sessionID,
- type: "patch",
- hash: patch.hash,
- files: patch.files,
- })
- }
- snapshot = undefined
- }
- SessionSummary.summarize({
- sessionID: input.sessionID,
- messageID: input.assistantMessage.parentID,
- })
- if (
- !input.assistantMessage.summary &&
- (await SessionCompaction.isOverflow({ tokens: usage.tokens, model: input.model }))
- ) {
- needsCompaction = true
- }
- break
-
- case "text-start":
- currentText = {
- id: PartID.ascending(),
- messageID: input.assistantMessage.id,
- sessionID: input.assistantMessage.sessionID,
- type: "text",
- text: "",
- time: {
- start: Date.now(),
- },
- metadata: value.providerMetadata,
- }
- await Session.updatePart(currentText)
- break
-
- case "text-delta":
- if (currentText) {
- currentText.text += value.text
- if (value.providerMetadata) currentText.metadata = value.providerMetadata
- await Session.updatePartDelta({
- sessionID: currentText.sessionID,
- messageID: currentText.messageID,
- partID: currentText.id,
- field: "text",
- delta: value.text,
- })
- }
- break
-
- case "text-end":
- if (currentText) {
- currentText.text = currentText.text.trimEnd()
- const textOutput = await Plugin.trigger(
- "experimental.text.complete",
- {
- sessionID: input.sessionID,
- messageID: input.assistantMessage.id,
- partID: currentText.id,
- },
- { text: currentText.text },
- )
- currentText.text = textOutput.text
- currentText.time = {
- start: Date.now(),
- end: Date.now(),
- }
- if (value.providerMetadata) currentText.metadata = value.providerMetadata
- await Session.updatePart(currentText)
- }
- currentText = undefined
- break
-
- case "finish":
- break
-
- default:
- log.info("unhandled", {
- ...value,
- })
- continue
+ type StreamEvent = Event
+
+ export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/SessionProcessor") {}
+
+ export const layer: Layer.Layer<
+ Service,
+ never,
+ | Session.Service
+ | Config.Service
+ | Bus.Service
+ | Snapshot.Service
+ | Agent.Service
+ | LLM.Service
+ | Permission.Service
+ | Plugin.Service
+ | SessionStatus.Service
+ > = Layer.effect(
+ Service,
+ Effect.gen(function* () {
+ const session = yield* Session.Service
+ const config = yield* Config.Service
+ const bus = yield* Bus.Service
+ const snapshot = yield* Snapshot.Service
+ const agents = yield* Agent.Service
+ const llm = yield* LLM.Service
+ const permission = yield* Permission.Service
+ const plugin = yield* Plugin.Service
+ const status = yield* SessionStatus.Service
+
+ const create = Effect.fn("SessionProcessor.create")(function* (input: Input) {
+ const ctx: ProcessorContext = {
+ assistantMessage: input.assistantMessage,
+ sessionID: input.sessionID,
+ model: input.model,
+ abort: input.abort,
+ toolcalls: {},
+ shouldBreak: false,
+ snapshot: undefined,
+ blocked: false,
+ needsCompaction: false,
+ currentText: undefined,
+ reasoningMap: {},
+ }
+
+ const parse = (e: unknown) =>
+ MessageV2.fromError(e, {
+ providerID: input.model.providerID,
+ aborted: input.abort.aborted,
+ })
+
+ const handleEvent = Effect.fn("SessionProcessor.handleEvent")(function* (value: StreamEvent) {
+ switch (value.type) {
+ case "start":
+ yield* status.set(ctx.sessionID, { type: "busy" })
+ return
+
+ case "reasoning-start":
+ if (value.id in ctx.reasoningMap) return
+ ctx.reasoningMap[value.id] = {
+ id: PartID.ascending(),
+ messageID: ctx.assistantMessage.id,
+ sessionID: ctx.assistantMessage.sessionID,
+ type: "reasoning",
+ text: "",
+ time: { start: Date.now() },
+ metadata: value.providerMetadata,
+ }
+ yield* session.updatePart(ctx.reasoningMap[value.id])
+ return
+
+ case "reasoning-delta":
+ if (!(value.id in ctx.reasoningMap)) return
+ ctx.reasoningMap[value.id].text += value.text
+ if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata
+ yield* session.updatePartDelta({
+ sessionID: ctx.reasoningMap[value.id].sessionID,
+ messageID: ctx.reasoningMap[value.id].messageID,
+ partID: ctx.reasoningMap[value.id].id,
+ field: "text",
+ delta: value.text,
+ })
+ return
+
+ case "reasoning-end":
+ if (!(value.id in ctx.reasoningMap)) return
+ ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text.trimEnd()
+ ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() }
+ if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata
+ yield* session.updatePart(ctx.reasoningMap[value.id])
+ delete ctx.reasoningMap[value.id]
+ return
+
+ case "tool-input-start":
+ ctx.toolcalls[value.id] = (yield* session.updatePart({
+ id: ctx.toolcalls[value.id]?.id ?? PartID.ascending(),
+ messageID: ctx.assistantMessage.id,
+ sessionID: ctx.assistantMessage.sessionID,
+ type: "tool",
+ tool: value.toolName,
+ callID: value.id,
+ state: { status: "pending", input: {}, raw: "" },
+ })) as MessageV2.ToolPart
+ return
+
+ case "tool-input-delta":
+ return
+
+ case "tool-input-end":
+ return
+
+ case "tool-call": {
+ const match = ctx.toolcalls[value.toolCallId]
+ if (!match) return
+ ctx.toolcalls[value.toolCallId] = (yield* session.updatePart({
+ ...match,
+ tool: value.toolName,
+ state: { status: "running", input: value.input, time: { start: Date.now() } },
+ metadata: value.providerMetadata,
+ })) as MessageV2.ToolPart
+
+ const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
+ const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
+
+ if (
+ recentParts.length !== DOOM_LOOP_THRESHOLD ||
+ !recentParts.every(
+ (part) =>
+ part.type === "tool" &&
+ part.tool === value.toolName &&
+ part.state.status !== "pending" &&
+ JSON.stringify(part.state.input) === JSON.stringify(value.input),
+ )
+ ) {
+ return
}
- if (needsCompaction) break
+
+ const agent = yield* agents.get(ctx.assistantMessage.agent)
+ yield* permission.ask({
+ permission: "doom_loop",
+ patterns: [value.toolName],
+ sessionID: ctx.assistantMessage.sessionID,
+ metadata: { tool: value.toolName, input: value.input },
+ always: [value.toolName],
+ ruleset: agent.permission,
+ })
+ return
}
- } catch (e: any) {
- log.error("process", {
- error: e,
- stack: JSON.stringify(e.stack),
- })
- const error = MessageV2.fromError(e, { providerID: input.model.providerID, aborted: input.abort.aborted })
- if (MessageV2.ContextOverflowError.isInstance(error)) {
- needsCompaction = true
- Bus.publish(Session.Event.Error, {
- sessionID: input.sessionID,
- error,
+
+ case "tool-result": {
+ const match = ctx.toolcalls[value.toolCallId]
+ if (!match || match.state.status !== "running") return
+ yield* session.updatePart({
+ ...match,
+ state: {
+ status: "completed",
+ input: value.input ?? match.state.input,
+ output: value.output.output,
+ metadata: value.output.metadata,
+ title: value.output.title,
+ time: { start: match.state.time.start, end: Date.now() },
+ attachments: value.output.attachments,
+ },
})
- } else {
- const retry = SessionRetry.retryable(error)
- if (retry !== undefined) {
- attempt++
- const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined)
- await SessionStatus.set(input.sessionID, {
- type: "retry",
- attempt,
- message: retry,
- next: Date.now() + delay,
- })
- await SessionRetry.sleep(delay, input.abort).catch(() => {})
- continue
+ delete ctx.toolcalls[value.toolCallId]
+ return
+ }
+
+ case "tool-error": {
+ const match = ctx.toolcalls[value.toolCallId]
+ if (!match || match.state.status !== "running") return
+ yield* session.updatePart({
+ ...match,
+ state: {
+ status: "error",
+ input: value.input ?? match.state.input,
+ error: value.error instanceof Error ? value.error.message : String(value.error),
+ time: { start: match.state.time.start, end: Date.now() },
+ },
+ })
+ if (value.error instanceof Permission.RejectedError || value.error instanceof Question.RejectedError) {
+ ctx.blocked = ctx.shouldBreak
}
- input.assistantMessage.error = error
- Bus.publish(Session.Event.Error, {
- sessionID: input.assistantMessage.sessionID,
- error: input.assistantMessage.error,
+ delete ctx.toolcalls[value.toolCallId]
+ return
+ }
+
+ case "error":
+ throw value.error
+
+ case "start-step":
+ ctx.snapshot = yield* snapshot.track()
+ yield* session.updatePart({
+ id: PartID.ascending(),
+ messageID: ctx.assistantMessage.id,
+ sessionID: ctx.sessionID,
+ snapshot: ctx.snapshot,
+ type: "step-start",
+ })
+ return
+
+ case "finish-step": {
+ const usage = Session.getUsage({
+ model: ctx.model,
+ usage: value.usage,
+ metadata: value.providerMetadata,
})
- await SessionStatus.set(input.sessionID, { type: "idle" })
+ ctx.assistantMessage.finish = value.finishReason
+ ctx.assistantMessage.cost += usage.cost
+ ctx.assistantMessage.tokens = usage.tokens
+ yield* session.updatePart({
+ id: PartID.ascending(),
+ reason: value.finishReason,
+ snapshot: yield* snapshot.track(),
+ messageID: ctx.assistantMessage.id,
+ sessionID: ctx.assistantMessage.sessionID,
+ type: "step-finish",
+ tokens: usage.tokens,
+ cost: usage.cost,
+ })
+ yield* session.updateMessage(ctx.assistantMessage)
+ if (ctx.snapshot) {
+ const patch = yield* snapshot.patch(ctx.snapshot)
+ if (patch.files.length) {
+ yield* session.updatePart({
+ id: PartID.ascending(),
+ messageID: ctx.assistantMessage.id,
+ sessionID: ctx.sessionID,
+ type: "patch",
+ hash: patch.hash,
+ files: patch.files,
+ })
+ }
+ ctx.snapshot = undefined
+ }
+ yield* Effect.promise(() =>
+ SessionSummary.summarize({
+ sessionID: ctx.sessionID,
+ messageID: ctx.assistantMessage.parentID,
+ }),
+ ).pipe(Effect.ignoreCause({ log: true, message: "session summary failed" }), Effect.forkDetach)
+ if (
+ !ctx.assistantMessage.summary &&
+ isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model })
+ ) {
+ ctx.needsCompaction = true
+ }
+ return
}
+
+ case "text-start":
+ ctx.currentText = {
+ id: PartID.ascending(),
+ messageID: ctx.assistantMessage.id,
+ sessionID: ctx.assistantMessage.sessionID,
+ type: "text",
+ text: "",
+ time: { start: Date.now() },
+ metadata: value.providerMetadata,
+ }
+ yield* session.updatePart(ctx.currentText)
+ return
+
+ case "text-delta":
+ if (!ctx.currentText) return
+ ctx.currentText.text += value.text
+ if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata
+ yield* session.updatePartDelta({
+ sessionID: ctx.currentText.sessionID,
+ messageID: ctx.currentText.messageID,
+ partID: ctx.currentText.id,
+ field: "text",
+ delta: value.text,
+ })
+ return
+
+ case "text-end":
+ if (!ctx.currentText) return
+ ctx.currentText.text = ctx.currentText.text.trimEnd()
+ ctx.currentText.text = (yield* plugin.trigger(
+ "experimental.text.complete",
+ {
+ sessionID: ctx.sessionID,
+ messageID: ctx.assistantMessage.id,
+ partID: ctx.currentText.id,
+ },
+ { text: ctx.currentText.text },
+ )).text
+ ctx.currentText.time = { start: Date.now(), end: Date.now() }
+ if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata
+ yield* session.updatePart(ctx.currentText)
+ ctx.currentText = undefined
+ return
+
+ case "finish":
+ return
+
+ default:
+ log.info("unhandled", { ...value })
+ return
}
- if (snapshot) {
- const patch = await Snapshot.patch(snapshot)
+ })
+
+ const cleanup = Effect.fn("SessionProcessor.cleanup")(function* () {
+ if (ctx.snapshot) {
+ const patch = yield* snapshot.patch(ctx.snapshot)
if (patch.files.length) {
- await Session.updatePart({
+ yield* session.updatePart({
id: PartID.ascending(),
- messageID: input.assistantMessage.id,
- sessionID: input.sessionID,
+ messageID: ctx.assistantMessage.id,
+ sessionID: ctx.sessionID,
type: "patch",
hash: patch.hash,
files: patch.files,
})
}
- snapshot = undefined
+ ctx.snapshot = undefined
}
- const p = await MessageV2.parts(input.assistantMessage.id)
- for (const part of p) {
- if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") {
- await Session.updatePart({
- ...part,
- state: {
- ...part.state,
- status: "error",
- error: "Tool execution aborted",
- time: {
- start: Date.now(),
- end: Date.now(),
- },
- },
- })
- }
+
+ if (ctx.currentText) {
+ const end = Date.now()
+ ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
+ yield* session.updatePart(ctx.currentText)
+ ctx.currentText = undefined
+ }
+
+ for (const part of Object.values(ctx.reasoningMap)) {
+ const end = Date.now()
+ yield* session.updatePart({
+ ...part,
+ time: { start: part.time.start ?? end, end },
+ })
}
- input.assistantMessage.time.completed = Date.now()
- await Session.updateMessage(input.assistantMessage)
- if (needsCompaction) return "compact"
- if (blocked) return "stop"
- if (input.assistantMessage.error) return "stop"
+ ctx.reasoningMap = {}
+
+ const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
+ for (const part of parts) {
+ if (part.type !== "tool" || part.state.status === "completed" || part.state.status === "error") continue
+ yield* session.updatePart({
+ ...part,
+ state: {
+ ...part.state,
+ status: "error",
+ error: "Tool execution aborted",
+ time: { start: Date.now(), end: Date.now() },
+ },
+ })
+ }
+ ctx.assistantMessage.time.completed = Date.now()
+ yield* session.updateMessage(ctx.assistantMessage)
+ })
+
+ const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) {
+ log.error("process", { error: e, stack: JSON.stringify((e as any)?.stack) })
+ const error = parse(e)
+ if (MessageV2.ContextOverflowError.isInstance(error)) {
+ ctx.needsCompaction = true
+ yield* bus.publish(Session.Event.Error, { sessionID: ctx.sessionID, error })
+ return
+ }
+ ctx.assistantMessage.error = error
+ yield* bus.publish(Session.Event.Error, {
+ sessionID: ctx.assistantMessage.sessionID,
+ error: ctx.assistantMessage.error,
+ })
+ yield* status.set(ctx.sessionID, { type: "idle" })
+ })
+
+ const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
+ log.info("process")
+ ctx.needsCompaction = false
+ ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true
+
+ yield* Effect.gen(function* () {
+ ctx.currentText = undefined
+ ctx.reasoningMap = {}
+ const stream = llm.stream(streamInput)
+
+ yield* stream.pipe(
+ Stream.tap((event) =>
+ Effect.gen(function* () {
+ input.abort.throwIfAborted()
+ yield* handleEvent(event)
+ }),
+ ),
+ Stream.takeUntil(() => ctx.needsCompaction),
+ Stream.runDrain,
+ )
+ }).pipe(
+ Effect.catchCauseIf(
+ (cause) => !Cause.hasInterruptsOnly(cause),
+ (cause) => Effect.fail(Cause.squash(cause)),
+ ),
+ Effect.retry(
+ SessionRetry.policy({
+ parse,
+ set: (info) =>
+ status.set(ctx.sessionID, {
+ type: "retry",
+ attempt: info.attempt,
+ message: info.message,
+ next: info.next,
+ }),
+ }),
+ ),
+ Effect.catchCause((cause) =>
+ Cause.hasInterruptsOnly(cause)
+ ? halt(new DOMException("Aborted", "AbortError"))
+ : halt(Cause.squash(cause)),
+ ),
+ Effect.ensuring(cleanup()),
+ )
+
+ if (input.abort.aborted && !ctx.assistantMessage.error) {
+ yield* abort()
+ }
+ if (ctx.needsCompaction) return "compact"
+ if (ctx.blocked || ctx.assistantMessage.error || input.abort.aborted) return "stop"
return "continue"
+ })
+
+ const abort = Effect.fn("SessionProcessor.abort")(() =>
+ Effect.gen(function* () {
+ if (!ctx.assistantMessage.error) {
+ yield* halt(new DOMException("Aborted", "AbortError"))
+ }
+ if (!ctx.assistantMessage.time.completed) {
+ yield* cleanup()
+ return
+ }
+ yield* session.updateMessage(ctx.assistantMessage)
+ }),
+ )
+
+ return {
+ get message() {
+ return ctx.assistantMessage
+ },
+ partFromToolCall(toolCallID: string) {
+ return ctx.toolcalls[toolCallID]
+ },
+ abort,
+ process,
+ } satisfies Handle
+ })
+
+ return Service.of({ create })
+ }),
+ )
+
+ export const defaultLayer = Layer.unwrap(
+ Effect.sync(() =>
+ layer.pipe(
+ Layer.provide(Session.defaultLayer),
+ Layer.provide(Snapshot.defaultLayer),
+ Layer.provide(Agent.defaultLayer),
+ Layer.provide(LLM.defaultLayer),
+ Layer.provide(Permission.layer),
+ Layer.provide(Plugin.defaultLayer),
+ Layer.provide(SessionStatus.layer.pipe(Layer.provide(Bus.layer))),
+ Layer.provide(Bus.layer),
+ Layer.provide(Config.defaultLayer),
+ ),
+ ),
+ )
+
+ const { runPromise } = makeRuntime(Service, defaultLayer)
+
+ export async function create(input: Input): Promise<Info> {
+ const hit = await runPromise((svc) => svc.create(input))
+ return {
+ get message() {
+ return hit.message
+ },
+ partFromToolCall(toolCallID: string) {
+ return hit.partFromToolCall(toolCallID)
+ },
+ async process(streamInput: LLM.StreamInput) {
+ const exit = await Effect.runPromiseExit(hit.process(streamInput), { signal: input.abort })
+ if (Exit.isFailure(exit)) {
+ if (Cause.hasInterrupts(exit.cause) && input.abort.aborted) {
+ await Effect.runPromise(hit.abort())
+ return "stop"
+ }
+ throw Cause.squash(exit.cause)
}
+ return exit.value
},
}
- return result
}
}
diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts
index dd74b83f5..acc9f6359 100644
--- a/packages/opencode/src/session/prompt.ts
+++ b/packages/opencode/src/session/prompt.ts
@@ -594,7 +594,7 @@ export namespace SessionPrompt {
session,
})
- const processor = SessionProcessor.create({
+ const processor = await SessionProcessor.create({
assistantMessage: (await Session.updateMessage({
id: MessageID.ascending(),
parentID: lastUser.id,
diff --git a/packages/opencode/src/session/retry.ts b/packages/opencode/src/session/retry.ts
index 6d057f539..8ba48375b 100644
--- a/packages/opencode/src/session/retry.ts
+++ b/packages/opencode/src/session/retry.ts
@@ -1,28 +1,18 @@
import type { NamedError } from "@opencode-ai/util/error"
+import { Cause, Clock, Duration, Effect, Schedule } from "effect"
import { MessageV2 } from "./message-v2"
import { iife } from "@/util/iife"
export namespace SessionRetry {
+ export type Err = ReturnType<NamedError["toObject"]>
+
export const RETRY_INITIAL_DELAY = 2000
export const RETRY_BACKOFF_FACTOR = 2
export const RETRY_MAX_DELAY_NO_HEADERS = 30_000 // 30 seconds
export const RETRY_MAX_DELAY = 2_147_483_647 // max 32-bit signed integer for setTimeout
- export async function sleep(ms: number, signal: AbortSignal): Promise<void> {
- return new Promise((resolve, reject) => {
- const abortHandler = () => {
- clearTimeout(timeout)
- reject(new DOMException("Aborted", "AbortError"))
- }
- const timeout = setTimeout(
- () => {
- signal.removeEventListener("abort", abortHandler)
- resolve()
- },
- Math.min(ms, RETRY_MAX_DELAY),
- )
- signal.addEventListener("abort", abortHandler, { once: true })
- })
+ function cap(ms: number) {
+ return Math.min(ms, RETRY_MAX_DELAY)
}
export function delay(attempt: number, error?: MessageV2.APIError) {
@@ -33,7 +23,7 @@ export namespace SessionRetry {
if (retryAfterMs) {
const parsedMs = Number.parseFloat(retryAfterMs)
if (!Number.isNaN(parsedMs)) {
- return parsedMs
+ return cap(parsedMs)
}
}
@@ -42,23 +32,23 @@ export namespace SessionRetry {
const parsedSeconds = Number.parseFloat(retryAfter)
if (!Number.isNaN(parsedSeconds)) {
// convert seconds to milliseconds
- return Math.ceil(parsedSeconds * 1000)
+ return cap(Math.ceil(parsedSeconds * 1000))
}
// Try parsing as HTTP date format
const parsed = Date.parse(retryAfter) - Date.now()
if (!Number.isNaN(parsed) && parsed > 0) {
- return Math.ceil(parsed)
+ return cap(Math.ceil(parsed))
}
}
- return RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1)
+ return cap(RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1))
}
}
- return Math.min(RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1), RETRY_MAX_DELAY_NO_HEADERS)
+ return cap(Math.min(RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1), RETRY_MAX_DELAY_NO_HEADERS))
}
- export function retryable(error: ReturnType<NamedError["toObject"]>) {
+ export function retryable(error: Err) {
// context overflow errors should not be retried
if (MessageV2.ContextOverflowError.isInstance(error)) return undefined
if (MessageV2.APIError.isInstance(error)) {
@@ -80,22 +70,37 @@ export namespace SessionRetry {
return undefined
}
})
- try {
- if (!json || typeof json !== "object") return undefined
- const code = typeof json.code === "string" ? json.code : ""
+ if (!json || typeof json !== "object") return undefined
+ const code = typeof json.code === "string" ? json.code : ""
- if (json.type === "error" && json.error?.type === "too_many_requests") {
- return "Too Many Requests"
- }
- if (code.includes("exhausted") || code.includes("unavailable")) {
- return "Provider is overloaded"
- }
- if (json.type === "error" && json.error?.code?.includes("rate_limit")) {
- return "Rate Limited"
- }
- return JSON.stringify(json)
- } catch {
- return undefined
+ if (json.type === "error" && json.error?.type === "too_many_requests") {
+ return "Too Many Requests"
+ }
+ if (code.includes("exhausted") || code.includes("unavailable")) {
+ return "Provider is overloaded"
+ }
+ if (json.type === "error" && typeof json.error?.code === "string" && json.error.code.includes("rate_limit")) {
+ return "Rate Limited"
}
+ return undefined
+ }
+
+ export function policy(opts: {
+ parse: (error: unknown) => Err
+ set: (input: { attempt: number; message: string; next: number }) => Effect.Effect<void>
+ }) {
+ return Schedule.fromStepWithMetadata(
+ Effect.succeed((meta: Schedule.InputMetadata<unknown>) => {
+ const error = opts.parse(meta.input)
+ const message = retryable(error)
+ if (!message) return Cause.done(meta.attempt)
+ return Effect.gen(function* () {
+ const wait = delay(meta.attempt, MessageV2.APIError.isInstance(error) ? error : undefined)
+ const now = yield* Clock.currentTimeMillis
+ yield* opts.set({ attempt: meta.attempt, message, next: now + wait })
+ return [meta.attempt, Duration.millis(wait)] as [number, Duration.Duration]
+ })
+ }),
+ )
}
}
diff --git a/packages/opencode/test/session/compaction.test.ts b/packages/opencode/test/session/compaction.test.ts
index 9d5f7eeb8..9c8559c35 100644
--- a/packages/opencode/test/session/compaction.test.ts
+++ b/packages/opencode/test/session/compaction.test.ts
@@ -1,18 +1,28 @@
import { afterEach, describe, expect, mock, spyOn, test } from "bun:test"
+import { APICallError } from "ai"
+import { Cause, Effect, Exit, Layer, ManagedRuntime } from "effect"
+import * as Stream from "effect/Stream"
import path from "path"
import { Bus } from "../../src/bus"
+import { Config } from "../../src/config/config"
+import { Agent } from "../../src/agent/agent"
+import { LLM } from "../../src/session/llm"
import { SessionCompaction } from "../../src/session/compaction"
import { Token } from "../../src/util/token"
import { Instance } from "../../src/project/instance"
import { Log } from "../../src/util/log"
+import { Permission } from "../../src/permission"
+import { Plugin } from "../../src/plugin"
import { tmpdir } from "../fixture/fixture"
import { Session } from "../../src/session"
import { MessageV2 } from "../../src/session/message-v2"
import { MessageID, PartID, SessionID } from "../../src/session/schema"
+import { SessionStatus } from "../../src/session/status"
import { ModelID, ProviderID } from "../../src/provider/schema"
import type { Provider } from "../../src/provider/provider"
import * as ProviderModule from "../../src/provider/provider"
import * as SessionProcessorModule from "../../src/session/processor"
+import { Snapshot } from "../../src/snapshot"
Log.init({ print: false })
@@ -121,12 +131,13 @@ async function tool(sessionID: SessionID, messageID: MessageID, tool: string, ou
function fake(
input: Parameters<(typeof SessionProcessorModule.SessionProcessor)["create"]>[0],
result: "continue" | "compact",
-): ReturnType<(typeof SessionProcessorModule.SessionProcessor)["create"]> {
+) {
const msg = input.assistantMessage
return {
get message() {
return msg
},
+ abort: Effect.fn("TestSessionProcessor.abort")(() => Effect.void),
partFromToolCall() {
return {
id: PartID.ascending(),
@@ -138,10 +149,74 @@ function fake(
state: { status: "pending", input: {}, raw: "" },
}
},
- process: async () => result,
+ process: Effect.fn("TestSessionProcessor.process")(() => Effect.succeed(result)),
+ } satisfies SessionProcessorModule.SessionProcessor.Handle
+}
+
+function layer(result: "continue" | "compact") {
+ return Layer.succeed(
+ SessionProcessorModule.SessionProcessor.Service,
+ SessionProcessorModule.SessionProcessor.Service.of({
+ create: Effect.fn("TestSessionProcessor.create")((input) => Effect.succeed(fake(input, result))),
+ }),
+ )
+}
+
+function runtime(result: "continue" | "compact", plugin = Plugin.defaultLayer) {
+ const bus = Bus.layer
+ return ManagedRuntime.make(
+ Layer.mergeAll(SessionCompaction.layer, bus).pipe(
+ Layer.provide(Session.defaultLayer),
+ Layer.provide(layer(result)),
+ Layer.provide(Agent.defaultLayer),
+ Layer.provide(plugin),
+ Layer.provide(bus),
+ Layer.provide(Config.defaultLayer),
+ ),
+ )
+}
+
+function llm() {
+ const queue: Array<
+ Stream.Stream<LLM.Event, unknown> | ((input: LLM.StreamInput) => Stream.Stream<LLM.Event, unknown>)
+ > = []
+
+ return {
+ push(stream: Stream.Stream<LLM.Event, unknown> | ((input: LLM.StreamInput) => Stream.Stream<LLM.Event, unknown>)) {
+ queue.push(stream)
+ },
+ layer: Layer.succeed(
+ LLM.Service,
+ LLM.Service.of({
+ stream: (input) => {
+ const item = queue.shift() ?? Stream.empty
+ const stream = typeof item === "function" ? item(input) : item
+ return stream.pipe(Stream.mapEffect((event) => Effect.succeed(event)))
+ },
+ }),
+ ),
}
}
+function liveRuntime(layer: Layer.Layer<LLM.Service>) {
+ const bus = Bus.layer
+ const status = SessionStatus.layer.pipe(Layer.provide(bus))
+ const processor = SessionProcessorModule.SessionProcessor.layer
+ return ManagedRuntime.make(
+ Layer.mergeAll(SessionCompaction.layer.pipe(Layer.provide(processor)), processor, bus, status).pipe(
+ Layer.provide(Session.defaultLayer),
+ Layer.provide(Snapshot.defaultLayer),
+ Layer.provide(layer),
+ Layer.provide(Permission.layer),
+ Layer.provide(Agent.defaultLayer),
+ Layer.provide(Plugin.defaultLayer),
+ Layer.provide(status),
+ Layer.provide(bus),
+ Layer.provide(Config.defaultLayer),
+ ),
+ )
+}
+
function wait(ms = 50) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
@@ -154,6 +229,17 @@ function defer() {
return { promise, resolve }
}
+function plugin(ready: ReturnType<typeof defer>) {
+ return Layer.mock(Plugin.Service)({
+ trigger: <Name extends string, Input, Output>(name: Name, _input: Input, output: Output) => {
+ if (name !== "experimental.session.compacting") return Effect.succeed(output)
+ return Effect.sync(() => ready.resolve()).pipe(Effect.andThen(Effect.never), Effect.as(output))
+ },
+ list: () => Effect.succeed([]),
+ init: () => Effect.void,
+ })
+}
+
describe("session.compaction.isOverflow", () => {
test("returns true when token count exceeds usable context", async () => {
await using tmp = await tmpdir()
@@ -429,37 +515,49 @@ describe("session.compaction.process", () => {
directory: tmp.path,
fn: async () => {
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
- spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue"))
const session = await Session.create({})
const msg = await user(session.id, "hello")
const msgs = await Session.messages({ sessionID: session.id })
const done = defer()
let seen = false
- const unsub = Bus.subscribe(SessionCompaction.Event.Compacted, (evt) => {
- if (evt.properties.sessionID !== session.id) return
- seen = true
- done.resolve()
- })
-
- const result = await SessionCompaction.process({
- parentID: msg.id,
- messages: msgs,
- sessionID: session.id,
- abort: new AbortController().signal,
- auto: false,
- })
-
- await Promise.race([
- done.promise,
- wait(500).then(() => {
- throw new Error("timed out waiting for compacted event")
- }),
- ])
- unsub()
-
- expect(result).toBe("continue")
- expect(seen).toBe(true)
+ const rt = runtime("continue")
+ let unsub: (() => void) | undefined
+ try {
+ unsub = await rt.runPromise(
+ Bus.Service.use((svc) =>
+ svc.subscribeCallback(SessionCompaction.Event.Compacted, (evt) => {
+ if (evt.properties.sessionID !== session.id) return
+ seen = true
+ done.resolve()
+ }),
+ ),
+ )
+
+ const result = await rt.runPromise(
+ SessionCompaction.Service.use((svc) =>
+ svc.process({
+ parentID: msg.id,
+ messages: msgs,
+ sessionID: session.id,
+ abort: new AbortController().signal,
+ auto: false,
+ }),
+ ),
+ )
+
+ await Promise.race([
+ done.promise,
+ wait(500).then(() => {
+ throw new Error("timed out waiting for compacted event")
+ }),
+ ])
+ expect(result).toBe("continue")
+ expect(seen).toBe(true)
+ } finally {
+ unsub?.()
+ await rt.dispose()
+ }
},
})
})
@@ -470,27 +568,36 @@ describe("session.compaction.process", () => {
directory: tmp.path,
fn: async () => {
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
- spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "compact"))
const session = await Session.create({})
const msg = await user(session.id, "hello")
- const result = await SessionCompaction.process({
- parentID: msg.id,
- messages: await Session.messages({ sessionID: session.id }),
- sessionID: session.id,
- abort: new AbortController().signal,
- auto: false,
- })
-
- const summary = (await Session.messages({ sessionID: session.id })).find(
- (msg) => msg.info.role === "assistant" && msg.info.summary,
- )
-
- expect(result).toBe("stop")
- expect(summary?.info.role).toBe("assistant")
- if (summary?.info.role === "assistant") {
- expect(summary.info.finish).toBe("error")
- expect(JSON.stringify(summary.info.error)).toContain("Session too large to compact")
+ const rt = runtime("compact")
+ try {
+ const msgs = await Session.messages({ sessionID: session.id })
+ const result = await rt.runPromise(
+ SessionCompaction.Service.use((svc) =>
+ svc.process({
+ parentID: msg.id,
+ messages: msgs,
+ sessionID: session.id,
+ abort: new AbortController().signal,
+ auto: false,
+ }),
+ ),
+ )
+
+ const summary = (await Session.messages({ sessionID: session.id })).find(
+ (msg) => msg.info.role === "assistant" && msg.info.summary,
+ )
+
+ expect(result).toBe("stop")
+ expect(summary?.info.role).toBe("assistant")
+ if (summary?.info.role === "assistant") {
+ expect(summary.info.finish).toBe("error")
+ expect(JSON.stringify(summary.info.error)).toContain("Session too large to compact")
+ }
+ } finally {
+ await rt.dispose()
}
},
})
@@ -502,30 +609,38 @@ describe("session.compaction.process", () => {
directory: tmp.path,
fn: async () => {
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
- spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue"))
const session = await Session.create({})
const msg = await user(session.id, "hello")
-
- const result = await SessionCompaction.process({
- parentID: msg.id,
- messages: await Session.messages({ sessionID: session.id }),
- sessionID: session.id,
- abort: new AbortController().signal,
- auto: true,
- })
-
- const msgs = await Session.messages({ sessionID: session.id })
- const last = msgs.at(-1)
-
- expect(result).toBe("continue")
- expect(last?.info.role).toBe("user")
- expect(last?.parts[0]).toMatchObject({
- type: "text",
- synthetic: true,
- })
- if (last?.parts[0]?.type === "text") {
- expect(last.parts[0].text).toContain("Continue if you have next steps")
+ const rt = runtime("continue")
+ try {
+ const msgs = await Session.messages({ sessionID: session.id })
+ const result = await rt.runPromise(
+ SessionCompaction.Service.use((svc) =>
+ svc.process({
+ parentID: msg.id,
+ messages: msgs,
+ sessionID: session.id,
+ abort: new AbortController().signal,
+ auto: true,
+ }),
+ ),
+ )
+
+ const all = await Session.messages({ sessionID: session.id })
+ const last = all.at(-1)
+
+ expect(result).toBe("continue")
+ expect(last?.info.role).toBe("user")
+ expect(last?.parts[0]).toMatchObject({
+ type: "text",
+ synthetic: true,
+ })
+ if (last?.parts[0]?.type === "text") {
+ expect(last.parts[0].text).toContain("Continue if you have next steps")
+ }
+ } finally {
+ await rt.dispose()
}
},
})
@@ -537,7 +652,6 @@ describe("session.compaction.process", () => {
directory: tmp.path,
fn: async () => {
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
- spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue"))
const session = await Session.create({})
await user(session.id, "root")
@@ -552,24 +666,33 @@ describe("session.compaction.process", () => {
url: "https://example.com/cat.png",
})
const msg = await user(session.id, "current")
-
- const result = await SessionCompaction.process({
- parentID: msg.id,
- messages: await Session.messages({ sessionID: session.id }),
- sessionID: session.id,
- abort: new AbortController().signal,
- auto: true,
- overflow: true,
- })
-
- const last = (await Session.messages({ sessionID: session.id })).at(-1)
-
- expect(result).toBe("continue")
- expect(last?.info.role).toBe("user")
- expect(last?.parts.some((part) => part.type === "file")).toBe(false)
- expect(
- last?.parts.some((part) => part.type === "text" && part.text.includes("Attached image/png: cat.png")),
- ).toBe(true)
+ const rt = runtime("continue")
+ try {
+ const msgs = await Session.messages({ sessionID: session.id })
+ const result = await rt.runPromise(
+ SessionCompaction.Service.use((svc) =>
+ svc.process({
+ parentID: msg.id,
+ messages: msgs,
+ sessionID: session.id,
+ abort: new AbortController().signal,
+ auto: true,
+ overflow: true,
+ }),
+ ),
+ )
+
+ const last = (await Session.messages({ sessionID: session.id })).at(-1)
+
+ expect(result).toBe("continue")
+ expect(last?.info.role).toBe("user")
+ expect(last?.parts.some((part) => part.type === "file")).toBe(false)
+ expect(
+ last?.parts.some((part) => part.type === "text" && part.text.includes("Attached image/png: cat.png")),
+ ).toBe(true)
+ } finally {
+ await rt.dispose()
+ }
},
})
})
@@ -580,27 +703,191 @@ describe("session.compaction.process", () => {
directory: tmp.path,
fn: async () => {
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
- spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue"))
const session = await Session.create({})
await user(session.id, "earlier")
const msg = await user(session.id, "current")
- const result = await SessionCompaction.process({
- parentID: msg.id,
- messages: await Session.messages({ sessionID: session.id }),
- sessionID: session.id,
- abort: new AbortController().signal,
- auto: true,
- overflow: true,
- })
+ const rt = runtime("continue")
+ try {
+ const msgs = await Session.messages({ sessionID: session.id })
+ const result = await rt.runPromise(
+ SessionCompaction.Service.use((svc) =>
+ svc.process({
+ parentID: msg.id,
+ messages: msgs,
+ sessionID: session.id,
+ abort: new AbortController().signal,
+ auto: true,
+ overflow: true,
+ }),
+ ),
+ )
+
+ const last = (await Session.messages({ sessionID: session.id })).at(-1)
+
+ expect(result).toBe("continue")
+ expect(last?.info.role).toBe("user")
+ if (last?.parts[0]?.type === "text") {
+ expect(last.parts[0].text).toContain("previous request exceeded the provider's size limit")
+ }
+ } finally {
+ await rt.dispose()
+ }
+ },
+ })
+ })
+
+ test("stops quickly when aborted during retry backoff", async () => {
+ const stub = llm()
+ const ready = defer()
+ stub.push(
+ Stream.fromAsyncIterable(
+ {
+ async *[Symbol.asyncIterator]() {
+ yield { type: "start" } as LLM.Event
+ throw new APICallError({
+ message: "boom",
+ url: "https://example.com/v1/chat/completions",
+ requestBodyValues: {},
+ statusCode: 503,
+ responseHeaders: { "retry-after-ms": "10000" },
+ responseBody: '{"error":"boom"}',
+ isRetryable: true,
+ })
+ },
+ },
+ (err) => err,
+ ),
+ )
+
+ await using tmp = await tmpdir({ git: true })
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
+
+ const session = await Session.create({})
+ const msg = await user(session.id, "hello")
+ const msgs = await Session.messages({ sessionID: session.id })
+ const abort = new AbortController()
+ const rt = liveRuntime(stub.layer)
+ let off: (() => void) | undefined
+ let run: Promise<"continue" | "stop"> | undefined
+ try {
+ off = await rt.runPromise(
+ Bus.Service.use((svc) =>
+ svc.subscribeCallback(SessionStatus.Event.Status, (evt) => {
+ if (evt.properties.sessionID !== session.id) return
+ if (evt.properties.status.type !== "retry") return
+ ready.resolve()
+ }),
+ ),
+ )
+
+ run = rt
+ .runPromiseExit(
+ SessionCompaction.Service.use((svc) =>
+ svc.process({
+ parentID: msg.id,
+ messages: msgs,
+ sessionID: session.id,
+ abort: abort.signal,
+ auto: false,
+ }),
+ ),
+ { signal: abort.signal },
+ )
+ .then((exit) => {
+ if (Exit.isFailure(exit)) {
+ if (Cause.hasInterrupts(exit.cause) && abort.signal.aborted) return "stop"
+ throw Cause.squash(exit.cause)
+ }
+ return exit.value
+ })
+
+ await Promise.race([
+ ready.promise,
+ wait(1000).then(() => {
+ throw new Error("timed out waiting for retry status")
+ }),
+ ])
+
+ const start = Date.now()
+ abort.abort()
+ const result = await Promise.race([
+ run.then((value) => ({ kind: "done" as const, value, ms: Date.now() - start })),
+ wait(250).then(() => ({ kind: "timeout" as const })),
+ ])
+
+ expect(result.kind).toBe("done")
+ if (result.kind === "done") {
+ expect(result.value).toBe("stop")
+ expect(result.ms).toBeLessThan(250)
+ }
+ } finally {
+ off?.()
+ abort.abort()
+ await rt.dispose()
+ await run?.catch(() => undefined)
+ }
+ },
+ })
+ })
+
+ test("does not leave a summary assistant when aborted before processor setup", async () => {
+ const ready = defer()
- const last = (await Session.messages({ sessionID: session.id })).at(-1)
+ await using tmp = await tmpdir({ git: true })
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
- expect(result).toBe("continue")
- expect(last?.info.role).toBe("user")
- if (last?.parts[0]?.type === "text") {
- expect(last.parts[0].text).toContain("previous request exceeded the provider's size limit")
+ const session = await Session.create({})
+ const msg = await user(session.id, "hello")
+ const msgs = await Session.messages({ sessionID: session.id })
+ const abort = new AbortController()
+ const rt = runtime("continue", plugin(ready))
+ let run: Promise<"continue" | "stop"> | undefined
+ try {
+ run = rt
+ .runPromiseExit(
+ SessionCompaction.Service.use((svc) =>
+ svc.process({
+ parentID: msg.id,
+ messages: msgs,
+ sessionID: session.id,
+ abort: abort.signal,
+ auto: false,
+ }),
+ ),
+ { signal: abort.signal },
+ )
+ .then((exit) => {
+ if (Exit.isFailure(exit)) {
+ if (Cause.hasInterrupts(exit.cause) && abort.signal.aborted) return "stop"
+ throw Cause.squash(exit.cause)
+ }
+ return exit.value
+ })
+
+ await Promise.race([
+ ready.promise,
+ wait(1000).then(() => {
+ throw new Error("timed out waiting for compaction hook")
+ }),
+ ])
+
+ abort.abort()
+ expect(await run).toBe("stop")
+
+ const all = await Session.messages({ sessionID: session.id })
+ expect(all.some((msg) => msg.info.role === "assistant" && msg.info.summary)).toBe(false)
+ } finally {
+ abort.abort()
+ await rt.dispose()
+ await run?.catch(() => undefined)
}
},
})
diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts
new file mode 100644
index 000000000..cd9d97e15
--- /dev/null
+++ b/packages/opencode/test/session/processor-effect.test.ts
@@ -0,0 +1,838 @@
+import { NodeFileSystem } from "@effect/platform-node"
+import { expect } from "bun:test"
+import { APICallError } from "ai"
+import { Effect, Layer, ServiceMap } from "effect"
+import * as Stream from "effect/Stream"
+import path from "path"
+import type { Agent } from "../../src/agent/agent"
+import { Agent as AgentSvc } from "../../src/agent/agent"
+import { Bus } from "../../src/bus"
+import { Config } from "../../src/config/config"
+import { Permission } from "../../src/permission"
+import { Plugin } from "../../src/plugin"
+import { Instance } from "../../src/project/instance"
+import type { Provider } from "../../src/provider/provider"
+import { ModelID, ProviderID } from "../../src/provider/schema"
+import { Session } from "../../src/session"
+import { LLM } from "../../src/session/llm"
+import { MessageV2 } from "../../src/session/message-v2"
+import { SessionProcessor } from "../../src/session/processor"
+import { MessageID, PartID, SessionID } from "../../src/session/schema"
+import { SessionStatus } from "../../src/session/status"
+import { Snapshot } from "../../src/snapshot"
+import { Log } from "../../src/util/log"
+import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
+import { provideTmpdirInstance } from "../fixture/fixture"
+import { testEffect } from "../lib/effect"
+
+Log.init({ print: false })
+
+const ref = {
+ providerID: ProviderID.make("test"),
+ modelID: ModelID.make("test-model"),
+}
+
+type Script = Stream.Stream<LLM.Event, unknown> | ((input: LLM.StreamInput) => Stream.Stream<LLM.Event, unknown>)
+
+class TestLLM extends ServiceMap.Service<
+ TestLLM,
+ {
+ readonly push: (stream: Script) => Effect.Effect<void>
+ readonly reply: (...items: LLM.Event[]) => Effect.Effect<void>
+ readonly calls: Effect.Effect<number>
+ readonly inputs: Effect.Effect<LLM.StreamInput[]>
+ }
+>()("@test/SessionProcessorLLM") {}
+
+function stream(...items: LLM.Event[]) {
+ return Stream.make(...items)
+}
+
+function usage(input = 1, output = 1, total = input + output) {
+ return {
+ inputTokens: input,
+ outputTokens: output,
+ totalTokens: total,
+ inputTokenDetails: {
+ noCacheTokens: undefined,
+ cacheReadTokens: undefined,
+ cacheWriteTokens: undefined,
+ },
+ outputTokenDetails: {
+ textTokens: undefined,
+ reasoningTokens: undefined,
+ },
+ }
+}
+
+function start(): LLM.Event {
+ return { type: "start" }
+}
+
+function textStart(id = "t"): LLM.Event {
+ return { type: "text-start", id }
+}
+
+function textDelta(id: string, text: string): LLM.Event {
+ return { type: "text-delta", id, text }
+}
+
+function textEnd(id = "t"): LLM.Event {
+ return { type: "text-end", id }
+}
+
+function reasoningStart(id: string): LLM.Event {
+ return { type: "reasoning-start", id }
+}
+
+function reasoningDelta(id: string, text: string): LLM.Event {
+ return { type: "reasoning-delta", id, text }
+}
+
+function reasoningEnd(id: string): LLM.Event {
+ return { type: "reasoning-end", id }
+}
+
+function finishStep(): LLM.Event {
+ return {
+ type: "finish-step",
+ finishReason: "stop",
+ rawFinishReason: "stop",
+ response: { id: "res", modelId: "test-model", timestamp: new Date() },
+ providerMetadata: undefined,
+ usage: usage(),
+ }
+}
+
+function finish(): LLM.Event {
+ return { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage: usage() }
+}
+
+function toolInputStart(id: string, toolName: string): LLM.Event {
+ return { type: "tool-input-start", id, toolName }
+}
+
+function toolCall(toolCallId: string, toolName: string, input: unknown): LLM.Event {
+ return { type: "tool-call", toolCallId, toolName, input }
+}
+
+function fail<E>(err: E, ...items: LLM.Event[]) {
+ return stream(...items).pipe(Stream.concat(Stream.fail(err)))
+}
+
+function wait(abort: AbortSignal) {
+ return Effect.promise(
+ () =>
+ new Promise<void>((done) => {
+ abort.addEventListener("abort", () => done(), { once: true })
+ }),
+ )
+}
+
+function hang(input: LLM.StreamInput, ...items: LLM.Event[]) {
+ return stream(...items).pipe(
+ Stream.concat(
+ Stream.unwrap(wait(input.abort).pipe(Effect.as(Stream.fail(new DOMException("Aborted", "AbortError"))))),
+ ),
+ )
+}
+
+function model(context: number): Provider.Model {
+ return {
+ id: "test-model",
+ providerID: "test",
+ name: "Test",
+ limit: { context, output: 10 },
+ cost: { input: 0, output: 0, cache: { read: 0, write: 0 } },
+ capabilities: {
+ toolcall: true,
+ attachment: false,
+ reasoning: false,
+ temperature: true,
+ input: { text: true, image: false, audio: false, video: false },
+ output: { text: true, image: false, audio: false, video: false },
+ },
+ api: { npm: "@ai-sdk/anthropic" },
+ options: {},
+ } as Provider.Model
+}
+
+function agent(): Agent.Info {
+ return {
+ name: "build",
+ mode: "primary",
+ options: {},
+ permission: [{ permission: "*", pattern: "*", action: "allow" }],
+ }
+}
+
+function defer<T>() {
+ let resolve!: (value: T | PromiseLike<T>) => void
+ const promise = new Promise<T>((done) => {
+ resolve = done
+ })
+ return { promise, resolve }
+}
+
+const user = Effect.fn("TestSession.user")(function* (sessionID: SessionID, text: string) {
+ const session = yield* Session.Service
+ const msg = yield* session.updateMessage({
+ id: MessageID.ascending(),
+ role: "user",
+ sessionID,
+ agent: "build",
+ model: ref,
+ time: { created: Date.now() },
+ })
+ yield* session.updatePart({
+ id: PartID.ascending(),
+ messageID: msg.id,
+ sessionID,
+ type: "text",
+ text,
+ })
+ return msg
+})
+
+const assistant = Effect.fn("TestSession.assistant")(function* (
+ sessionID: SessionID,
+ parentID: MessageID,
+ root: string,
+) {
+ const session = yield* Session.Service
+ const msg: MessageV2.Assistant = {
+ id: MessageID.ascending(),
+ role: "assistant",
+ sessionID,
+ mode: "build",
+ agent: "build",
+ path: { cwd: root, root },
+ cost: 0,
+ tokens: {
+ total: 0,
+ input: 0,
+ output: 0,
+ reasoning: 0,
+ cache: { read: 0, write: 0 },
+ },
+ modelID: ref.modelID,
+ providerID: ref.providerID,
+ parentID,
+ time: { created: Date.now() },
+ finish: "end_turn",
+ }
+ yield* session.updateMessage(msg)
+ return msg
+})
+
+const llm = Layer.unwrap(
+ Effect.gen(function* () {
+ const queue: Script[] = []
+ const inputs: LLM.StreamInput[] = []
+ let calls = 0
+
+ const push = Effect.fn("TestLLM.push")((item: Script) => {
+ queue.push(item)
+ return Effect.void
+ })
+
+ const reply = Effect.fn("TestLLM.reply")((...items: LLM.Event[]) => push(stream(...items)))
+ return Layer.mergeAll(
+ Layer.succeed(
+ LLM.Service,
+ LLM.Service.of({
+ stream: (input) => {
+ calls += 1
+ inputs.push(input)
+ const item = queue.shift() ?? Stream.empty
+ return typeof item === "function" ? item(input) : item
+ },
+ }),
+ ),
+ Layer.succeed(
+ TestLLM,
+ TestLLM.of({
+ push,
+ reply,
+ calls: Effect.sync(() => calls),
+ inputs: Effect.sync(() => [...inputs]),
+ }),
+ ),
+ )
+ }),
+)
+
+const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer))
+const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
+const deps = Layer.mergeAll(
+ Session.defaultLayer,
+ Snapshot.defaultLayer,
+ AgentSvc.defaultLayer,
+ Permission.layer,
+ Plugin.defaultLayer,
+ Config.defaultLayer,
+ status,
+ llm,
+).pipe(Layer.provideMerge(infra))
+const env = SessionProcessor.layer.pipe(Layer.provideMerge(deps))
+
+const it = testEffect(env)
+
+it.effect("session.processor effect tests capture llm input cleanly", () => {
+ return provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const test = yield* TestLLM
+ const processors = yield* SessionProcessor.Service
+ const session = yield* Session.Service
+
+ yield* test.reply(start(), textStart(), textDelta("t", "hello"), textEnd(), finishStep(), finish())
+
+ const chat = yield* session.create({})
+ const parent = yield* user(chat.id, "hi")
+ const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
+ const abort = new AbortController()
+ const mdl = model(100)
+ const handle = yield* processors.create({
+ assistantMessage: msg,
+ sessionID: chat.id,
+ model: mdl,
+ abort: abort.signal,
+ })
+
+ const input = {
+ user: {
+ id: parent.id,
+ sessionID: chat.id,
+ role: "user",
+ time: parent.time,
+ agent: parent.agent,
+ model: { providerID: ref.providerID, modelID: ref.modelID },
+ } satisfies MessageV2.User,
+ sessionID: chat.id,
+ model: mdl,
+ agent: agent(),
+ system: [],
+ abort: abort.signal,
+ messages: [{ role: "user", content: "hi" }],
+ tools: {},
+ } satisfies LLM.StreamInput
+
+ const value = yield* handle.process(input)
+ const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
+ const calls = yield* test.calls
+ const inputs = yield* test.inputs
+
+ expect(value).toBe("continue")
+ expect(calls).toBe(1)
+ expect(inputs).toHaveLength(1)
+ expect(inputs[0].messages).toStrictEqual([{ role: "user", content: "hi" }])
+ expect(parts.some((part) => part.type === "text" && part.text === "hello")).toBe(true)
+ }),
+ { git: true },
+ )
+})
+
+it.effect("session.processor effect tests stop after token overflow requests compaction", () => {
+ return provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const test = yield* TestLLM
+ const processors = yield* SessionProcessor.Service
+ const session = yield* Session.Service
+
+ yield* test.reply(
+ start(),
+ {
+ type: "finish-step",
+ finishReason: "stop",
+ rawFinishReason: "stop",
+ response: { id: "res", modelId: "test-model", timestamp: new Date() },
+ providerMetadata: undefined,
+ usage: usage(100, 0, 100),
+ },
+ textStart(),
+ textDelta("t", "after"),
+ textEnd(),
+ )
+
+ const chat = yield* session.create({})
+ const parent = yield* user(chat.id, "compact")
+ const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
+ const abort = new AbortController()
+ const mdl = model(20)
+ const handle = yield* processors.create({
+ assistantMessage: msg,
+ sessionID: chat.id,
+ model: mdl,
+ abort: abort.signal,
+ })
+
+ const value = yield* handle.process({
+ user: {
+ id: parent.id,
+ sessionID: chat.id,
+ role: "user",
+ time: parent.time,
+ agent: parent.agent,
+ model: { providerID: ref.providerID, modelID: ref.modelID },
+ } satisfies MessageV2.User,
+ sessionID: chat.id,
+ model: mdl,
+ agent: agent(),
+ system: [],
+ abort: abort.signal,
+ messages: [{ role: "user", content: "compact" }],
+ tools: {},
+ })
+
+ const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
+
+ expect(value).toBe("compact")
+ expect(parts.some((part) => part.type === "text")).toBe(false)
+ expect(parts.some((part) => part.type === "step-finish")).toBe(true)
+ }),
+ { git: true },
+ )
+})
+
+it.effect("session.processor effect tests reset reasoning state across retries", () => {
+ return provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const test = yield* TestLLM
+ const processors = yield* SessionProcessor.Service
+ const session = yield* Session.Service
+
+ yield* test.push(
+ fail(
+ new APICallError({
+ message: "boom",
+ url: "https://example.com/v1/chat/completions",
+ requestBodyValues: {},
+ statusCode: 503,
+ responseHeaders: { "retry-after-ms": "0" },
+ responseBody: '{"error":"boom"}',
+ isRetryable: true,
+ }),
+ start(),
+ reasoningStart("r"),
+ reasoningDelta("r", "one"),
+ ),
+ )
+
+ yield* test.reply(
+ start(),
+ reasoningStart("r"),
+ reasoningDelta("r", "two"),
+ reasoningEnd("r"),
+ finishStep(),
+ finish(),
+ )
+
+ const chat = yield* session.create({})
+ const parent = yield* user(chat.id, "reason")
+ const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
+ const abort = new AbortController()
+ const mdl = model(100)
+ const handle = yield* processors.create({
+ assistantMessage: msg,
+ sessionID: chat.id,
+ model: mdl,
+ abort: abort.signal,
+ })
+
+ const value = yield* handle.process({
+ user: {
+ id: parent.id,
+ sessionID: chat.id,
+ role: "user",
+ time: parent.time,
+ agent: parent.agent,
+ model: { providerID: ref.providerID, modelID: ref.modelID },
+ } satisfies MessageV2.User,
+ sessionID: chat.id,
+ model: mdl,
+ agent: agent(),
+ system: [],
+ abort: abort.signal,
+ messages: [{ role: "user", content: "reason" }],
+ tools: {},
+ })
+
+ const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
+ const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
+
+ expect(value).toBe("continue")
+ expect(yield* test.calls).toBe(2)
+ expect(reasoning.some((part) => part.text === "two")).toBe(true)
+ expect(reasoning.some((part) => part.text === "onetwo")).toBe(false)
+ }),
+ { git: true },
+ )
+})
+
+it.effect("session.processor effect tests do not retry unknown json errors", () => {
+ return provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const test = yield* TestLLM
+ const processors = yield* SessionProcessor.Service
+ const session = yield* Session.Service
+
+ yield* test.push(fail({ error: { message: "no_kv_space" } }, start()))
+
+ const chat = yield* session.create({})
+ const parent = yield* user(chat.id, "json")
+ const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
+ const abort = new AbortController()
+ const mdl = model(100)
+ const handle = yield* processors.create({
+ assistantMessage: msg,
+ sessionID: chat.id,
+ model: mdl,
+ abort: abort.signal,
+ })
+
+ const value = yield* handle.process({
+ user: {
+ id: parent.id,
+ sessionID: chat.id,
+ role: "user",
+ time: parent.time,
+ agent: parent.agent,
+ model: { providerID: ref.providerID, modelID: ref.modelID },
+ } satisfies MessageV2.User,
+ sessionID: chat.id,
+ model: mdl,
+ agent: agent(),
+ system: [],
+ abort: abort.signal,
+ messages: [{ role: "user", content: "json" }],
+ tools: {},
+ })
+
+ expect(value).toBe("stop")
+ expect(yield* test.calls).toBe(1)
+ expect(yield* test.inputs).toHaveLength(1)
+ expect(handle.message.error?.name).toBe("UnknownError")
+ }),
+ { git: true },
+ )
+})
+
+it.effect("session.processor effect tests retry recognized structured json errors", () => {
+ return provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const test = yield* TestLLM
+ const processors = yield* SessionProcessor.Service
+ const session = yield* Session.Service
+
+ yield* test.push(fail({ type: "error", error: { type: "too_many_requests" } }, start()))
+ yield* test.reply(start(), textStart(), textDelta("t", "after"), textEnd(), finishStep(), finish())
+
+ const chat = yield* session.create({})
+ const parent = yield* user(chat.id, "retry json")
+ const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
+ const abort = new AbortController()
+ const mdl = model(100)
+ const handle = yield* processors.create({
+ assistantMessage: msg,
+ sessionID: chat.id,
+ model: mdl,
+ abort: abort.signal,
+ })
+
+ const value = yield* handle.process({
+ user: {
+ id: parent.id,
+ sessionID: chat.id,
+ role: "user",
+ time: parent.time,
+ agent: parent.agent,
+ model: { providerID: ref.providerID, modelID: ref.modelID },
+ } satisfies MessageV2.User,
+ sessionID: chat.id,
+ model: mdl,
+ agent: agent(),
+ system: [],
+ abort: abort.signal,
+ messages: [{ role: "user", content: "retry json" }],
+ tools: {},
+ })
+
+ const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
+
+ expect(value).toBe("continue")
+ expect(yield* test.calls).toBe(2)
+ expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
+ expect(handle.message.error).toBeUndefined()
+ }),
+ { git: true },
+ )
+})
+
+it.effect("session.processor effect tests publish retry status updates", () => {
+ return provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const test = yield* TestLLM
+ const processors = yield* SessionProcessor.Service
+ const session = yield* Session.Service
+ const bus = yield* Bus.Service
+
+ yield* test.push(
+ fail(
+ new APICallError({
+ message: "boom",
+ url: "https://example.com/v1/chat/completions",
+ requestBodyValues: {},
+ statusCode: 503,
+ responseHeaders: { "retry-after-ms": "0" },
+ responseBody: '{"error":"boom"}',
+ isRetryable: true,
+ }),
+ start(),
+ ),
+ )
+ yield* test.reply(start(), finishStep(), finish())
+
+ const chat = yield* session.create({})
+ const parent = yield* user(chat.id, "retry")
+ const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
+ const abort = new AbortController()
+ const mdl = model(100)
+ const states: number[] = []
+ const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
+ if (evt.properties.sessionID !== chat.id) return
+ if (evt.properties.status.type === "retry") states.push(evt.properties.status.attempt)
+ })
+ const handle = yield* processors.create({
+ assistantMessage: msg,
+ sessionID: chat.id,
+ model: mdl,
+ abort: abort.signal,
+ })
+
+ const value = yield* handle.process({
+ user: {
+ id: parent.id,
+ sessionID: chat.id,
+ role: "user",
+ time: parent.time,
+ agent: parent.agent,
+ model: { providerID: ref.providerID, modelID: ref.modelID },
+ } satisfies MessageV2.User,
+ sessionID: chat.id,
+ model: mdl,
+ agent: agent(),
+ system: [],
+ abort: abort.signal,
+ messages: [{ role: "user", content: "retry" }],
+ tools: {},
+ })
+
+ off()
+
+ expect(value).toBe("continue")
+ expect(yield* test.calls).toBe(2)
+ expect(states).toStrictEqual([1])
+ }),
+ { git: true },
+ )
+})
+
+it.effect("session.processor effect tests compact on structured context overflow", () => {
+ return provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const test = yield* TestLLM
+ const processors = yield* SessionProcessor.Service
+ const session = yield* Session.Service
+
+ yield* test.push(fail({ type: "error", error: { code: "context_length_exceeded" } }, start()))
+
+ const chat = yield* session.create({})
+ const parent = yield* user(chat.id, "compact json")
+ const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
+ const abort = new AbortController()
+ const mdl = model(100)
+ const handle = yield* processors.create({
+ assistantMessage: msg,
+ sessionID: chat.id,
+ model: mdl,
+ abort: abort.signal,
+ })
+
+ const value = yield* handle.process({
+ user: {
+ id: parent.id,
+ sessionID: chat.id,
+ role: "user",
+ time: parent.time,
+ agent: parent.agent,
+ model: { providerID: ref.providerID, modelID: ref.modelID },
+ } satisfies MessageV2.User,
+ sessionID: chat.id,
+ model: mdl,
+ agent: agent(),
+ system: [],
+ abort: abort.signal,
+ messages: [{ role: "user", content: "compact json" }],
+ tools: {},
+ })
+
+ expect(value).toBe("compact")
+ expect(yield* test.calls).toBe(1)
+ expect(handle.message.error).toBeUndefined()
+ }),
+ { git: true },
+ )
+})
+
+it.effect("session.processor effect tests mark pending tools as aborted on cleanup", () => {
+ return provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const ready = defer<void>()
+ const seen = defer<void>()
+ const test = yield* TestLLM
+ const processors = yield* SessionProcessor.Service
+ const session = yield* Session.Service
+
+ yield* test.push((input) =>
+ hang(input, start(), toolInputStart("tool-1", "bash"), toolCall("tool-1", "bash", { cmd: "pwd" })).pipe(
+ Stream.tap((event) => (event.type === "tool-call" ? Effect.sync(() => ready.resolve()) : Effect.void)),
+ ),
+ )
+
+ const chat = yield* session.create({})
+ const parent = yield* user(chat.id, "tool abort")
+ const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
+ const abort = new AbortController()
+ const mdl = model(100)
+ const handle = yield* processors.create({
+ assistantMessage: msg,
+ sessionID: chat.id,
+ model: mdl,
+ abort: abort.signal,
+ })
+
+ const run = Effect.runPromise(
+ handle.process({
+ user: {
+ id: parent.id,
+ sessionID: chat.id,
+ role: "user",
+ time: parent.time,
+ agent: parent.agent,
+ model: { providerID: ref.providerID, modelID: ref.modelID },
+ } satisfies MessageV2.User,
+ sessionID: chat.id,
+ model: mdl,
+ agent: agent(),
+ system: [],
+ abort: abort.signal,
+ messages: [{ role: "user", content: "tool abort" }],
+ tools: {},
+ }),
+ )
+
+ yield* Effect.promise(() => ready.promise)
+ abort.abort()
+
+ const value = yield* Effect.promise(() => run)
+ const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
+ const tool = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
+
+ expect(value).toBe("stop")
+ expect(yield* test.calls).toBe(1)
+ expect(tool?.state.status).toBe("error")
+ if (tool?.state.status === "error") {
+ expect(tool.state.error).toBe("Tool execution aborted")
+ expect(tool.state.time.end).toBeDefined()
+ }
+ }),
+ { git: true },
+ )
+})
+
+it.effect("session.processor effect tests record aborted errors and idle state", () => {
+ return provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const ready = defer<void>()
+ const seen = defer<void>()
+ const test = yield* TestLLM
+ const processors = yield* SessionProcessor.Service
+ const session = yield* Session.Service
+ const bus = yield* Bus.Service
+ const status = yield* SessionStatus.Service
+
+ yield* test.push((input) =>
+ hang(input, start()).pipe(
+ Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
+ ),
+ )
+
+ const chat = yield* session.create({})
+ const parent = yield* user(chat.id, "abort")
+ const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
+ const abort = new AbortController()
+ const mdl = model(100)
+ const errs: string[] = []
+ const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => {
+ if (evt.properties.sessionID !== chat.id) return
+ if (!evt.properties.error) return
+ errs.push(evt.properties.error.name)
+ seen.resolve()
+ })
+ const handle = yield* processors.create({
+ assistantMessage: msg,
+ sessionID: chat.id,
+ model: mdl,
+ abort: abort.signal,
+ })
+
+ const run = Effect.runPromise(
+ handle.process({
+ user: {
+ id: parent.id,
+ sessionID: chat.id,
+ role: "user",
+ time: parent.time,
+ agent: parent.agent,
+ model: { providerID: ref.providerID, modelID: ref.modelID },
+ } satisfies MessageV2.User,
+ sessionID: chat.id,
+ model: mdl,
+ agent: agent(),
+ system: [],
+ abort: abort.signal,
+ messages: [{ role: "user", content: "abort" }],
+ tools: {},
+ }),
+ )
+
+ yield* Effect.promise(() => ready.promise)
+ abort.abort()
+
+ const value = yield* Effect.promise(() => run)
+ yield* Effect.promise(() => seen.promise)
+ const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
+ const state = yield* status.get(chat.id)
+ off()
+
+ expect(value).toBe("stop")
+ expect(handle.message.error?.name).toBe("MessageAbortedError")
+ expect(stored.info.role).toBe("assistant")
+ if (stored.info.role === "assistant") {
+ expect(stored.info.error?.name).toBe("MessageAbortedError")
+ }
+ expect(state).toMatchObject({ type: "idle" })
+ expect(errs).toContain("MessageAbortedError")
+ }),
+ { git: true },
+ )
+})
diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts
index 7d1d42905..51d2e1194 100644
--- a/packages/opencode/test/session/prompt.test.ts
+++ b/packages/opencode/test/session/prompt.test.ts
@@ -12,6 +12,83 @@ import { tmpdir } from "../fixture/fixture"
Log.init({ print: false })
+function defer<T>() {
+ let resolve!: (value: T | PromiseLike<T>) => void
+ const promise = new Promise<T>((done) => {
+ resolve = done
+ })
+ return { promise, resolve }
+}
+
+function chat(text: string) {
+ const payload =
+ [
+ `data: ${JSON.stringify({
+ id: "chatcmpl-1",
+ object: "chat.completion.chunk",
+ choices: [{ delta: { role: "assistant" } }],
+ })}`,
+ `data: ${JSON.stringify({
+ id: "chatcmpl-1",
+ object: "chat.completion.chunk",
+ choices: [{ delta: { content: text } }],
+ })}`,
+ `data: ${JSON.stringify({
+ id: "chatcmpl-1",
+ object: "chat.completion.chunk",
+ choices: [{ delta: {}, finish_reason: "stop" }],
+ })}`,
+ "data: [DONE]",
+ ].join("\n\n") + "\n\n"
+
+ const encoder = new TextEncoder()
+ return new ReadableStream<Uint8Array>({
+ start(ctrl) {
+ ctrl.enqueue(encoder.encode(payload))
+ ctrl.close()
+ },
+ })
+}
+
+function hanging(ready: () => void) {
+ const encoder = new TextEncoder()
+ let timer: ReturnType<typeof setTimeout> | undefined
+ const first =
+ `data: ${JSON.stringify({
+ id: "chatcmpl-1",
+ object: "chat.completion.chunk",
+ choices: [{ delta: { role: "assistant" } }],
+ })}` + "\n\n"
+ const rest =
+ [
+ `data: ${JSON.stringify({
+ id: "chatcmpl-1",
+ object: "chat.completion.chunk",
+ choices: [{ delta: { content: "late" } }],
+ })}`,
+ `data: ${JSON.stringify({
+ id: "chatcmpl-1",
+ object: "chat.completion.chunk",
+ choices: [{ delta: {}, finish_reason: "stop" }],
+ })}`,
+ "data: [DONE]",
+ ].join("\n\n") + "\n\n"
+
+ return new ReadableStream<Uint8Array>({
+ start(ctrl) {
+ ctrl.enqueue(encoder.encode(first))
+ ready()
+ timer = setTimeout(() => {
+ ctrl.enqueue(encoder.encode(rest))
+ ctrl.close()
+ }, 10000)
+ },
+ cancel() {
+ if (timer) clearTimeout(timer)
+ },
+ })
+}
+
describe("session.prompt missing file", () => {
test("does not fail the prompt when a file part is missing", async () => {
await using tmp = await tmpdir({
@@ -149,6 +226,159 @@ describe("session.prompt special characters", () => {
})
})
+describe("session.prompt regression", () => {
+ test("does not loop empty assistant turns for a simple reply", async () => {
+ let calls = 0
+ const server = Bun.serve({
+ port: 0,
+ fetch(req) {
+ const url = new URL(req.url)
+ if (!url.pathname.endsWith("/chat/completions")) {
+ return new Response("not found", { status: 404 })
+ }
+ calls++
+ return new Response(chat("packages/opencode/src/session/processor.ts"), {
+ status: 200,
+ headers: { "Content-Type": "text/event-stream" },
+ })
+ },
+ })
+
+ try {
+ await using tmp = await tmpdir({
+ git: true,
+ init: async (dir) => {
+ await Bun.write(
+ path.join(dir, "opencode.json"),
+ JSON.stringify({
+ $schema: "https://opencode.ai/config.json",
+ enabled_providers: ["alibaba"],
+ provider: {
+ alibaba: {
+ options: {
+ apiKey: "test-key",
+ baseURL: `${server.url.origin}/v1`,
+ },
+ },
+ },
+ agent: {
+ build: {
+ model: "alibaba/qwen-plus",
+ },
+ },
+ }),
+ )
+ },
+ })
+
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({ title: "Prompt regression" })
+ const result = await SessionPrompt.prompt({
+ sessionID: session.id,
+ agent: "build",
+ parts: [{ type: "text", text: "Where is SessionProcessor?" }],
+ })
+
+ expect(result.info.role).toBe("assistant")
+ expect(result.parts.some((part) => part.type === "text" && part.text.includes("processor.ts"))).toBe(true)
+
+ const msgs = await Session.messages({ sessionID: session.id })
+ expect(msgs.filter((msg) => msg.info.role === "assistant")).toHaveLength(1)
+ expect(calls).toBe(1)
+ },
+ })
+ } finally {
+ server.stop(true)
+ }
+ })
+
+ test("records aborted errors when prompt is cancelled mid-stream", async () => {
+ const ready = defer<void>()
+ const server = Bun.serve({
+ port: 0,
+ fetch(req) {
+ const url = new URL(req.url)
+ if (!url.pathname.endsWith("/chat/completions")) {
+ return new Response("not found", { status: 404 })
+ }
+ return new Response(
+ hanging(() => ready.resolve()),
+ {
+ status: 200,
+ headers: { "Content-Type": "text/event-stream" },
+ },
+ )
+ },
+ })
+
+ try {
+ await using tmp = await tmpdir({
+ git: true,
+ init: async (dir) => {
+ await Bun.write(
+ path.join(dir, "opencode.json"),
+ JSON.stringify({
+ $schema: "https://opencode.ai/config.json",
+ enabled_providers: ["alibaba"],
+ provider: {
+ alibaba: {
+ options: {
+ apiKey: "test-key",
+ baseURL: `${server.url.origin}/v1`,
+ },
+ },
+ },
+ agent: {
+ build: {
+ model: "alibaba/qwen-plus",
+ },
+ },
+ }),
+ )
+ },
+ })
+
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({ title: "Prompt cancel regression" })
+ const run = SessionPrompt.prompt({
+ sessionID: session.id,
+ agent: "build",
+ parts: [{ type: "text", text: "Cancel me" }],
+ })
+
+ await ready.promise
+ await SessionPrompt.cancel(session.id)
+
+ const result = await Promise.race([
+ run,
+ new Promise<never>((_, reject) =>
+ setTimeout(() => reject(new Error("timed out waiting for cancel")), 1000),
+ ),
+ ])
+
+ expect(result.info.role).toBe("assistant")
+ if (result.info.role === "assistant") {
+ expect(result.info.error?.name).toBe("MessageAbortedError")
+ }
+
+ const msgs = await Session.messages({ sessionID: session.id })
+ const last = msgs.findLast((msg) => msg.info.role === "assistant")
+ expect(last?.info.role).toBe("assistant")
+ if (last?.info.role === "assistant") {
+ expect(last.info.error?.name).toBe("MessageAbortedError")
+ }
+ },
+ })
+ } finally {
+ server.stop(true)
+ }
+ })
+})
+
describe("session.prompt agent variant", () => {
test("applies agent variant only when using agent model", async () => {
const prev = process.env.OPENAI_API_KEY
diff --git a/packages/opencode/test/session/retry.test.ts b/packages/opencode/test/session/retry.test.ts
index a61c44262..dfeb7e9a4 100644
--- a/packages/opencode/test/session/retry.test.ts
+++ b/packages/opencode/test/session/retry.test.ts
@@ -2,9 +2,14 @@ import { describe, expect, test } from "bun:test"
import type { NamedError } from "@opencode-ai/util/error"
import { APICallError } from "ai"
import { setTimeout as sleep } from "node:timers/promises"
+import { Effect, Schedule } from "effect"
import { SessionRetry } from "../../src/session/retry"
import { MessageV2 } from "../../src/session/message-v2"
import { ProviderID } from "../../src/provider/schema"
+import { SessionID } from "../../src/session/schema"
+import { SessionStatus } from "../../src/session/status"
+import { Instance } from "../../src/project/instance"
+import { tmpdir } from "../fixture/fixture"
const providerID = ProviderID.make("test")
@@ -69,24 +74,47 @@ describe("session.retry.delay", () => {
expect(SessionRetry.delay(1, longError)).toBe(700000)
})
- test("sleep caps delay to max 32-bit signed integer to avoid TimeoutOverflowWarning", async () => {
- const controller = new AbortController()
-
- const warnings: string[] = []
- const originalWarn = process.emitWarning
- process.emitWarning = (warning: string | Error) => {
- warnings.push(typeof warning === "string" ? warning : warning.message)
- }
-
- const promise = SessionRetry.sleep(2_560_914_000, controller.signal)
- controller.abort()
-
- try {
- await promise
- } catch {}
-
- process.emitWarning = originalWarn
- expect(warnings.some((w) => w.includes("TimeoutOverflowWarning"))).toBe(false)
+ test("caps oversized header delays to the runtime timer limit", () => {
+ const error = apiError({ "retry-after-ms": "999999999999" })
+ expect(SessionRetry.delay(1, error)).toBe(SessionRetry.RETRY_MAX_DELAY)
+ })
+
+ test("policy updates retry status and increments attempts", async () => {
+ await using tmp = await tmpdir()
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const sessionID = SessionID.make("session-retry-test")
+ const error = apiError({ "retry-after-ms": "0" })
+
+ await Effect.runPromise(
+ Effect.gen(function* () {
+ const step = yield* Schedule.toStepWithMetadata(
+ SessionRetry.policy({
+ parse: (err) => err as MessageV2.APIError,
+ set: (info) =>
+ Effect.promise(() =>
+ SessionStatus.set(sessionID, {
+ type: "retry",
+ attempt: info.attempt,
+ message: info.message,
+ next: info.next,
+ }),
+ ),
+ }),
+ )
+ yield* step(error)
+ yield* step(error)
+ }),
+ )
+
+ expect(await SessionStatus.get(sessionID)).toMatchObject({
+ type: "retry",
+ attempt: 2,
+ message: "boom",
+ })
+ },
+ })
})
})
@@ -101,9 +129,9 @@ describe("session.retry.retryable", () => {
expect(SessionRetry.retryable(error)).toBe("Provider is overloaded")
})
- test("handles json messages without code", () => {
+ test("does not retry unknown json messages", () => {
const error = wrap(JSON.stringify({ error: { message: "no_kv_space" } }))
- expect(SessionRetry.retryable(error)).toBe(`{"error":{"message":"no_kv_space"}}`)
+ expect(SessionRetry.retryable(error)).toBeUndefined()
})
test("does not throw on numeric error codes", () => {