summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--bun.lock1
-rw-r--r--packages/opencode/src/bus/index.ts5
-rw-r--r--packages/opencode/src/command/index.ts6
-rw-r--r--packages/opencode/src/control-plane/workspace-context.ts4
-rw-r--r--packages/opencode/src/effect/bridge.ts49
-rw-r--r--packages/opencode/src/effect/run-service.ts23
-rw-r--r--packages/opencode/src/mcp/index.ts16
-rw-r--r--packages/opencode/src/plugin/index.ts23
-rw-r--r--packages/opencode/src/provider/provider.ts6
-rw-r--r--packages/opencode/src/pty/index.ts7
-rw-r--r--packages/opencode/src/session/llm.ts679
-rw-r--r--packages/opencode/src/session/prompt.ts7
-rw-r--r--packages/opencode/test/effect/app-runtime-logger.test.ts31
-rw-r--r--packages/server/package.json3
14 files changed, 480 insertions, 380 deletions
diff --git a/bun.lock b/bun.lock
index 01966b826..fe5d42d7c 100644
--- a/bun.lock
+++ b/bun.lock
@@ -510,6 +510,7 @@
"effect": "catalog:",
},
"devDependencies": {
+ "@typescript/native-preview": "catalog:",
"typescript": "catalog:",
},
},
diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts
index 0638777bd..3a1eea5c7 100644
--- a/packages/opencode/src/bus/index.ts
+++ b/packages/opencode/src/bus/index.ts
@@ -1,6 +1,6 @@
import z from "zod"
import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect"
-import { EffectLogger } from "@/effect/logger"
+import { EffectBridge } from "@/effect/bridge"
import { Log } from "../util/log"
import { BusEvent } from "./bus-event"
import { GlobalBus } from "./global"
@@ -128,6 +128,7 @@ export namespace Bus {
function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
return Effect.gen(function* () {
log.info("subscribing", { type })
+ const bridge = yield* EffectBridge.make()
const scope = yield* Scope.make()
const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))
@@ -147,7 +148,7 @@ export namespace Bus {
return () => {
log.info("unsubscribing", { type })
- Effect.runFork(Scope.close(scope, Exit.void).pipe(Effect.provide(EffectLogger.layer)))
+ bridge.fork(Scope.close(scope, Exit.void))
}
})
}
diff --git a/packages/opencode/src/command/index.ts b/packages/opencode/src/command/index.ts
index 42f53301b..91a9e1b40 100644
--- a/packages/opencode/src/command/index.ts
+++ b/packages/opencode/src/command/index.ts
@@ -1,9 +1,9 @@
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
+import { EffectBridge } from "@/effect/bridge"
import type { InstanceContext } from "@/project/instance"
import { SessionID, MessageID } from "@/session/schema"
import { Effect, Layer, Context } from "effect"
-import { EffectLogger } from "@/effect/logger"
import z from "zod"
import { Config } from "../config/config"
import { MCP } from "../mcp"
@@ -82,6 +82,7 @@ export namespace Command {
const init = Effect.fn("Command.state")(function* (ctx: InstanceContext) {
const cfg = yield* config.get()
+ const bridge = yield* EffectBridge.make()
const commands: Record<string, Info> = {}
commands[Default.INIT] = {
@@ -125,7 +126,7 @@ export namespace Command {
source: "mcp",
description: prompt.description,
get template() {
- return Effect.runPromise(
+ return bridge.promise(
mcp
.getPrompt(
prompt.client,
@@ -141,7 +142,6 @@ export namespace Command {
.map((message) => (message.content.type === "text" ? message.content.text : ""))
.join("\n") || "",
),
- Effect.provide(EffectLogger.layer),
),
)
},
diff --git a/packages/opencode/src/control-plane/workspace-context.ts b/packages/opencode/src/control-plane/workspace-context.ts
index 173ec6178..541657b88 100644
--- a/packages/opencode/src/control-plane/workspace-context.ts
+++ b/packages/opencode/src/control-plane/workspace-context.ts
@@ -12,6 +12,10 @@ export const WorkspaceContext = {
return context.provide({ workspaceID: input.workspaceID as string }, () => input.fn())
},
+ restore<R>(workspaceID: string, fn: () => R): R {
+ return context.provide({ workspaceID }, fn)
+ },
+
get workspaceID() {
try {
return context.use().workspaceID
diff --git a/packages/opencode/src/effect/bridge.ts b/packages/opencode/src/effect/bridge.ts
new file mode 100644
index 000000000..bafa5a0ea
--- /dev/null
+++ b/packages/opencode/src/effect/bridge.ts
@@ -0,0 +1,49 @@
+import { Effect, Fiber } from "effect"
+import { WorkspaceContext } from "@/control-plane/workspace-context"
+import { Instance, type InstanceContext } from "@/project/instance"
+import { LocalContext } from "@/util/local-context"
+import { InstanceRef, WorkspaceRef } from "./instance-ref"
+import { attachWith } from "./run-service"
+
+export namespace EffectBridge {
+ export interface Shape {
+ readonly promise: <A, E, R>(effect: Effect.Effect<A, E, R>) => Promise<A>
+ readonly fork: <A, E, R>(effect: Effect.Effect<A, E, R>) => Fiber.Fiber<A, E>
+ }
+
+ function restore<R>(instance: InstanceContext | undefined, workspace: string | undefined, fn: () => R): R {
+ if (instance && workspace !== undefined) {
+ return WorkspaceContext.restore(workspace, () => Instance.restore(instance, fn))
+ }
+ if (instance) return Instance.restore(instance, fn)
+ if (workspace !== undefined) return WorkspaceContext.restore(workspace, fn)
+ return fn()
+ }
+
+ export function make(): Effect.Effect<Shape> {
+ return Effect.gen(function* () {
+ const ctx = yield* Effect.context()
+ const value = yield* InstanceRef
+ const instance =
+ value ??
+ (() => {
+ try {
+ return Instance.current
+ } catch (err) {
+ if (!(err instanceof LocalContext.NotFound)) throw err
+ }
+ })()
+ const workspace = (yield* WorkspaceRef) ?? WorkspaceContext.workspaceID
+ const attach = <A, E, R>(effect: Effect.Effect<A, E, R>) => attachWith(effect, { instance, workspace })
+ const wrap = <A, E, R>(effect: Effect.Effect<A, E, R>) =>
+ attach(effect).pipe(Effect.provide(ctx)) as Effect.Effect<A, E, never>
+
+ return {
+ promise: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
+ restore(instance, workspace, () => Effect.runPromise(wrap(effect))),
+ fork: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
+ restore(instance, workspace, () => Effect.runFork(wrap(effect))),
+ } satisfies Shape
+ })
+ }
+}
diff --git a/packages/opencode/src/effect/run-service.ts b/packages/opencode/src/effect/run-service.ts
index bb4307b57..13104c88b 100644
--- a/packages/opencode/src/effect/run-service.ts
+++ b/packages/opencode/src/effect/run-service.ts
@@ -5,14 +5,31 @@ import { LocalContext } from "@/util/local-context"
import { InstanceRef, WorkspaceRef } from "./instance-ref"
import { Observability } from "./observability"
import { WorkspaceContext } from "@/control-plane/workspace-context"
+import type { InstanceContext } from "@/project/instance"
export const memoMap = Layer.makeMemoMapUnsafe()
+type Refs = {
+ instance?: InstanceContext
+ workspace?: string
+}
+
+export function attachWith<A, E, R>(effect: Effect.Effect<A, E, R>, refs: Refs): Effect.Effect<A, E, R> {
+ if (!refs.instance && !refs.workspace) return effect
+ if (!refs.instance) return effect.pipe(Effect.provideService(WorkspaceRef, refs.workspace))
+ if (!refs.workspace) return effect.pipe(Effect.provideService(InstanceRef, refs.instance))
+ return effect.pipe(
+ Effect.provideService(InstanceRef, refs.instance),
+ Effect.provideService(WorkspaceRef, refs.workspace),
+ )
+}
+
export function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
try {
- const ctx = Instance.current
- const workspaceID = WorkspaceContext.workspaceID
- return effect.pipe(Effect.provideService(InstanceRef, ctx), Effect.provideService(WorkspaceRef, workspaceID))
+ return attachWith(effect, {
+ instance: Instance.current,
+ workspace: WorkspaceContext.workspaceID,
+ })
} catch (err) {
if (!(err instanceof LocalContext.NotFound)) throw err
}
diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts
index 3b6690934..a68c6c1d8 100644
--- a/packages/opencode/src/mcp/index.ts
+++ b/packages/opencode/src/mcp/index.ts
@@ -25,7 +25,7 @@ import { Bus } from "@/bus"
import { TuiEvent } from "@/cli/cmd/tui/event"
import open from "open"
import { Effect, Exit, Layer, Option, Context, Stream } from "effect"
-import { EffectLogger } from "@/effect/logger"
+import { EffectBridge } from "@/effect/bridge"
import { InstanceState } from "@/effect/instance-state"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
@@ -471,25 +471,24 @@ export namespace MCP {
Effect.catch(() => Effect.succeed([] as number[])),
)
- function watch(s: State, name: string, client: MCPClient, timeout?: number) {
+ function watch(s: State, name: string, client: MCPClient, bridge: EffectBridge.Shape, timeout?: number) {
client.setNotificationHandler(ToolListChangedNotificationSchema, async () => {
log.info("tools list changed notification received", { server: name })
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
- const listed = await Effect.runPromise(defs(name, client, timeout).pipe(Effect.provide(EffectLogger.layer)))
+ const listed = await bridge.promise(defs(name, client, timeout))
if (!listed) return
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
s.defs[name] = listed
- await Effect.runPromise(
- bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore, Effect.provide(EffectLogger.layer)),
- )
+ await bridge.promise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore))
})
}
const state = yield* InstanceState.make<State>(
Effect.fn("MCP.state")(function* () {
const cfg = yield* cfgSvc.get()
+ const bridge = yield* EffectBridge.make()
const config = cfg.mcp ?? {}
const s: State = {
status: {},
@@ -518,7 +517,7 @@ export namespace MCP {
if (result.mcpClient) {
s.clients[key] = result.mcpClient
s.defs[key] = result.defs!
- watch(s, key, result.mcpClient, mcp.timeout)
+ watch(s, key, result.mcpClient, bridge, mcp.timeout)
}
}),
{ concurrency: "unbounded" },
@@ -565,11 +564,12 @@ export namespace MCP {
listed: MCPToolDef[],
timeout?: number,
) {
+ const bridge = yield* EffectBridge.make()
yield* closeClient(s, name)
s.status[name] = { status: "connected" }
s.clients[name] = client
s.defs[name] = listed
- watch(s, name, client, timeout)
+ watch(s, name, client, bridge, timeout)
return s.status[name]
})
diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts
index c716ffdf8..9f618eff8 100644
--- a/packages/opencode/src/plugin/index.ts
+++ b/packages/opencode/src/plugin/index.ts
@@ -18,7 +18,7 @@ import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth"
import { PoeAuthPlugin } from "opencode-poe-auth"
import { CloudflareAIGatewayAuthPlugin, CloudflareWorkersAuthPlugin } from "./cloudflare"
import { Effect, Layer, Context, Stream } from "effect"
-import { EffectLogger } from "@/effect/logger"
+import { EffectBridge } from "@/effect/bridge"
import { InstanceState } from "@/effect/instance-state"
import { errorMessage } from "@/util/error"
import { PluginLoader } from "./loader"
@@ -90,14 +90,6 @@ export namespace Plugin {
return result
}
- function publishPluginError(bus: Bus.Interface, message: string) {
- Effect.runFork(
- bus
- .publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() })
- .pipe(Effect.provide(EffectLogger.layer)),
- )
- }
-
async function applyPlugin(load: PluginLoader.Loaded, input: PluginInput, hooks: Hooks[]) {
const plugin = readV1Plugin(load.mod, load.spec, "server", "detect")
if (plugin) {
@@ -120,6 +112,11 @@ export namespace Plugin {
const state = yield* InstanceState.make<State>(
Effect.fn("Plugin.state")(function* (ctx) {
const hooks: Hooks[] = []
+ const bridge = yield* EffectBridge.make()
+
+ function publishPluginError(message: string) {
+ bridge.fork(bus.publish(Session.Event.Error, { error: new NamedError.Unknown({ message }).toObject() }))
+ }
const { Server } = yield* Effect.promise(() => import("../server/server"))
@@ -187,24 +184,24 @@ export namespace Plugin {
if (stage === "install") {
const parsed = parsePluginSpecifier(spec)
log.error("failed to install plugin", { pkg: parsed.pkg, version: parsed.version, error: message })
- publishPluginError(bus, `Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`)
+ publishPluginError(`Failed to install plugin ${parsed.pkg}@${parsed.version}: ${message}`)
return
}
if (stage === "compatibility") {
log.warn("plugin incompatible", { path: spec, error: message })
- publishPluginError(bus, `Plugin ${spec} skipped: ${message}`)
+ publishPluginError(`Plugin ${spec} skipped: ${message}`)
return
}
if (stage === "entry") {
log.error("failed to resolve plugin server entry", { path: spec, error: message })
- publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`)
+ publishPluginError(`Failed to load plugin ${spec}: ${message}`)
return
}
log.error("failed to load plugin", { path: spec, target: resolved?.entry, error: message })
- publishPluginError(bus, `Failed to load plugin ${spec}: ${message}`)
+ publishPluginError(`Failed to load plugin ${spec}: ${message}`)
},
},
}),
diff --git a/packages/opencode/src/provider/provider.ts b/packages/opencode/src/provider/provider.ts
index d34721f1d..8833cfd05 100644
--- a/packages/opencode/src/provider/provider.ts
+++ b/packages/opencode/src/provider/provider.ts
@@ -19,7 +19,7 @@ import { iife } from "@/util/iife"
import { Global } from "../global"
import path from "path"
import { Effect, Layer, Context } from "effect"
-import { EffectLogger } from "@/effect/logger"
+import { EffectBridge } from "@/effect/bridge"
import { InstanceState } from "@/effect/instance-state"
import { AppFileSystem } from "@opencode-ai/shared/filesystem"
import { isRecord } from "@/util/record"
@@ -1043,6 +1043,7 @@ export namespace Provider {
const state = yield* InstanceState.make<State>(() =>
Effect.gen(function* () {
using _ = log.time("state")
+ const bridge = yield* EffectBridge.make()
const cfg = yield* config.get()
const modelsDev = yield* Effect.promise(() => ModelsDev.get())
const database = mapValues(modelsDev, fromModelsDevProvider)
@@ -1223,8 +1224,7 @@ export namespace Provider {
const options = yield* Effect.promise(() =>
plugin.auth!.loader!(
- () =>
- Effect.runPromise(auth.get(providerID).pipe(Effect.orDie, Effect.provide(EffectLogger.layer))) as any,
+ () => bridge.promise(auth.get(providerID).pipe(Effect.orDie)) as any,
database[plugin.auth!.provider],
),
)
diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts
index 9c79eb2d4..1c969b4b9 100644
--- a/packages/opencode/src/pty/index.ts
+++ b/packages/opencode/src/pty/index.ts
@@ -10,7 +10,7 @@ import { Shell } from "@/shell/shell"
import { Plugin } from "@/plugin"
import { PtyID } from "./schema"
import { Effect, Layer, Context } from "effect"
-import { EffectLogger } from "@/effect/logger"
+import { EffectBridge } from "@/effect/bridge"
export namespace Pty {
const log = Log.create({ service: "pty" })
@@ -173,6 +173,7 @@ export namespace Pty {
const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
const s = yield* InstanceState.get(state)
+ const bridge = yield* EffectBridge.make()
const id = PtyID.ascending()
const command = input.command || Shell.preferred()
const args = input.args || []
@@ -256,8 +257,8 @@ export namespace Pty {
if (session.info.status === "exited") return
log.info("session exited", { id, exitCode })
session.info.status = "exited"
- Effect.runFork(bus.publish(Event.Exited, { id, exitCode }).pipe(Effect.provide(EffectLogger.layer)))
- Effect.runFork(remove(id).pipe(Effect.provide(EffectLogger.layer)))
+ bridge.fork(bus.publish(Event.Exited, { id, exitCode }))
+ bridge.fork(remove(id))
}),
)
yield* bus.publish(Event.Created, { info })
diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts
index 5a4c04119..05d788275 100644
--- a/packages/opencode/src/session/llm.ts
+++ b/packages/opencode/src/session/llm.ts
@@ -20,13 +20,12 @@ import { Wildcard } from "@/util/wildcard"
import { SessionID } from "@/session/schema"
import { Auth } from "@/auth"
import { Installation } from "@/installation"
-import { makeRuntime } from "@/effect/run-service"
+import { EffectBridge } from "@/effect/bridge"
import * as Option from "effect/Option"
import * as OtelTracer from "@effect/opentelemetry/Tracer"
export namespace LLM {
const log = Log.create({ service: "llm" })
- const perms = makeRuntime(Permission.Service, Permission.defaultLayer)
export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX
type Result = Awaited<ReturnType<typeof streamText>>
@@ -57,369 +56,371 @@ export namespace LLM {
export class Service extends Context.Service<Service, Interface>()("@opencode/LLM") {}
- export const layer: Layer.Layer<Service, never, Auth.Service | Config.Service | Provider.Service | Plugin.Service> =
- Layer.effect(
- Service,
- Effect.gen(function* () {
- const auth = yield* Auth.Service
- const config = yield* Config.Service
- const provider = yield* Provider.Service
- const plugin = yield* Plugin.Service
-
- const run = Effect.fn("LLM.run")(function* (input: StreamRequest) {
- const l = log
- .clone()
- .tag("providerID", input.model.providerID)
- .tag("modelID", input.model.id)
- .tag("sessionID", input.sessionID)
- .tag("small", (input.small ?? false).toString())
- .tag("agent", input.agent.name)
- .tag("mode", input.agent.mode)
- l.info("stream", {
- modelID: input.model.id,
- providerID: input.model.providerID,
- })
-
- const [language, cfg, item, info] = yield* Effect.all(
- [
- provider.getLanguage(input.model),
- config.get(),
- provider.getProvider(input.model.providerID),
- auth.get(input.model.providerID),
- ],
- { concurrency: "unbounded" },
- )
-
- // TODO: move this to a proper hook
- const isOpenaiOauth = item.id === "openai" && info?.type === "oauth"
-
- const system: string[] = []
- system.push(
- [
- // use agent prompt otherwise provider prompt
- ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)),
- // any custom prompt passed into this call
- ...input.system,
- // any custom prompt from last user message
- ...(input.user.system ? [input.user.system] : []),
- ]
- .filter((x) => x)
- .join("\n"),
- )
-
- const header = system[0]
- yield* plugin.trigger(
- "experimental.chat.system.transform",
- { sessionID: input.sessionID, model: input.model },
- { system },
- )
- // rejoin to maintain 2-part structure for caching if header unchanged
- if (system.length > 2 && system[0] === header) {
- const rest = system.slice(1)
- system.length = 0
- system.push(header, rest.join("\n"))
- }
-
- const variant =
- !input.small && input.model.variants && input.user.model.variant
- ? input.model.variants[input.user.model.variant]
- : {}
- const base = input.small
- ? ProviderTransform.smallOptions(input.model)
- : ProviderTransform.options({
- model: input.model,
- sessionID: input.sessionID,
- providerOptions: item.options,
- })
- const options: Record<string, any> = pipe(
- base,
- mergeDeep(input.model.options),
- mergeDeep(input.agent.options),
- mergeDeep(variant),
- )
- if (isOpenaiOauth) {
- options.instructions = system.join("\n")
- }
+ const live: Layer.Layer<
+ Service,
+ never,
+ Auth.Service | Config.Service | Provider.Service | Plugin.Service | Permission.Service
+ > = Layer.effect(
+ Service,
+ Effect.gen(function* () {
+ const auth = yield* Auth.Service
+ const config = yield* Config.Service
+ const provider = yield* Provider.Service
+ const plugin = yield* Plugin.Service
+ const perm = yield* Permission.Service
+
+ const run = Effect.fn("LLM.run")(function* (input: StreamRequest) {
+ const l = log
+ .clone()
+ .tag("providerID", input.model.providerID)
+ .tag("modelID", input.model.id)
+ .tag("sessionID", input.sessionID)
+ .tag("small", (input.small ?? false).toString())
+ .tag("agent", input.agent.name)
+ .tag("mode", input.agent.mode)
+ l.info("stream", {
+ modelID: input.model.id,
+ providerID: input.model.providerID,
+ })
- const isWorkflow = language instanceof GitLabWorkflowLanguageModel
- const messages = isOpenaiOauth
- ? input.messages
- : isWorkflow
- ? input.messages
- : [
- ...system.map(
- (x): ModelMessage => ({
- role: "system",
- content: x,
- }),
- ),
- ...input.messages,
- ]
-
- const params = yield* plugin.trigger(
- "chat.params",
- {
- sessionID: input.sessionID,
- agent: input.agent.name,
+ const [language, cfg, item, info] = yield* Effect.all(
+ [
+ provider.getLanguage(input.model),
+ config.get(),
+ provider.getProvider(input.model.providerID),
+ auth.get(input.model.providerID),
+ ],
+ { concurrency: "unbounded" },
+ )
+
+ // TODO: move this to a proper hook
+ const isOpenaiOauth = item.id === "openai" && info?.type === "oauth"
+
+ const system: string[] = []
+ system.push(
+ [
+ // use agent prompt otherwise provider prompt
+ ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)),
+ // any custom prompt passed into this call
+ ...input.system,
+ // any custom prompt from last user message
+ ...(input.user.system ? [input.user.system] : []),
+ ]
+ .filter((x) => x)
+ .join("\n"),
+ )
+
+ const header = system[0]
+ yield* plugin.trigger(
+ "experimental.chat.system.transform",
+ { sessionID: input.sessionID, model: input.model },
+ { system },
+ )
+ // rejoin to maintain 2-part structure for caching if header unchanged
+ if (system.length > 2 && system[0] === header) {
+ const rest = system.slice(1)
+ system.length = 0
+ system.push(header, rest.join("\n"))
+ }
+
+ const variant =
+ !input.small && input.model.variants && input.user.model.variant
+ ? input.model.variants[input.user.model.variant]
+ : {}
+ const base = input.small
+ ? ProviderTransform.smallOptions(input.model)
+ : ProviderTransform.options({
model: input.model,
- provider: item,
- message: input.user,
- },
- {
- temperature: input.model.capabilities.temperature
- ? (input.agent.temperature ?? ProviderTransform.temperature(input.model))
- : undefined,
- topP: input.agent.topP ?? ProviderTransform.topP(input.model),
- topK: ProviderTransform.topK(input.model),
- maxOutputTokens: ProviderTransform.maxOutputTokens(input.model),
- options,
- },
- )
-
- const { headers } = yield* plugin.trigger(
- "chat.headers",
- {
sessionID: input.sessionID,
- agent: input.agent.name,
- model: input.model,
- provider: item,
- message: input.user,
- },
- {
- headers: {},
- },
- )
-
- const tools = resolveTools(input)
-
- // LiteLLM and some Anthropic proxies require the tools parameter to be present
- // when message history contains tool calls, even if no tools are being used.
- // Add a dummy tool that is never called to satisfy this validation.
- // This is enabled for:
- // 1. Providers with "litellm" in their ID or API ID (auto-detected)
- // 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways)
- const isLiteLLMProxy =
- item.options?.["litellmProxy"] === true ||
- input.model.providerID.toLowerCase().includes("litellm") ||
- input.model.api.id.toLowerCase().includes("litellm")
-
- // LiteLLM/Bedrock rejects requests where the message history contains tool
- // calls but no tools param is present. When there are no active tools (e.g.
- // during compaction), inject a stub tool to satisfy the validation requirement.
- // The stub description explicitly tells the model not to call it.
- if (
- (isLiteLLMProxy || input.model.providerID.includes("github-copilot")) &&
- Object.keys(tools).length === 0 &&
- hasToolCalls(input.messages)
- ) {
- tools["_noop"] = tool({
- description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
- inputSchema: jsonSchema({
- type: "object",
- properties: {
- reason: { type: "string", description: "Unused" },
- },
- }),
- execute: async () => ({ output: "", title: "", metadata: {} }),
+ providerOptions: item.options,
})
+ const options: Record<string, any> = pipe(
+ base,
+ mergeDeep(input.model.options),
+ mergeDeep(input.agent.options),
+ mergeDeep(variant),
+ )
+ if (isOpenaiOauth) {
+ options.instructions = system.join("\n")
+ }
+
+ const isWorkflow = language instanceof GitLabWorkflowLanguageModel
+ const messages = isOpenaiOauth
+ ? input.messages
+ : isWorkflow
+ ? input.messages
+ : [
+ ...system.map(
+ (x): ModelMessage => ({
+ role: "system",
+ content: x,
+ }),
+ ),
+ ...input.messages,
+ ]
+
+ const params = yield* plugin.trigger(
+ "chat.params",
+ {
+ sessionID: input.sessionID,
+ agent: input.agent.name,
+ model: input.model,
+ provider: item,
+ message: input.user,
+ },
+ {
+ temperature: input.model.capabilities.temperature
+ ? (input.agent.temperature ?? ProviderTransform.temperature(input.model))
+ : undefined,
+ topP: input.agent.topP ?? ProviderTransform.topP(input.model),
+ topK: ProviderTransform.topK(input.model),
+ maxOutputTokens: ProviderTransform.maxOutputTokens(input.model),
+ options,
+ },
+ )
+
+ const { headers } = yield* plugin.trigger(
+ "chat.headers",
+ {
+ sessionID: input.sessionID,
+ agent: input.agent.name,
+ model: input.model,
+ provider: item,
+ message: input.user,
+ },
+ {
+ headers: {},
+ },
+ )
+
+ const tools = resolveTools(input)
+
+ // LiteLLM and some Anthropic proxies require the tools parameter to be present
+ // when message history contains tool calls, even if no tools are being used.
+ // Add a dummy tool that is never called to satisfy this validation.
+ // This is enabled for:
+ // 1. Providers with "litellm" in their ID or API ID (auto-detected)
+ // 2. Providers with explicit "litellmProxy: true" option (opt-in for custom gateways)
+ const isLiteLLMProxy =
+ item.options?.["litellmProxy"] === true ||
+ input.model.providerID.toLowerCase().includes("litellm") ||
+ input.model.api.id.toLowerCase().includes("litellm")
+
+ // LiteLLM/Bedrock rejects requests where the message history contains tool
+ // calls but no tools param is present. When there are no active tools (e.g.
+ // during compaction), inject a stub tool to satisfy the validation requirement.
+ // The stub description explicitly tells the model not to call it.
+ if (
+ (isLiteLLMProxy || input.model.providerID.includes("github-copilot")) &&
+ Object.keys(tools).length === 0 &&
+ hasToolCalls(input.messages)
+ ) {
+ tools["_noop"] = tool({
+ description: "Do not call this tool. It exists only for API compatibility and must never be invoked.",
+ inputSchema: jsonSchema({
+ type: "object",
+ properties: {
+ reason: { type: "string", description: "Unused" },
+ },
+ }),
+ execute: async () => ({ output: "", title: "", metadata: {} }),
+ })
+ }
+
+ // Wire up toolExecutor for DWS workflow models so that tool calls
+ // from the workflow service are executed via opencode's tool system
+ // and results sent back over the WebSocket.
+ if (language instanceof GitLabWorkflowLanguageModel) {
+ const workflowModel = language as GitLabWorkflowLanguageModel & {
+ sessionID?: string
+ sessionPreapprovedTools?: string[]
+ approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
}
-
- // Wire up toolExecutor for DWS workflow models so that tool calls
- // from the workflow service are executed via opencode's tool system
- // and results sent back over the WebSocket.
- if (language instanceof GitLabWorkflowLanguageModel) {
- const workflowModel = language as GitLabWorkflowLanguageModel & {
- sessionID?: string
- sessionPreapprovedTools?: string[]
- approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
+ workflowModel.sessionID = input.sessionID
+ workflowModel.systemPrompt = system.join("\n")
+ workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
+ const t = tools[toolName]
+ if (!t || !t.execute) {
+ return { result: "", error: `Unknown tool: ${toolName}` }
}
- workflowModel.sessionID = input.sessionID
- workflowModel.systemPrompt = system.join("\n")
- workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
- const t = tools[toolName]
- if (!t || !t.execute) {
- return { result: "", error: `Unknown tool: ${toolName}` }
- }
- try {
- const result = await t.execute!(JSON.parse(argsJson), {
- toolCallId: _requestID,
- messages: input.messages,
- abortSignal: input.abort,
- })
- const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result))
- return {
- result: output,
- metadata: typeof result === "object" ? result?.metadata : undefined,
- title: typeof result === "object" ? result?.title : undefined,
- }
- } catch (e: any) {
- return { result: "", error: e.message ?? String(e) }
+ try {
+ const result = await t.execute!(JSON.parse(argsJson), {
+ toolCallId: _requestID,
+ messages: input.messages,
+ abortSignal: input.abort,
+ })
+ const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result))
+ return {
+ result: output,
+ metadata: typeof result === "object" ? result?.metadata : undefined,
+ title: typeof result === "object" ? result?.title : undefined,
}
+ } catch (e: any) {
+ return { result: "", error: e.message ?? String(e) }
}
+ }
- const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
- workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => {
- const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
- return !match || match.action !== "ask"
- })
+ const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? [])
+ workflowModel.sessionPreapprovedTools = Object.keys(tools).filter((name) => {
+ const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission))
+ return !match || match.action !== "ask"
+ })
- const approvedToolsForSession = new Set<string>()
- workflowModel.approvalHandler = Instance.bind(async (approvalTools) => {
- const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
- // Auto-approve tools that were already approved in this session
- // (prevents infinite approval loops for server-side MCP tools)
- if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
- return { approved: true }
- }
+ const bridge = yield* EffectBridge.make()
+ const approvedToolsForSession = new Set<string>()
+ workflowModel.approvalHandler = Instance.bind(async (approvalTools) => {
+ const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[]
+ // Auto-approve tools that were already approved in this session
+ // (prevents infinite approval loops for server-side MCP tools)
+ if (uniqueNames.every((name) => approvedToolsForSession.has(name))) {
+ return { approved: true }
+ }
- const id = PermissionID.ascending()
- let reply: Permission.Reply | undefined
- let unsub: (() => void) | undefined
- try {
- unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
- if (evt.properties.requestID === id) reply = evt.properties.reply
- })
- const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
- try {
- const parsed = JSON.parse(t.args) as Record<string, unknown>
- const title = (parsed?.title ?? parsed?.name ?? "") as string
- return title ? `${t.name}: ${title}` : t.name
- } catch {
- return t.name
- }
- })
- const uniquePatterns = [...new Set(toolPatterns)] as string[]
- await perms.runPromise((svc) =>
- svc.ask({
- id,
- sessionID: SessionID.make(input.sessionID),
- permission: "workflow_tool_approval",
- patterns: uniquePatterns,
- metadata: { tools: approvalTools },
- always: uniquePatterns,
- ruleset: [],
- }),
- )
- for (const name of uniqueNames) approvedToolsForSession.add(name)
- workflowModel.sessionPreapprovedTools = [
- ...(workflowModel.sessionPreapprovedTools ?? []),
- ...uniqueNames,
- ]
- return { approved: true }
- } catch {
- return { approved: false }
- } finally {
- unsub?.()
- }
- })
- }
+ const id = PermissionID.ascending()
+ let reply: Permission.Reply | undefined
+ let unsub: (() => void) | undefined
+ try {
+ unsub = Bus.subscribe(Permission.Event.Replied, (evt) => {
+ if (evt.properties.requestID === id) reply = evt.properties.reply
+ })
+ const toolPatterns = approvalTools.map((t: { name: string; args: string }) => {
+ try {
+ const parsed = JSON.parse(t.args) as Record<string, unknown>
+ const title = (parsed?.title ?? parsed?.name ?? "") as string
+ return title ? `${t.name}: ${title}` : t.name
+ } catch {
+ return t.name
+ }
+ })
+ const uniquePatterns = [...new Set(toolPatterns)] as string[]
+ await bridge.promise(
+ perm.ask({
+ id,
+ sessionID: SessionID.make(input.sessionID),
+ permission: "workflow_tool_approval",
+ patterns: uniquePatterns,
+ metadata: { tools: approvalTools },
+ always: uniquePatterns,
+ ruleset: [],
+ }),
+ )
+ for (const name of uniqueNames) approvedToolsForSession.add(name)
+ workflowModel.sessionPreapprovedTools = [...(workflowModel.sessionPreapprovedTools ?? []), ...uniqueNames]
+ return { approved: true }
+ } catch {
+ return { approved: false }
+ } finally {
+ unsub?.()
+ }
+ })
+ }
- const tracer = cfg.experimental?.openTelemetry
- ? Option.getOrUndefined(yield* Effect.serviceOption(OtelTracer.OtelTracer))
- : undefined
+ const tracer = cfg.experimental?.openTelemetry
+ ? Option.getOrUndefined(yield* Effect.serviceOption(OtelTracer.OtelTracer))
+ : undefined
- return streamText({
- onError(error) {
- l.error("stream error", {
- error,
+ return streamText({
+ onError(error) {
+ l.error("stream error", {
+ error,
+ })
+ },
+ async experimental_repairToolCall(failed) {
+ const lower = failed.toolCall.toolName.toLowerCase()
+ if (lower !== failed.toolCall.toolName && tools[lower]) {
+ l.info("repairing tool call", {
+ tool: failed.toolCall.toolName,
+ repaired: lower,
})
- },
- async experimental_repairToolCall(failed) {
- const lower = failed.toolCall.toolName.toLowerCase()
- if (lower !== failed.toolCall.toolName && tools[lower]) {
- l.info("repairing tool call", {
- tool: failed.toolCall.toolName,
- repaired: lower,
- })
- return {
- ...failed.toolCall,
- toolName: lower,
- }
- }
return {
...failed.toolCall,
- input: JSON.stringify({
- tool: failed.toolCall.toolName,
- error: failed.error.message,
- }),
- toolName: "invalid",
+ toolName: lower,
}
- },
- temperature: params.temperature,
- topP: params.topP,
- topK: params.topK,
- providerOptions: ProviderTransform.providerOptions(input.model, params.options),
- activeTools: Object.keys(tools).filter((x) => x !== "invalid"),
- tools,
- toolChoice: input.toolChoice,
- maxOutputTokens: params.maxOutputTokens,
- abortSignal: input.abort,
- headers: {
- ...(input.model.providerID.startsWith("opencode")
- ? {
- "x-opencode-project": Instance.project.id,
- "x-opencode-session": input.sessionID,
- "x-opencode-request": input.user.id,
- "x-opencode-client": Flag.OPENCODE_CLIENT,
+ }
+ return {
+ ...failed.toolCall,
+ input: JSON.stringify({
+ tool: failed.toolCall.toolName,
+ error: failed.error.message,
+ }),
+ toolName: "invalid",
+ }
+ },
+ temperature: params.temperature,
+ topP: params.topP,
+ topK: params.topK,
+ providerOptions: ProviderTransform.providerOptions(input.model, params.options),
+ activeTools: Object.keys(tools).filter((x) => x !== "invalid"),
+ tools,
+ toolChoice: input.toolChoice,
+ maxOutputTokens: params.maxOutputTokens,
+ abortSignal: input.abort,
+ headers: {
+ ...(input.model.providerID.startsWith("opencode")
+ ? {
+ "x-opencode-project": Instance.project.id,
+ "x-opencode-session": input.sessionID,
+ "x-opencode-request": input.user.id,
+ "x-opencode-client": Flag.OPENCODE_CLIENT,
+ }
+ : {
+ "x-session-affinity": input.sessionID,
+ ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}),
+ "User-Agent": `opencode/${Installation.VERSION}`,
+ }),
+ ...input.model.headers,
+ ...headers,
+ },
+ maxRetries: input.retries ?? 0,
+ messages,
+ model: wrapLanguageModel({
+ model: language,
+ middleware: [
+ {
+ specificationVersion: "v3" as const,
+ async transformParams(args) {
+ if (args.type === "stream") {
+ // @ts-expect-error
+ args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options)
}
- : {
- "x-session-affinity": input.sessionID,
- ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}),
- "User-Agent": `opencode/${Installation.VERSION}`,
- }),
- ...input.model.headers,
- ...headers,
- },
- maxRetries: input.retries ?? 0,
- messages,
- model: wrapLanguageModel({
- model: language,
- middleware: [
- {
- specificationVersion: "v3" as const,
- async transformParams(args) {
- if (args.type === "stream") {
- // @ts-expect-error
- args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options)
- }
- return args.params
- },
+ return args.params
},
- ],
- }),
- experimental_telemetry: {
- isEnabled: cfg.experimental?.openTelemetry,
- functionId: "session.llm",
- tracer,
- metadata: {
- userId: cfg.username ?? "unknown",
- sessionId: input.sessionID,
},
+ ],
+ }),
+ experimental_telemetry: {
+ isEnabled: cfg.experimental?.openTelemetry,
+ functionId: "session.llm",
+ tracer,
+ metadata: {
+ userId: cfg.username ?? "unknown",
+ sessionId: input.sessionID,
},
- })
+ },
})
+ })
- const stream: Interface["stream"] = (input) =>
- Stream.scoped(
- Stream.unwrap(
- Effect.gen(function* () {
- const ctrl = yield* Effect.acquireRelease(
- Effect.sync(() => new AbortController()),
- (ctrl) => Effect.sync(() => ctrl.abort()),
- )
+ const stream: Interface["stream"] = (input) =>
+ Stream.scoped(
+ Stream.unwrap(
+ Effect.gen(function* () {
+ const ctrl = yield* Effect.acquireRelease(
+ Effect.sync(() => new AbortController()),
+ (ctrl) => Effect.sync(() => ctrl.abort()),
+ )
- const result = yield* run({ ...input, abort: ctrl.signal })
+ const result = yield* run({ ...input, abort: ctrl.signal })
- return Stream.fromAsyncIterable(result.fullStream, (e) =>
- e instanceof Error ? e : new Error(String(e)),
- )
- }),
- ),
- )
+ return Stream.fromAsyncIterable(result.fullStream, (e) => (e instanceof Error ? e : new Error(String(e))))
+ }),
+ ),
+ )
- return Service.of({ stream })
- }),
- )
+ return Service.of({ stream })
+ }),
+ )
+
+ export const layer = live.pipe(Layer.provide(Permission.defaultLayer))
export const defaultLayer = Layer.suspend(() =>
layer.pipe(
diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts
index f8c794505..ffd074d3f 100644
--- a/packages/opencode/src/session/prompt.ts
+++ b/packages/opencode/src/session/prompt.ts
@@ -48,6 +48,7 @@ import { EffectLogger } from "@/effect/logger"
import { InstanceState } from "@/effect/instance-state"
import { TaskTool, type TaskPromptOps } from "@/tool/task"
import { SessionRunState } from "./run-state"
+import { EffectBridge } from "@/effect/bridge"
// @ts-ignore
globalThis.AI_SDK_LOG_WARNINGS = false
@@ -105,11 +106,7 @@ export namespace SessionPrompt {
const sys = yield* SystemPrompt.Service
const llm = yield* LLM.Service
const runner = Effect.fn("SessionPrompt.runner")(function* () {
- const ctx = yield* Effect.context()
- return {
- promise: <A, E>(effect: Effect.Effect<A, E>) => Effect.runPromiseWith(ctx)(effect),
- fork: <A, E>(effect: Effect.Effect<A, E>) => Effect.runForkWith(ctx)(effect),
- }
+ return yield* EffectBridge.make()
})
const ops = Effect.fn("SessionPrompt.ops")(function* () {
const run = yield* runner()
diff --git a/packages/opencode/test/effect/app-runtime-logger.test.ts b/packages/opencode/test/effect/app-runtime-logger.test.ts
index 8a7aab6cf..7388748f9 100644
--- a/packages/opencode/test/effect/app-runtime-logger.test.ts
+++ b/packages/opencode/test/effect/app-runtime-logger.test.ts
@@ -1,6 +1,7 @@
import { expect, test } from "bun:test"
import { Context, Effect, Layer, Logger } from "effect"
import { AppRuntime } from "../../src/effect/app-runtime"
+import { EffectBridge } from "../../src/effect/bridge"
import { InstanceRef } from "../../src/effect/instance-ref"
import { EffectLogger } from "../../src/effect/logger"
import { makeRuntime } from "../../src/effect/run-service"
@@ -59,3 +60,33 @@ test("AppRuntime attaches InstanceRef from ALS", async () => {
expect(dir).toBe(tmp.path)
})
+
+test("EffectBridge preserves logger and instance context across async boundaries", async () => {
+ await using tmp = await tmpdir({ git: true })
+
+ const result = await Instance.provide({
+ directory: tmp.path,
+ fn: () =>
+ AppRuntime.runPromise(
+ Effect.gen(function* () {
+ const bridge = yield* EffectBridge.make()
+ return yield* Effect.promise(() =>
+ Promise.resolve().then(() =>
+ bridge.promise(
+ Effect.gen(function* () {
+ return {
+ directory: (yield* InstanceRef)?.directory,
+ ...check(yield* Effect.service(Logger.CurrentLoggers)),
+ }
+ }),
+ ),
+ ),
+ )
+ }),
+ ),
+ })
+
+ expect(result.directory).toBe(tmp.path)
+ expect(result.effectLogger).toBe(true)
+ expect(result.defaultLogger).toBe(false)
+})
diff --git a/packages/server/package.json b/packages/server/package.json
index c397c40d9..9b8b31299 100644
--- a/packages/server/package.json
+++ b/packages/server/package.json
@@ -17,10 +17,11 @@
"dist"
],
"scripts": {
- "typecheck": "tsc --noEmit",
+ "typecheck": "tsgo --noEmit",
"build": "tsc"
},
"devDependencies": {
+ "@typescript/native-preview": "catalog:",
"typescript": "catalog:"
},
"dependencies": {