summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--packages/opencode/src/session/llm.ts675
-rw-r--r--packages/opencode/test/session/llm.test.ts119
2 files changed, 355 insertions, 439 deletions
diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts
index 8df70b673..3ab35958a 100644
--- a/packages/opencode/src/session/llm.ts
+++ b/packages/opencode/src/session/llm.ts
@@ -1,7 +1,6 @@
import { Provider } from "@/provider/provider"
import { Log } from "@/util/log"
-import { Cause, Effect, Layer, Record, Context } from "effect"
-import * as Queue from "effect/Queue"
+import { Context, Effect, Layer, Record } from "effect"
import * as Stream from "effect/Stream"
import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai"
import { mergeDeep, pipe } from "remeda"
@@ -21,11 +20,13 @@ import { Wildcard } from "@/util/wildcard"
import { SessionID } from "@/session/schema"
import { Auth } from "@/auth"
import { Installation } from "@/installation"
-import { AppRuntime } from "@/effect/app-runtime"
+import { makeRuntime } from "@/effect/run-service"
export namespace LLM {
const log = Log.create({ service: "llm" })
+ const perms = makeRuntime(Permission.Service, Permission.defaultLayer)
export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX
+ type Result = Awaited<ReturnType<typeof streamText>>
export type StreamInput = {
user: MessageV2.User
@@ -46,7 +47,7 @@ export namespace LLM {
abort: AbortSignal
}
- export type Event = Awaited<ReturnType<typeof stream>>["fullStream"] extends AsyncIterable<infer T> ? T : never
+ export type Event = Result["fullStream"] extends AsyncIterable<infer T> ? T : never
export interface Interface {
readonly stream: (input: StreamInput) => Stream.Stream<Event, unknown>
@@ -54,360 +55,368 @@ export namespace LLM {
export class Service extends Context.Service<Service, Interface>()("@opencode/LLM") {}
- export const layer = Layer.effect(
- Service,
- Effect.gen(function* () {
- return Service.of({
- stream(input) {
- return Stream.scoped(
- Stream.unwrap(
- Effect.gen(function* () {
- const ctrl = yield* Effect.acquireRelease(
- Effect.sync(() => new AbortController()),
- (ctrl) => Effect.sync(() => ctrl.abort()),
- )
+ export const layer: Layer.Layer<Service, never, Auth.Service | Config.Service | Provider.Service | Plugin.Service> =
+ Layer.effect(
+ Service,
+ Effect.gen(function* () {
+ const auth = yield* Auth.Service
+ const config = yield* Config.Service
+ const provider = yield* Provider.Service
+ const plugin = yield* Plugin.Service
- const result = yield* Effect.promise(() => LLM.stream({ ...input, abort: ctrl.signal }))
+ const run = Effect.fn("LLM.run")(function* (input: StreamRequest) {
+ const l = log
+ .clone()
+ .tag("providerID", input.model.providerID)
+ .tag("modelID", input.model.id)
+ .tag("sessionID", input.sessionID)
+ .tag("small", (input.small ?? false).toString())
+ .tag("agent", input.agent.name)
+ .tag("mode", input.agent.mode)
+ l.info("stream", {
+ modelID: input.model.id,
+ providerID: input.model.providerID,
+ })
- return Stream.fromAsyncIterable(result.fullStream, (e) =>
- e instanceof Error ? e : new Error(String(e)),
- )
- }),
- ),
+ const [language, cfg, item, info] = yield* Effect.all(
+ [
+ provider.getLanguage(input.model),
+ config.get(),
+ provider.getProvider(input.model.providerID),
+ auth.get(input.model.providerID),
+ ],
+ { concurrency: "unbounded" },
)
- },
- })
- }),
- )
- export const defaultLayer = layer
+ // TODO: move this to a proper hook
+ const isOpenaiOauth = item.id === "openai" && info?.type === "oauth"
- export async function stream(input: StreamRequest) {
- const l = log
- .clone()
- .tag("providerID", input.model.providerID)
- .tag("modelID", input.model.id)
- .tag("sessionID", input.sessionID)
- .tag("small", (input.small ?? false).toString())
- .tag("agent", input.agent.name)
- .tag("mode", input.agent.mode)
- l.info("stream", {
- modelID: input.model.id,
- providerID: input.model.providerID,
- })
- const [language, cfg, provider, info] = await Effect.runPromise(
- Effect.gen(function* () {
- const auth = yield* Auth.Service
- const cfg = yield* Config.Service
- const provider = yield* Provider.Service
- return yield* Effect.all(
- [
- provider.getLanguage(input.model),
- cfg.get(),
- provider.getProvider(input.model.providerID),
- auth.get(input.model.providerID),
- ],
- { concurrency: "unbounded" },
- )
- }).pipe(Effect.provide(Layer.mergeAll(Auth.defaultLayer, Config.defaultLayer, Provider.defaultLayer))),
- )
- // TODO: move this to a proper hook
- const isOpenaiOauth = provider.id === "openai" && info?.type === "oauth"
+ const system: string[] = []
+ system.push(
+ [
+ // use agent prompt otherwise provider prompt
+ ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)),
+ // any custom prompt passed into this call
+ ...input.system,
+ // any custom prompt from last user message
+ ...(input.user.system ? [input.user.system] : []),
+ ]
+ .filter((x) => x)
+ .join("\n"),
+ )
- const system: string[] = []
- system.push(
- [
- // use agent prompt otherwise provider prompt
- ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)),
- // any custom prompt passed into this call
- ...input.system,
- // any custom prompt from last user message
- ...(input.user.system ? [input.user.system] : []),
- ]
- .filter((x) => x)
- .join("\n"),
- )
+ const header = system[0]
+ yield* plugin.trigger(
+ "experimental.chat.system.transform",
+ { sessionID: input.sessionID, model: input.model },
+ { system },
+ )
+ // rejoin to maintain 2-part structure for caching if header unchanged
+ if (system.length > 2 && system[0] === header) {
+ const rest = system.slice(1)
+ system.length = 0
+ system.push(header, rest.join("\n"))
+ }
- const header = system[0]
- await Plugin.trigger(
- "experimental.chat.system.transform",
- { sessionID: input.sessionID, model: input.model },
- { system },
- )
- // rejoin to maintain 2-part structure for caching if header unchanged
- if (system.length > 2 && system[0] === header) {
- const rest = system.slice(1)
- system.length = 0
- system.push(header, rest.join("\n"))
- }
+ const variant =
+ !input.small && input.model.variants && input.user.model.variant
+ ? input.model.variants[input.user.model.variant]
+ : {}
+ const base = input.small
+ ? ProviderTransform.smallOptions(input.model)
+ : ProviderTransform.options({
+ model: input.model,
+ sessionID: input.sessionID,
+ providerOptions: item.options,
+ })
+ const options: Record<string, any> = pipe(
+ base,
+ mergeDeep(input.model.options),
+ mergeDeep(input.agent.options),
+ mergeDeep(variant),
+ )
+ if (isOpenaiOauth) {
+ options.instructions = system.join("\n")
+ }
- const variant =
- !input.small && input.model.variants && input.user.model.variant
- ? input.model.variants[input.user.model.variant]
- : {}
- const base = input.small
- ? ProviderTransform.smallOptions(input.model)
- : ProviderTransform.options({
- model: input.model,
- sessionID: input.sessionID,
- providerOptions: provider.options,
- })
- const options: Record<string, any> = pipe(
- base,
- mergeDeep(input.model.options),
- mergeDeep(input.agent.options),
- mergeDeep(variant),
- )
- if (isOpenaiOauth) {
- options.instructions = system.join("\n")
- }
+ const isWorkflow = language instanceof GitLabWorkflowLanguageModel
+ const messages = isOpenaiOauth
+ ? input.messages
+ : isWorkflow
+ ? input.messages
+ : [
+ ...system.map(
+ (x): ModelMessage => ({
+ role: "system",
+ content: x,
+ }),
+ ),
+ ...input.messages,
+ ]
- const isWorkflow = language instanceof GitLabWorkflowLanguageModel
- const messages = isOpenaiOauth
- ? input.messages
- : isWorkflow
- ? input.messages
- : [
- ...system.map(
- (x): ModelMessage => ({
- role: "system",
- content: x,
- }),
- ),
- ...input.messages,
- ]
+ const params = yield* plugin.trigger(
+ "chat.params",
+ {
+ sessionID: input.sessionID,
+ agent: input.agent.name,
+ model: input.model,
+ provider: item,
+ message: input.user,
+ },
+ {
+ temperature: input.model.capabilities.temperature
+ ? (input.agent.temperature ?? ProviderTransform.temperature(input.model))
+ : undefined,
+ topP: input.agent.topP ?? ProviderTransform.topP(input.model),
+ topK: ProviderTransform.topK(input.model),
+ maxOutputTokens: ProviderTransform.maxOutputTokens(input.model),
+ options,
+ },
+ )
- const params = await Plugin.trigger(
- "chat.params",
- {
- sessionID: input.sessionID,
- agent: input.agent.name,
- model: input.model,
- provider,
- message: input.user,
- },
- {
- temperature: input.model.capabilities.temperature
- ? (input.agent.temperature ?? ProviderTransform.temperature(input.model))
- : undefined,
- topP: input.agent.topP ?? ProviderTransform.topP(input.model),
- topK: ProviderTransform.topK(input.model),
- maxOutputTokens: ProviderTransform.maxOutputTokens(input.model),
- options,
- },
- )
+ const { headers } = yield* plugin.trigger(
+ "chat.headers",
+ {
+ sessionID: input.sessionID,
+ agent: input.agent.name,
+ model: input.model,
+ provider: item,
+ message: input.user,
+ },
+ {
+ headers: {},
+ },
+ )
- const { headers } = await Plugin.trigger(
- "chat.headers",
- {
- sessionID: input.sessionID,
- agent: input.agent.name,
- model: input.model,
- provider,
- message: input.user,
- },
- {
- headers: {},
- },
- )
+ const tools = resolveTools(input)
- const tools = resolveTools(input)
+ // LiteLLM and some Anthropic proxies require the tools parameter to be present
+ // when message history contains tool calls, even if no tools are being used.
+ // Add a dummy tool that is never called to satisfy this validation.
+ // This is enabled for:
+ // 1. Providers with "litellm" in their ID or API ID (auto-detected)
+ // 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways)
+ const isLiteLLMProxy =
+ item.options?.["litellmProxy"] === true ||
+ input.model.providerID.toLowerCase().includes("litellm") ||
+ input.model.api.id.toLowerCase().includes("litellm")
- // LiteLLM and some Anthropic proxies require the tools parameter to be present
- // when message history contains tool calls, even if no tools are being used.
- // Add a dummy tool that is never called to satisfy this validation.
- // This is enabled for:
- // 1. Providers with "litellm" in their ID or API ID (auto-detected)
- // 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways)
- const isLiteLLMProxy =
- provider.options?.["litellmProxy"] === true ||
- input.model.providerID.toLowerCase().includes("litellm") ||
- input.model.api.id.toLowerCase().includes("litellm")
+ // LiteLLM/Bedrock rejects requests where the message history contains tool
+ // calls but no tools param is present. When there are no active tools (e.g.
+ // during compaction), inject a stub tool to satisfy the validation requirement.
+ // The stub description explicitly tells the model not to call it.
+ if (isLiteLLMProxy && Object.keys(tools).length === 0 && hasToolCalls(input.messages)) {
+ tools["_noop"] = tool({
+ description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
+ inputSchema: jsonSchema({
+ type: "object",
+ properties: {
+ reason: { type: "string", description: "Unused" },
+ },
+ }),
+ execute: async () => ({ output: "", title: "", metadata: {} }),
+ })
+ }
- // LiteLLM/Bedrock rejects requests where the message history contains tool
- // calls but no tools param is present. When there are no active tools (e.g.
- // during compaction), inject a stub tool to satisfy the validation requirement.
- // The stub description explicitly tells the model not to call it.
- if (isLiteLLMProxy && Object.keys(tools).length === 0 && hasToolCalls(input.messages)) {
- tools["_noop"] = tool({
- description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
- inputSchema: jsonSchema({
- type: "object",
- properties: {
- reason: { type: "string", description: "Unused" },
- },
- }),
- execute: async () => ({ output: "", title: "", metadata: {} }),
- })
- }
+ // Wire up toolExecutor for DWS workflow models so that tool calls
+ // from the workflow service are executed via opencode's tool system
+ // and results sent back over the WebSocket.
+ if (language instanceof GitLabWorkflowLanguageModel) {
+ const workflowModel = language as GitLabWorkflowLanguageModel & {
+ sessionID?: string
+ sessionPreapprovedTools?: string[]
+ approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
+ }
+ workflowModel.sessionID = input.sessionID
+ workflowModel.systemPrompt = system.join("\n")
+ workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
+ const t = tools[toolName]
+ if (!t || !t.execute) {
+ return { result: "", error: `Unknown tool: ${toolName}` }
+ }
+ try {
+ const result = await t.execute!(JSON.parse(argsJson), {
+ toolCallId: _requestID,
+ messages: input.messages,
+ abortSignal: input.abort,
+ })
+ const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result))
+ return {
+ result: output,
+ metadata: typeof result === "object" ? result?.metadata : undefined,
+ title: typeof result === "object" ? result?.title : undefined,
+ }
+ } catch (e: any) {
+ return { result: "", error: e.message ?? String(e) }
+ }
+ }
- // Wire up toolExecutor for DWS workflow models so that tool calls
- // from the workflow service are executed via opencode's tool system
- // and results sent back over the WebSocket.
- if (language instanceof GitLabWorkflowLanguageModel) {
- const workflowModel = language as GitLabWorkflowLanguageModel & {
- sessionID?: string
- sessionPreapprovedTools?: string[]
- approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
- }
- workflowModel.sessionID = input.sessionID
- workflowModel.systemPrompt = system.join("\n")
- workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
- const t = tools[toolName]
- if (!t || !t.execute) {
- return { result: "", error: `Unknown tool: ${toolName}` }
- }
- try {
- const result = await t.execute!(JSON.parse(argsJson), {
- toolCallId: _requestID,
- messages: input.messages,
+ const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
+ workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => {
+ const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
+ return !match || match.action !== "ask"
+ })
+
+ const approvedToolsForSession = new Set<string>()
+ workflowModel.approvalHandler = Instance.bind(async (approvalTools) => {
+ const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
+ // Auto-approve tools that were already approved in this session
+ // (prevents infinite approval loops for server-side MCP tools)
+ if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
+ return { approved: true }
+ }
+
+ const id = PermissionID.ascending()
+ let reply: Permission.Reply | undefined
+ let unsub: (() => void) | undefined
+ try {
+ unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
+ if (evt.properties.requestID === id) reply = evt.properties.reply
+ })
+ const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
+ try {
+ const parsed = JSON.parse(t.args) as Record<string, unknown>
+ const title = (parsed?.title ?? parsed?.name ?? "") as string
+ return title ? `${t.name}: ${title}` : t.name
+ } catch {
+ return t.name
+ }
+ })
+ const uniquePatterns = [...new Set(toolPatterns)] as string[]
+ await perms.runPromise((svc) =>
+ svc.ask({
+ id,
+ sessionID: SessionID.make(input.sessionID),
+ permission: "workflow_tool_approval",
+ patterns: uniquePatterns,
+ metadata: { tools: approvalTools },
+ always: uniquePatterns,
+ ruleset: [],
+ }),
+ )
+ for (const name of uniqueNames) approvedToolsForSession.add(name)
+ workflowModel.sessionPreapprovedTools = [
+ ...(workflowModel.sessionPreapprovedTools ?? []),
+ ...uniqueNames,
+ ]
+ return { approved: true }
+ } catch {
+ return { approved: false }
+ } finally {
+ unsub?.()
+ }
+ })
+ }
+
+ return streamText({
+ onError(error) {
+ l.error("stream error", {
+ error,
+ })
+ },
+ async experimental_repairToolCall(failed) {
+ const lower = failed.toolCall.toolName.toLowerCase()
+ if (lower !== failed.toolCall.toolName && tools[lower]) {
+ l.info("repairing tool call", {
+ tool: failed.toolCall.toolName,
+ repaired: lower,
+ })
+ return {
+ ...failed.toolCall,
+ toolName: lower,
+ }
+ }
+ return {
+ ...failed.toolCall,
+ input: JSON.stringify({
+ tool: failed.toolCall.toolName,
+ error: failed.error.message,
+ }),
+ toolName: "invalid",
+ }
+ },
+ temperature: params.temperature,
+ topP: params.topP,
+ topK: params.topK,
+ providerOptions: ProviderTransform.providerOptions(input.model, params.options),
+ activeTools: Object.keys(tools).filter((x) => x !== "invalid"),
+ tools,
+ toolChoice: input.toolChoice,
+ maxOutputTokens: params.maxOutputTokens,
abortSignal: input.abort,
+ headers: {
+ ...(input.model.providerID.startsWith("opencode")
+ ? {
+ "x-opencode-project": Instance.project.id,
+ "x-opencode-session": input.sessionID,
+ "x-opencode-request": input.user.id,
+ "x-opencode-client": Flag.OPENCODE_CLIENT,
+ }
+ : {
+ "x-session-affinity": input.sessionID,
+ ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}),
+ "User-Agent": `opencode/${Installation.VERSION}`,
+ }),
+ ...input.model.headers,
+ ...headers,
+ },
+ maxRetries: input.retries ?? 0,
+ messages,
+ model: wrapLanguageModel({
+ model: language,
+ middleware: [
+ {
+ specificationVersion: "v3" as const,
+ async transformParams(args) {
+ if (args.type === "stream") {
+ // @ts-expect-error
+ args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options)
+ }
+ return args.params
+ },
+ },
+ ],
+ }),
+ experimental_telemetry: {
+ isEnabled: cfg.experimental?.openTelemetry,
+ metadata: {
+ userId: cfg.username ?? "unknown",
+ sessionId: input.sessionID,
+ },
+ },
})
- const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result))
- return {
- result: output,
- metadata: typeof result === "object" ? result?.metadata : undefined,
- title: typeof result === "object" ? result?.title : undefined,
- }
- } catch (e: any) {
- return { result: "", error: e.message ?? String(e) }
- }
- }
+ })
- const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
- workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => {
- const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
- return !match || match.action !== "ask"
- })
+ const stream: Interface["stream"] = (input) =>
+ Stream.scoped(
+ Stream.unwrap(
+ Effect.gen(function* () {
+ const ctrl = yield* Effect.acquireRelease(
+ Effect.sync(() => new AbortController()),
+ (ctrl) => Effect.sync(() => ctrl.abort()),
+ )
- const approvedToolsForSession = new Set<string>()
- workflowModel.approvalHandler = Instance.bind(async (approvalTools) => {
- const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
- // Auto-approve tools that were already approved in this session
- // (prevents infinite approval loops for server-side MCP tools)
- if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
- return { approved: true }
- }
+ const result = yield* run({ ...input, abort: ctrl.signal })
- const id = PermissionID.ascending()
- let reply: Permission.Reply | undefined
- let unsub: (() => void) | undefined
- try {
- unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
- if (evt.properties.requestID === id) reply = evt.properties.reply
- })
- const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
- try {
- const parsed = JSON.parse(t.args) as Record<string, unknown>
- const title = (parsed?.title ?? parsed?.name ?? "") as string
- return title ? `${t.name}: ${title}` : t.name
- } catch {
- return t.name
- }
- })
- const uniquePatterns = [...new Set(toolPatterns)] as string[]
- await AppRuntime.runPromise(
- Permission.Service.use((svc) =>
- svc.ask({
- id,
- sessionID: SessionID.make(input.sessionID),
- permission: "workflow_tool_approval",
- patterns: uniquePatterns,
- metadata: { tools: approvalTools },
- always: uniquePatterns,
- ruleset: [],
+ return Stream.fromAsyncIterable(result.fullStream, (e) =>
+ e instanceof Error ? e : new Error(String(e)),
+ )
}),
),
)
- for (const name of uniqueNames) approvedToolsForSession.add(name)
- workflowModel.sessionPreapprovedTools = [...(workflowModel.sessionPreapprovedTools ?? []), ...uniqueNames]
- return { approved: true }
- } catch {
- return { approved: false }
- } finally {
- unsub?.()
- }
- })
- }
- return streamText({
- onError(error) {
- l.error("stream error", {
- error,
- })
- },
- async experimental_repairToolCall(failed) {
- const lower = failed.toolCall.toolName.toLowerCase()
- if (lower !== failed.toolCall.toolName && tools[lower]) {
- l.info("repairing tool call", {
- tool: failed.toolCall.toolName,
- repaired: lower,
- })
- return {
- ...failed.toolCall,
- toolName: lower,
- }
- }
- return {
- ...failed.toolCall,
- input: JSON.stringify({
- tool: failed.toolCall.toolName,
- error: failed.error.message,
- }),
- toolName: "invalid",
- }
- },
- temperature: params.temperature,
- topP: params.topP,
- topK: params.topK,
- providerOptions: ProviderTransform.providerOptions(input.model, params.options),
- activeTools: Object.keys(tools).filter((x) => x !== "invalid"),
- tools,
- toolChoice: input.toolChoice,
- maxOutputTokens: params.maxOutputTokens,
- abortSignal: input.abort,
- headers: {
- ...(input.model.providerID.startsWith("opencode")
- ? {
- "x-opencode-project": Instance.project.id,
- "x-opencode-session": input.sessionID,
- "x-opencode-request": input.user.id,
- "x-opencode-client": Flag.OPENCODE_CLIENT,
- }
- : {
- "x-session-affinity": input.sessionID,
- ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}),
- "User-Agent": `opencode/${Installation.VERSION}`,
- }),
- ...input.model.headers,
- ...headers,
- },
- maxRetries: input.retries ?? 0,
- messages,
- model: wrapLanguageModel({
- model: language,
- middleware: [
- {
- specificationVersion: "v3" as const,
- async transformParams(args) {
- if (args.type === "stream") {
- // @ts-expect-error
- args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options)
- }
- return args.params
- },
- },
- ],
+ return Service.of({ stream })
}),
- experimental_telemetry: {
- isEnabled: cfg.experimental?.openTelemetry,
- metadata: {
- userId: cfg.username ?? "unknown",
- sessionId: input.sessionID,
- },
- },
- })
- }
+ )
+
+ export const defaultLayer = Layer.suspend(() =>
+ layer.pipe(
+ Layer.provide(Auth.defaultLayer),
+ Layer.provide(Config.defaultLayer),
+ Layer.provide(Provider.defaultLayer),
+ Layer.provide(Plugin.defaultLayer),
+ ),
+ )
function resolveTools(input: Pick<StreamInput, "tools" | "agent" | "permission" | "user">) {
const disabled = Permission.disabled(
diff --git a/packages/opencode/test/session/llm.test.ts b/packages/opencode/test/session/llm.test.ts
index 3974ca981..cbf767b4b 100644
--- a/packages/opencode/test/session/llm.test.ts
+++ b/packages/opencode/test/session/llm.test.ts
@@ -26,6 +26,12 @@ async function getModel(providerID: ProviderID, modelID: ModelID) {
)
}
+const llm = makeRuntime(LLM.Service, LLM.defaultLayer)
+
+async function drain(input: LLM.StreamInput) {
+ return llm.runPromise((svc) => svc.stream(input).pipe(Stream.runDrain))
+}
+
describe("session.llm.hasToolCalls", () => {
test("returns false for empty messages array", () => {
expect(LLM.hasToolCalls([])).toBe(false)
@@ -355,20 +361,16 @@ describe("session.llm.stream", () => {
model: { providerID: ProviderID.make(providerID), modelID: resolved.id, variant: "high" },
} satisfies MessageV2.User
- const stream = await LLM.stream({
+ await drain({
user,
sessionID,
model: resolved,
agent,
system: ["You are a helpful assistant."],
- abort: new AbortController().signal,
messages: [{ role: "user", content: "Hello" }],
tools: {},
})
- for await (const _ of stream.fullStream) {
- }
-
const capture = await request
const body = capture.body
const headers = capture.headers
@@ -393,80 +395,6 @@ describe("session.llm.stream", () => {
})
})
- test("raw stream abort signal cancels provider response body promptly", async () => {
- const server = state.server
- if (!server) throw new Error("Server not initialized")
-
- const providerID = "alibaba"
- const modelID = "qwen-plus"
- const fixture = await loadFixture(providerID, modelID)
- const model = fixture.model
- const pending = waitStreamingRequest("/chat/completions")
-
- await using tmp = await tmpdir({
- init: async (dir) => {
- await Bun.write(
- path.join(dir, "opencode.json"),
- JSON.stringify({
- $schema: "https://opencode.ai/config.json",
- enabled_providers: [providerID],
- provider: {
- [providerID]: {
- options: {
- apiKey: "test-key",
- baseURL: `${server.url.origin}/v1`,
- },
- },
- },
- }),
- )
- },
- })
-
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- const resolved = await getModel(ProviderID.make(providerID), ModelID.make(model.id))
- const sessionID = SessionID.make("session-test-raw-abort")
- const agent = {
- name: "test",
- mode: "primary",
- options: {},
- permission: [{ permission: "*", pattern: "*", action: "allow" }],
- } satisfies Agent.Info
- const user = {
- id: MessageID.make("user-raw-abort"),
- sessionID,
- role: "user",
- time: { created: Date.now() },
- agent: agent.name,
- model: { providerID: ProviderID.make(providerID), modelID: resolved.id },
- } satisfies MessageV2.User
-
- const ctrl = new AbortController()
- const result = await LLM.stream({
- user,
- sessionID,
- model: resolved,
- agent,
- system: ["You are a helpful assistant."],
- abort: ctrl.signal,
- messages: [{ role: "user", content: "Hello" }],
- tools: {},
- })
-
- const iter = result.fullStream[Symbol.asyncIterator]()
- await pending.request
- await iter.next()
- ctrl.abort()
-
- await Promise.race([pending.responseCanceled, timeout(500)])
- await Promise.race([pending.requestAborted, timeout(500)]).catch(() => undefined)
- await iter.return?.()
- },
- })
- })
-
test("service stream cancellation cancels provider response body promptly", async () => {
const server = state.server
if (!server) throw new Error("Server not initialized")
@@ -518,8 +446,7 @@ describe("session.llm.stream", () => {
} satisfies MessageV2.User
const ctrl = new AbortController()
- const { runPromiseExit } = makeRuntime(LLM.Service, LLM.defaultLayer)
- const run = runPromiseExit(
+ const run = llm.runPromiseExit(
(svc) =>
svc
.stream({
@@ -610,14 +537,13 @@ describe("session.llm.stream", () => {
tools: { question: true },
} satisfies MessageV2.User
- const stream = await LLM.stream({
+ await drain({
user,
sessionID,
model: resolved,
agent,
permission: [{ permission: "question", pattern: "*", action: "allow" }],
system: ["You are a helpful assistant."],
- abort: new AbortController().signal,
messages: [{ role: "user", content: "Hello" }],
tools: {
question: tool({
@@ -628,9 +554,6 @@ describe("session.llm.stream", () => {
},
})
- for await (const _ of stream.fullStream) {
- }
-
const capture = await request
const tools = capture.body.tools as Array<{ function?: { name?: string } }> | undefined
expect(tools?.some((item) => item.function?.name === "question")).toBe(true)
@@ -728,20 +651,16 @@ describe("session.llm.stream", () => {
model: { providerID: ProviderID.make("openai"), modelID: resolved.id, variant: "high" },
} satisfies MessageV2.User
- const stream = await LLM.stream({
+ await drain({
user,
sessionID,
model: resolved,
agent,
system: ["You are a helpful assistant."],
- abort: new AbortController().signal,
messages: [{ role: "user", content: "Hello" }],
tools: {},
})
- for await (const _ of stream.fullStream) {
- }
-
const capture = await request
const body = capture.body
@@ -847,13 +766,12 @@ describe("session.llm.stream", () => {
model: { providerID: ProviderID.make("openai"), modelID: resolved.id },
} satisfies MessageV2.User
- const stream = await LLM.stream({
+ await drain({
user,
sessionID,
model: resolved,
agent,
system: ["You are a helpful assistant."],
- abort: new AbortController().signal,
messages: [
{
role: "user",
@@ -871,9 +789,6 @@ describe("session.llm.stream", () => {
tools: {},
})
- for await (const _ of stream.fullStream) {
- }
-
const capture = await request
expect(capture.url.pathname.endsWith("/responses")).toBe(true)
},
@@ -972,20 +887,16 @@ describe("session.llm.stream", () => {
model: { providerID: ProviderID.make("minimax"), modelID: ModelID.make("MiniMax-M2.5") },
} satisfies MessageV2.User
- const stream = await LLM.stream({
+ await drain({
user,
sessionID,
model: resolved,
agent,
system: ["You are a helpful assistant."],
- abort: new AbortController().signal,
messages: [{ role: "user", content: "Hello" }],
tools: {},
})
- for await (const _ of stream.fullStream) {
- }
-
const capture = await request
const body = capture.body
@@ -1073,20 +984,16 @@ describe("session.llm.stream", () => {
model: { providerID: ProviderID.make(providerID), modelID: resolved.id },
} satisfies MessageV2.User
- const stream = await LLM.stream({
+ await drain({
user,
sessionID,
model: resolved,
agent,
system: ["You are a helpful assistant."],
- abort: new AbortController().signal,
messages: [{ role: "user", content: "Hello" }],
tools: {},
})
- for await (const _ of stream.fullStream) {
- }
-
const capture = await request
const body = capture.body
const config = body.generationConfig as