summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorKit Langton <[email protected]>2026-04-08 21:19:01 -0400
committerGitHub <[email protected]>2026-04-08 21:19:01 -0400
commit8bdcc22541e074be668093734fc3bcc4191dc705 (patch)
tree3054aec69809dddf2955f25e776a754c04b947bf
parent2bdd279467632ab048c39c5dbaadee1ee33dc510 (diff)
downloadopencode-8bdcc22541e074be668093734fc3bcc4191dc705.tar.gz
opencode-8bdcc22541e074be668093734fc3bcc4191dc705.zip
refactor(effect): inline session processor interrupt cleanup (#21593)
-rw-r--r--packages/opencode/src/file/time.ts2
-rw-r--r--packages/opencode/src/mcp/index.ts2
-rw-r--r--packages/opencode/src/project/project.ts2
-rw-r--r--packages/opencode/src/session/compaction.ts32
-rw-r--r--packages/opencode/src/session/processor.ts31
-rw-r--r--packages/opencode/src/session/prompt.ts190
-rw-r--r--packages/opencode/src/tool/read.ts4
-rw-r--r--packages/opencode/test/session/compaction.test.ts1
-rw-r--r--packages/opencode/test/session/processor-effect.test.ts6
9 files changed, 119 insertions, 151 deletions
diff --git a/packages/opencode/src/file/time.ts b/packages/opencode/src/file/time.ts
index bd2b5f04f..d5ca3db85 100644
--- a/packages/opencode/src/file/time.ts
+++ b/packages/opencode/src/file/time.ts
@@ -46,7 +46,7 @@ export namespace FileTime {
const disableCheck = yield* Flag.OPENCODE_DISABLE_FILETIME_CHECK
const stamp = Effect.fnUntraced(function* (file: string) {
- const info = yield* fsys.stat(file).pipe(Effect.catch(() => Effect.succeed(undefined)))
+ const info = yield* fsys.stat(file).pipe(Effect.catch(() => Effect.void))
return {
read: yield* DateTime.nowAsDate,
mtime: info ? Option.getOrUndefined(info.mtime)?.getTime() : undefined,
diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts
index 8c92bb6b2..45e3e2567 100644
--- a/packages/opencode/src/mcp/index.ts
+++ b/packages/opencode/src/mcp/index.ts
@@ -501,7 +501,7 @@ export namespace MCP {
return
}
- const result = yield* create(key, mcp).pipe(Effect.catch(() => Effect.succeed(undefined)))
+ const result = yield* create(key, mcp).pipe(Effect.catch(() => Effect.void))
if (!result) return
s.status[key] = result.status
diff --git a/packages/opencode/src/project/project.ts b/packages/opencode/src/project/project.ts
index f4b8b940d..f587f50b3 100644
--- a/packages/opencode/src/project/project.ts
+++ b/packages/opencode/src/project/project.ts
@@ -158,7 +158,7 @@ export namespace Project {
return yield* fs.readFileString(pathSvc.join(dir, "opencode")).pipe(
Effect.map((x) => x.trim()),
Effect.map(ProjectID.make),
- Effect.catch(() => Effect.succeed(undefined)),
+ Effect.catch(() => Effect.void),
)
})
diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts
index bbdce9fd7..0961c20a6 100644
--- a/packages/opencode/src/session/compaction.ts
+++ b/packages/opencode/src/session/compaction.ts
@@ -253,23 +253,21 @@ When constructing the summary, try to stick to this template:
sessionID: input.sessionID,
model,
})
- const result = yield* processor
- .process({
- user: userMessage,
- agent,
- sessionID: input.sessionID,
- tools: {},
- system: [],
- messages: [
- ...modelMessages,
- {
- role: "user",
- content: [{ type: "text", text: prompt }],
- },
- ],
- model,
- })
- .pipe(Effect.onInterrupt(() => processor.abort()))
+ const result = yield* processor.process({
+ user: userMessage,
+ agent,
+ sessionID: input.sessionID,
+ tools: {},
+ system: [],
+ messages: [
+ ...modelMessages,
+ {
+ role: "user",
+ content: [{ type: "text", text: prompt }],
+ },
+ ],
+ model,
+ })
if (result === "compact") {
processor.message.error = new MessageV2.ContextOverflowError({
diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts
index 146c73f27..74c4a0cdb 100644
--- a/packages/opencode/src/session/processor.ts
+++ b/packages/opencode/src/session/processor.ts
@@ -30,7 +30,6 @@ export namespace SessionProcessor {
export interface Handle {
readonly message: MessageV2.Assistant
readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined
- readonly abort: () => Effect.Effect<void>
readonly process: (streamInput: LLM.StreamInput) => Effect.Effect<Result>
}
@@ -429,19 +428,6 @@ export namespace SessionProcessor {
yield* status.set(ctx.sessionID, { type: "idle" })
})
- const abort = Effect.fn("SessionProcessor.abort")(() =>
- Effect.gen(function* () {
- if (!ctx.assistantMessage.error) {
- yield* halt(new DOMException("Aborted", "AbortError"))
- }
- if (!ctx.assistantMessage.time.completed) {
- yield* cleanup()
- return
- }
- yield* session.updateMessage(ctx.assistantMessage)
- }),
- )
-
const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
log.info("process")
ctx.needsCompaction = false
@@ -459,7 +445,14 @@ export namespace SessionProcessor {
Stream.runDrain,
)
}).pipe(
- Effect.onInterrupt(() => Effect.sync(() => void (aborted = true))),
+ Effect.onInterrupt(() =>
+ Effect.gen(function* () {
+ aborted = true
+ if (!ctx.assistantMessage.error) {
+ yield* halt(new DOMException("Aborted", "AbortError"))
+ }
+ }),
+ ),
Effect.catchCauseIf(
(cause) => !Cause.hasInterruptsOnly(cause),
(cause) => Effect.fail(Cause.squash(cause)),
@@ -480,13 +473,10 @@ export namespace SessionProcessor {
Effect.ensuring(cleanup()),
)
- if (aborted && !ctx.assistantMessage.error) {
- yield* abort()
- }
if (ctx.needsCompaction) return "compact"
- if (ctx.blocked || ctx.assistantMessage.error || aborted) return "stop"
+ if (ctx.blocked || ctx.assistantMessage.error) return "stop"
return "continue"
- }).pipe(Effect.onInterrupt(() => abort().pipe(Effect.asVoid)))
+ })
})
return {
@@ -496,7 +486,6 @@ export namespace SessionProcessor {
partFromToolCall(toolCallID: string) {
return ctx.toolcalls[toolCallID]
},
- abort,
process,
} satisfies Handle
})
diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts
index a18f9e379..118206332 100644
--- a/packages/opencode/src/session/prompt.ts
+++ b/packages/opencode/src/session/prompt.ts
@@ -964,9 +964,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
const same = ag.model && model.providerID === ag.model.providerID && model.modelID === ag.model.modelID
const full =
!input.variant && ag.variant && same
- ? yield* provider
- .getModel(model.providerID, model.modelID)
- .pipe(Effect.catch(() => Effect.succeed(undefined)))
+ ? yield* provider.getModel(model.providerID, model.modelID).pipe(Effect.catchDefect(() => Effect.void))
: undefined
const variant = input.variant ?? (ag.variant && full?.variants?.[ag.variant] ? ag.variant : undefined)
@@ -986,9 +984,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
format: input.format,
}
- yield* Effect.addFinalizer(() =>
- InstanceState.withALS(() => instruction.clear(info.id)).pipe(Effect.flatMap((x) => x)),
- )
+ yield* Effect.addFinalizer(() => instruction.clear(info.id))
type Draft<T> = T extends MessageV2.Part ? Omit<T, "id"> & { id?: string } : never
const assign = (part: Draft<MessageV2.Part>): MessageV2.Part => ({
@@ -1459,110 +1455,104 @@ NOTE: At any point in time through this workflow you should feel free to ask the
model,
})
- const outcome: "break" | "continue" = yield* Effect.onExit(
- Effect.gen(function* () {
- const lastUserMsg = msgs.findLast((m) => m.info.role === "user")
- const bypassAgentCheck = lastUserMsg?.parts.some((p) => p.type === "agent") ?? false
-
- const tools = yield* resolveTools({
- agent,
- session,
- model,
- tools: lastUser.tools,
- processor: handle,
- bypassAgentCheck,
- messages: msgs,
- })
+ const outcome: "break" | "continue" = yield* Effect.gen(function* () {
+ const lastUserMsg = msgs.findLast((m) => m.info.role === "user")
+ const bypassAgentCheck = lastUserMsg?.parts.some((p) => p.type === "agent") ?? false
- if (lastUser.format?.type === "json_schema") {
- tools["StructuredOutput"] = createStructuredOutputTool({
- schema: lastUser.format.schema,
- onSuccess(output) {
- structured = output
- },
- })
- }
+ const tools = yield* resolveTools({
+ agent,
+ session,
+ model,
+ tools: lastUser.tools,
+ processor: handle,
+ bypassAgentCheck,
+ messages: msgs,
+ })
- if (step === 1) SessionSummary.summarize({ sessionID, messageID: lastUser.id })
-
- if (step > 1 && lastFinished) {
- for (const m of msgs) {
- if (m.info.role !== "user" || m.info.id <= lastFinished.id) continue
- for (const p of m.parts) {
- if (p.type !== "text" || p.ignored || p.synthetic) continue
- if (!p.text.trim()) continue
- p.text = [
- "<system-reminder>",
- "The user sent the following message:",
- p.text,
- "",
- "Please address this message and continue with your tasks.",
- "</system-reminder>",
- ].join("\n")
- }
+ if (lastUser.format?.type === "json_schema") {
+ tools["StructuredOutput"] = createStructuredOutputTool({
+ schema: lastUser.format.schema,
+ onSuccess(output) {
+ structured = output
+ },
+ })
+ }
+
+ if (step === 1) SessionSummary.summarize({ sessionID, messageID: lastUser.id })
+
+ if (step > 1 && lastFinished) {
+ for (const m of msgs) {
+ if (m.info.role !== "user" || m.info.id <= lastFinished.id) continue
+ for (const p of m.parts) {
+ if (p.type !== "text" || p.ignored || p.synthetic) continue
+ if (!p.text.trim()) continue
+ p.text = [
+ "<system-reminder>",
+ "The user sent the following message:",
+ p.text,
+ "",
+ "Please address this message and continue with your tasks.",
+ "</system-reminder>",
+ ].join("\n")
}
}
+ }
- yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
-
- const [skills, env, instructions, modelMsgs] = yield* Effect.all([
- Effect.promise(() => SystemPrompt.skills(agent)),
- Effect.promise(() => SystemPrompt.environment(model)),
- instruction.system().pipe(Effect.orDie),
- Effect.promise(() => MessageV2.toModelMessages(msgs, model)),
- ])
- const system = [...env, ...(skills ? [skills] : []), ...instructions]
- const format = lastUser.format ?? { type: "text" as const }
- if (format.type === "json_schema") system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)
- const result = yield* handle.process({
- user: lastUser,
- agent,
- permission: session.permission,
- sessionID,
- parentSessionID: session.parentID,
- system,
- messages: [...modelMsgs, ...(isLastStep ? [{ role: "assistant" as const, content: MAX_STEPS }] : [])],
- tools,
- model,
- toolChoice: format.type === "json_schema" ? "required" : undefined,
- })
+ yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
+
+ const [skills, env, instructions, modelMsgs] = yield* Effect.all([
+ Effect.promise(() => SystemPrompt.skills(agent)),
+ Effect.promise(() => SystemPrompt.environment(model)),
+ instruction.system().pipe(Effect.orDie),
+ Effect.promise(() => MessageV2.toModelMessages(msgs, model)),
+ ])
+ const system = [...env, ...(skills ? [skills] : []), ...instructions]
+ const format = lastUser.format ?? { type: "text" as const }
+ if (format.type === "json_schema") system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)
+ const result = yield* handle.process({
+ user: lastUser,
+ agent,
+ permission: session.permission,
+ sessionID,
+ parentSessionID: session.parentID,
+ system,
+ messages: [...modelMsgs, ...(isLastStep ? [{ role: "assistant" as const, content: MAX_STEPS }] : [])],
+ tools,
+ model,
+ toolChoice: format.type === "json_schema" ? "required" : undefined,
+ })
+
+ if (structured !== undefined) {
+ handle.message.structured = structured
+ handle.message.finish = handle.message.finish ?? "stop"
+ yield* sessions.updateMessage(handle.message)
+ return "break" as const
+ }
- if (structured !== undefined) {
- handle.message.structured = structured
- handle.message.finish = handle.message.finish ?? "stop"
+ const finished = handle.message.finish && !["tool-calls", "unknown"].includes(handle.message.finish)
+ if (finished && !handle.message.error) {
+ if (format.type === "json_schema") {
+ handle.message.error = new MessageV2.StructuredOutputError({
+ message: "Model did not produce structured output",
+ retries: 0,
+ }).toObject()
yield* sessions.updateMessage(handle.message)
return "break" as const
}
+ }
- const finished = handle.message.finish && !["tool-calls", "unknown"].includes(handle.message.finish)
- if (finished && !handle.message.error) {
- if (format.type === "json_schema") {
- handle.message.error = new MessageV2.StructuredOutputError({
- message: "Model did not produce structured output",
- retries: 0,
- }).toObject()
- yield* sessions.updateMessage(handle.message)
- return "break" as const
- }
- }
-
- if (result === "stop") return "break" as const
- if (result === "compact") {
- yield* compaction.create({
- sessionID,
- agent: lastUser.agent,
- model: lastUser.model,
- auto: true,
- overflow: !handle.message.finish,
- })
- }
- return "continue" as const
- }),
- Effect.fnUntraced(function* (exit) {
- if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) yield* handle.abort()
- yield* InstanceState.withALS(() => instruction.clear(handle.message.id)).pipe(Effect.flatMap((x) => x))
- }),
- )
+ if (result === "stop") return "break" as const
+ if (result === "compact") {
+ yield* compaction.create({
+ sessionID,
+ agent: lastUser.agent,
+ model: lastUser.model,
+ auto: true,
+ overflow: !handle.message.finish,
+ })
+ }
+ return "continue" as const
+ }).pipe(Effect.ensuring(instruction.clear(handle.message.id)))
if (outcome === "break") break
continue
}
diff --git a/packages/opencode/src/tool/read.ts b/packages/opencode/src/tool/read.ts
index 0b44c7ad5..f963b415b 100644
--- a/packages/opencode/src/tool/read.ts
+++ b/packages/opencode/src/tool/read.ts
@@ -67,9 +67,7 @@ export const ReadTool = Tool.defineEffect(
if (item.type === "directory") return item.name + "/"
if (item.type !== "symlink") return item.name
- const target = yield* fs
- .stat(path.join(filepath, item.name))
- .pipe(Effect.catch(() => Effect.succeed(undefined)))
+ const target = yield* fs.stat(path.join(filepath, item.name)).pipe(Effect.catch(() => Effect.void))
if (target?.type === "Directory") return item.name + "/"
return item.name
}),
diff --git a/packages/opencode/test/session/compaction.test.ts b/packages/opencode/test/session/compaction.test.ts
index 799bb3e2a..c37371d9f 100644
--- a/packages/opencode/test/session/compaction.test.ts
+++ b/packages/opencode/test/session/compaction.test.ts
@@ -139,7 +139,6 @@ function fake(
get message() {
return msg
},
- abort: Effect.fn("TestSessionProcessor.abort")(() => Effect.void),
partFromToolCall() {
return {
id: PartID.ascending(),
diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts
index 0fc25c1a6..86149272b 100644
--- a/packages/opencode/test/session/processor-effect.test.ts
+++ b/packages/opencode/test/session/processor-effect.test.ts
@@ -593,9 +593,6 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup
yield* Fiber.interrupt(run)
const exit = yield* Fiber.await(run)
- if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
- yield* handle.abort()
- }
const parts = MessageV2.parts(msg.id)
const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
@@ -665,9 +662,6 @@ it.live("session.processor effect tests record aborted errors and idle state", (
yield* Fiber.interrupt(run)
const exit = yield* Fiber.await(run)
- if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
- yield* handle.abort()
- }
yield* Effect.promise(() => seen.promise)
const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id })
const state = yield* sts.get(chat.id)