summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorNoam Bressler <[email protected]>2026-01-18 09:29:42 +0200
committerGitHub <[email protected]>2026-01-18 01:29:42 -0600
commitbef1f6628118359f98d6c41e195b16e1f68794fa (patch)
tree78a6612f30ee5e26f8747fc4ef94faf43b01744f
parentd13c0ea915dfceeda0003247f5a513df9787429d (diff)
downloadopencode-bef1f6628118359f98d6c41e195b16e1f68794fa.tar.gz
opencode-bef1f6628118359f98d6c41e195b16e1f68794fa.zip
fix(acp): use single global event subscription and route by sessionID (#5628)
Co-authored-by: noamzbr <[email protected]> Co-authored-by: noam-v <[email protected]>
-rw-r--r--packages/opencode/src/acp/agent.ts582
-rw-r--r--packages/opencode/src/acp/session.ts4
-rw-r--r--packages/opencode/test/acp/event-subscription.test.ts436
3 files changed, 754 insertions, 268 deletions
diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts
index f8792393c..a077bb9fb 100644
--- a/packages/opencode/src/acp/agent.ts
+++ b/packages/opencode/src/acp/agent.ts
@@ -20,7 +20,7 @@ import {
} from "@agentclientprotocol/sdk"
import { Log } from "../util/log"
import { ACPSessionManager } from "./session"
-import type { ACPConfig, ACPSessionState } from "./types"
+import type { ACPConfig } from "./types"
import { Provider } from "../provider/provider"
import { Agent as AgentModule } from "../agent/agent"
import { Installation } from "@/installation"
@@ -29,7 +29,7 @@ import { Config } from "@/config/config"
import { Todo } from "@/session/todo"
import { z } from "zod"
import { LoadAPIKeyError } from "ai"
-import type { OpencodeClient, SessionMessageResponse } from "@opencode-ai/sdk/v2"
+import type { Event, OpencodeClient, SessionMessageResponse } from "@opencode-ai/sdk/v2"
import { applyPatch } from "diff"
export namespace ACP {
@@ -47,304 +47,354 @@ export namespace ACP {
private connection: AgentSideConnection
private config: ACPConfig
private sdk: OpencodeClient
- private sessionManager
+ private sessionManager: ACPSessionManager
+ private eventAbort = new AbortController()
+ private eventStarted = false
+ private permissionQueues = new Map<string, Promise<void>>()
+ private permissionOptions: PermissionOption[] = [
+ { optionId: "once", kind: "allow_once", name: "Allow once" },
+ { optionId: "always", kind: "allow_always", name: "Always allow" },
+ { optionId: "reject", kind: "reject_once", name: "Reject" },
+ ]
constructor(connection: AgentSideConnection, config: ACPConfig) {
this.connection = connection
this.config = config
this.sdk = config.sdk
this.sessionManager = new ACPSessionManager(this.sdk)
+ this.startEventSubscription()
}
- private setupEventSubscriptions(session: ACPSessionState) {
- const sessionId = session.id
- const directory = session.cwd
+ private startEventSubscription() {
+ if (this.eventStarted) return
+ this.eventStarted = true
+ this.runEventSubscription().catch((error) => {
+ if (this.eventAbort.signal.aborted) return
+ log.error("event subscription failed", { error })
+ })
+ }
- const options: PermissionOption[] = [
- { optionId: "once", kind: "allow_once", name: "Allow once" },
- { optionId: "always", kind: "allow_always", name: "Always allow" },
- { optionId: "reject", kind: "reject_once", name: "Reject" },
- ]
- this.config.sdk.event.subscribe({ directory }).then(async (events) => {
+ private async runEventSubscription() {
+ while (true) {
+ if (this.eventAbort.signal.aborted) return
+ const events = await this.sdk.global.event({
+ signal: this.eventAbort.signal,
+ })
for await (const event of events.stream) {
- switch (event.type) {
- case "permission.asked":
- try {
- const permission = event.properties
- const res = await this.connection
- .requestPermission({
- sessionId,
- toolCall: {
- toolCallId: permission.tool?.callID ?? permission.id,
- status: "pending",
- title: permission.permission,
- rawInput: permission.metadata,
- kind: toToolKind(permission.permission),
- locations: toLocations(permission.permission, permission.metadata),
- },
- options,
- })
- .catch(async (error) => {
- log.error("failed to request permission from ACP", {
- error,
- permissionID: permission.id,
- sessionID: permission.sessionID,
- })
- await this.config.sdk.permission.reply({
- requestID: permission.id,
- reply: "reject",
- directory,
- })
- return
+ if (this.eventAbort.signal.aborted) return
+ const payload = (event as any)?.payload
+ if (!payload) continue
+ await this.handleEvent(payload as Event).catch((error) => {
+ log.error("failed to handle event", { error, type: payload.type })
+ })
+ }
+ }
+ }
+
+ private async handleEvent(event: Event) {
+ switch (event.type) {
+ case "permission.asked": {
+ const permission = event.properties
+ const session = this.sessionManager.tryGet(permission.sessionID)
+ if (!session) return
+
+ const prev = this.permissionQueues.get(permission.sessionID) ?? Promise.resolve()
+ const next = prev
+ .then(async () => {
+ const directory = session.cwd
+
+ const res = await this.connection
+ .requestPermission({
+ sessionId: permission.sessionID,
+ toolCall: {
+ toolCallId: permission.tool?.callID ?? permission.id,
+ status: "pending",
+ title: permission.permission,
+ rawInput: permission.metadata,
+ kind: toToolKind(permission.permission),
+ locations: toLocations(permission.permission, permission.metadata),
+ },
+ options: this.permissionOptions,
+ })
+ .catch(async (error) => {
+ log.error("failed to request permission from ACP", {
+ error,
+ permissionID: permission.id,
+ sessionID: permission.sessionID,
})
- if (!res) return
- if (res.outcome.outcome !== "selected") {
- await this.config.sdk.permission.reply({
+ await this.sdk.permission.reply({
requestID: permission.id,
reply: "reject",
directory,
})
- return
- }
- if (res.outcome.optionId !== "reject" && permission.permission == "edit") {
- const metadata = permission.metadata || {}
- const filepath = typeof metadata["filepath"] === "string" ? metadata["filepath"] : ""
- const diff = typeof metadata["diff"] === "string" ? metadata["diff"] : ""
-
- const content = await Bun.file(filepath).text()
- const newContent = getNewContent(content, diff)
-
- if (newContent) {
- this.connection.writeTextFile({
- sessionId: sessionId,
- path: filepath,
- content: newContent,
- })
- }
- }
- await this.config.sdk.permission.reply({
+ return undefined
+ })
+
+ if (!res) return
+ if (res.outcome.outcome !== "selected") {
+ await this.sdk.permission.reply({
requestID: permission.id,
- reply: res.outcome.optionId as "once" | "always" | "reject",
+ reply: "reject",
directory,
})
- } catch (err) {
- log.error("unexpected error when handling permission", { error: err })
- } finally {
- break
+ return
}
- case "message.part.updated":
- log.info("message part updated", { event: event.properties })
- try {
- const props = event.properties
- const { part } = props
-
- const message = await this.config.sdk.session
- .message(
- {
- sessionID: part.sessionID,
- messageID: part.messageID,
- directory,
+ if (res.outcome.optionId !== "reject" && permission.permission == "edit") {
+ const metadata = permission.metadata || {}
+ const filepath = typeof metadata["filepath"] === "string" ? metadata["filepath"] : ""
+ const diff = typeof metadata["diff"] === "string" ? metadata["diff"] : ""
+
+ const content = await Bun.file(filepath).text()
+ const newContent = getNewContent(content, diff)
+
+ if (newContent) {
+ this.connection.writeTextFile({
+ sessionId: session.id,
+ path: filepath,
+ content: newContent,
+ })
+ }
+ }
+
+ await this.sdk.permission.reply({
+ requestID: permission.id,
+ reply: res.outcome.optionId as "once" | "always" | "reject",
+ directory,
+ })
+ })
+ .catch((error) => {
+ log.error("failed to handle permission", { error, permissionID: permission.id })
+ })
+ .finally(() => {
+ if (this.permissionQueues.get(permission.sessionID) === next) {
+ this.permissionQueues.delete(permission.sessionID)
+ }
+ })
+ this.permissionQueues.set(permission.sessionID, next)
+ return
+ }
+
+ case "message.part.updated": {
+ log.info("message part updated", { event: event.properties })
+ const props = event.properties
+ const part = props.part
+ const session = this.sessionManager.tryGet(part.sessionID)
+ if (!session) return
+ const sessionId = session.id
+ const directory = session.cwd
+
+ const message = await this.sdk.session
+ .message(
+ {
+ sessionID: part.sessionID,
+ messageID: part.messageID,
+ directory,
+ },
+ { throwOnError: true },
+ )
+ .then((x) => x.data)
+ .catch((error) => {
+ log.error("unexpected error when fetching message", { error })
+ return undefined
+ })
+
+ if (!message || message.info.role !== "assistant") return
+
+ if (part.type === "tool") {
+ switch (part.state.status) {
+ case "pending":
+ await this.connection
+ .sessionUpdate({
+ sessionId,
+ update: {
+ sessionUpdate: "tool_call",
+ toolCallId: part.callID,
+ title: part.tool,
+ kind: toToolKind(part.tool),
+ status: "pending",
+ locations: [],
+ rawInput: {},
},
- { throwOnError: true },
- )
- .then((x) => x.data)
- .catch((err) => {
- log.error("unexpected error when fetching message", { error: err })
- return undefined
})
+ .catch((error) => {
+ log.error("failed to send tool pending to ACP", { error })
+ })
+ return
- if (!message || message.info.role !== "assistant") return
-
- if (part.type === "tool") {
- switch (part.state.status) {
- case "pending":
- await this.connection
- .sessionUpdate({
- sessionId,
- update: {
- sessionUpdate: "tool_call",
- toolCallId: part.callID,
- title: part.tool,
- kind: toToolKind(part.tool),
- status: "pending",
- locations: [],
- rawInput: {},
- },
- })
- .catch((err) => {
- log.error("failed to send tool pending to ACP", { error: err })
- })
- break
- case "running":
- await this.connection
- .sessionUpdate({
- sessionId,
- update: {
- sessionUpdate: "tool_call_update",
- toolCallId: part.callID,
- status: "in_progress",
- kind: toToolKind(part.tool),
- title: part.tool,
- locations: toLocations(part.tool, part.state.input),
- rawInput: part.state.input,
- },
- })
- .catch((err) => {
- log.error("failed to send tool in_progress to ACP", { error: err })
- })
- break
- case "completed":
- const kind = toToolKind(part.tool)
- const content: ToolCallContent[] = [
- {
- type: "content",
- content: {
- type: "text",
- text: part.state.output,
- },
- },
- ]
-
- if (kind === "edit") {
- const input = part.state.input
- const filePath = typeof input["filePath"] === "string" ? input["filePath"] : ""
- const oldText = typeof input["oldString"] === "string" ? input["oldString"] : ""
- const newText =
- typeof input["newString"] === "string"
- ? input["newString"]
- : typeof input["content"] === "string"
- ? input["content"]
- : ""
- content.push({
- type: "diff",
- path: filePath,
- oldText,
- newText,
- })
- }
-
- if (part.tool === "todowrite") {
- const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output))
- if (parsedTodos.success) {
- await this.connection
- .sessionUpdate({
- sessionId,
- update: {
- sessionUpdate: "plan",
- entries: parsedTodos.data.map((todo) => {
- const status: PlanEntry["status"] =
- todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"])
- return {
- priority: "medium",
- status,
- content: todo.content,
- }
- }),
- },
- })
- .catch((err) => {
- log.error("failed to send session update for todo", { error: err })
- })
- } else {
- log.error("failed to parse todo output", { error: parsedTodos.error })
- }
- }
-
- await this.connection
- .sessionUpdate({
- sessionId,
- update: {
- sessionUpdate: "tool_call_update",
- toolCallId: part.callID,
- status: "completed",
- kind,
- content,
- title: part.state.title,
- rawInput: part.state.input,
- rawOutput: {
- output: part.state.output,
- metadata: part.state.metadata,
- },
- },
- })
- .catch((err) => {
- log.error("failed to send tool completed to ACP", { error: err })
- })
- break
- case "error":
- await this.connection
- .sessionUpdate({
- sessionId,
- update: {
- sessionUpdate: "tool_call_update",
- toolCallId: part.callID,
- status: "failed",
- kind: toToolKind(part.tool),
- title: part.tool,
- rawInput: part.state.input,
- content: [
- {
- type: "content",
- content: {
- type: "text",
- text: part.state.error,
- },
- },
- ],
- rawOutput: {
- error: part.state.error,
- },
- },
- })
- .catch((err) => {
- log.error("failed to send tool error to ACP", { error: err })
- })
- break
- }
- } else if (part.type === "text") {
- const delta = props.delta
- if (delta && part.synthetic !== true) {
+ case "running":
+ await this.connection
+ .sessionUpdate({
+ sessionId,
+ update: {
+ sessionUpdate: "tool_call_update",
+ toolCallId: part.callID,
+ status: "in_progress",
+ kind: toToolKind(part.tool),
+ title: part.tool,
+ locations: toLocations(part.tool, part.state.input),
+ rawInput: part.state.input,
+ },
+ })
+ .catch((error) => {
+ log.error("failed to send tool in_progress to ACP", { error })
+ })
+ return
+
+ case "completed": {
+ const kind = toToolKind(part.tool)
+ const content: ToolCallContent[] = [
+ {
+ type: "content",
+ content: {
+ type: "text",
+ text: part.state.output,
+ },
+ },
+ ]
+
+ if (kind === "edit") {
+ const input = part.state.input
+ const filePath = typeof input["filePath"] === "string" ? input["filePath"] : ""
+ const oldText = typeof input["oldString"] === "string" ? input["oldString"] : ""
+ const newText =
+ typeof input["newString"] === "string"
+ ? input["newString"]
+ : typeof input["content"] === "string"
+ ? input["content"]
+ : ""
+ content.push({
+ type: "diff",
+ path: filePath,
+ oldText,
+ newText,
+ })
+ }
+
+ if (part.tool === "todowrite") {
+ const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output))
+ if (parsedTodos.success) {
await this.connection
.sessionUpdate({
sessionId,
update: {
- sessionUpdate: "agent_message_chunk",
- content: {
- type: "text",
- text: delta,
- },
+ sessionUpdate: "plan",
+ entries: parsedTodos.data.map((todo) => {
+ const status: PlanEntry["status"] =
+ todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"])
+ return {
+ priority: "medium",
+ status,
+ content: todo.content,
+ }
+ }),
},
})
- .catch((err) => {
- log.error("failed to send text to ACP", { error: err })
+ .catch((error) => {
+ log.error("failed to send session update for todo", { error })
})
+ } else {
+ log.error("failed to parse todo output", { error: parsedTodos.error })
}
- } else if (part.type === "reasoning") {
- const delta = props.delta
- if (delta) {
- await this.connection
- .sessionUpdate({
- sessionId,
- update: {
- sessionUpdate: "agent_thought_chunk",
+ }
+
+ await this.connection
+ .sessionUpdate({
+ sessionId,
+ update: {
+ sessionUpdate: "tool_call_update",
+ toolCallId: part.callID,
+ status: "completed",
+ kind,
+ content,
+ title: part.state.title,
+ rawInput: part.state.input,
+ rawOutput: {
+ output: part.state.output,
+ metadata: part.state.metadata,
+ },
+ },
+ })
+ .catch((error) => {
+ log.error("failed to send tool completed to ACP", { error })
+ })
+ return
+ }
+ case "error":
+ await this.connection
+ .sessionUpdate({
+ sessionId,
+ update: {
+ sessionUpdate: "tool_call_update",
+ toolCallId: part.callID,
+ status: "failed",
+ kind: toToolKind(part.tool),
+ title: part.tool,
+ rawInput: part.state.input,
+ content: [
+ {
+ type: "content",
content: {
type: "text",
- text: delta,
+ text: part.state.error,
},
},
- })
- .catch((err) => {
- log.error("failed to send reasoning to ACP", { error: err })
- })
- }
- }
- } finally {
- break
- }
+ ],
+ rawOutput: {
+ error: part.state.error,
+ },
+ },
+ })
+ .catch((error) => {
+ log.error("failed to send tool error to ACP", { error })
+ })
+ return
+ }
+ }
+
+ if (part.type === "text") {
+ const delta = props.delta
+ if (delta && part.synthetic !== true) {
+ await this.connection
+ .sessionUpdate({
+ sessionId,
+ update: {
+ sessionUpdate: "agent_message_chunk",
+ content: {
+ type: "text",
+ text: delta,
+ },
+ },
+ })
+ .catch((error) => {
+ log.error("failed to send text to ACP", { error })
+ })
+ }
+ return
}
+
+ if (part.type === "reasoning") {
+ const delta = props.delta
+ if (delta) {
+ await this.connection
+ .sessionUpdate({
+ sessionId,
+ update: {
+ sessionUpdate: "agent_thought_chunk",
+ content: {
+ type: "text",
+ text: delta,
+ },
+ },
+ })
+ .catch((error) => {
+ log.error("failed to send reasoning to ACP", { error })
+ })
+ }
+ }
+ return
}
- })
+ }
}
async initialize(params: InitializeRequest): Promise<InitializeResponse> {
@@ -409,8 +459,6 @@ export namespace ACP {
sessionId,
})
- this.setupEventSubscriptions(state)
-
return {
sessionId,
models: load.models,
@@ -436,7 +484,7 @@ export namespace ACP {
const model = await defaultModel(this.config, directory)
// Store ACP session state
- const state = await this.sessionManager.load(sessionId, params.cwd, params.mcpServers, model)
+ await this.sessionManager.load(sessionId, params.cwd, params.mcpServers, model)
log.info("load_session", { sessionId, mcpServers: params.mcpServers.length })
@@ -446,8 +494,6 @@ export namespace ACP {
sessionId,
})
- this.setupEventSubscriptions(state)
-
// Replay session history
const messages = await this.sdk.session
.messages(
diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts
index 70b658347..151fa5646 100644
--- a/packages/opencode/src/acp/session.ts
+++ b/packages/opencode/src/acp/session.ts
@@ -13,6 +13,10 @@ export class ACPSessionManager {
this.sdk = sdk
}
+ tryGet(sessionId: string): ACPSessionState | undefined {
+ return this.sessions.get(sessionId)
+ }
+
async create(cwd: string, mcpServers: McpServer[], model?: ACPSessionState["model"]): Promise<ACPSessionState> {
const session = await this.sdk.session
.create(
diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts
new file mode 100644
index 000000000..8e139ff59
--- /dev/null
+++ b/packages/opencode/test/acp/event-subscription.test.ts
@@ -0,0 +1,436 @@
+import { describe, expect, test } from "bun:test"
+import { ACP } from "../../src/acp/agent"
+import type { AgentSideConnection } from "@agentclientprotocol/sdk"
+import type { Event } from "@opencode-ai/sdk/v2"
+import { Instance } from "../../src/project/instance"
+import { tmpdir } from "../fixture/fixture"
+
+type SessionUpdateParams = Parameters<AgentSideConnection["sessionUpdate"]>[0]
+type RequestPermissionParams = Parameters<AgentSideConnection["requestPermission"]>[0]
+type RequestPermissionResult = Awaited<ReturnType<AgentSideConnection["requestPermission"]>>
+
+type GlobalEventEnvelope = {
+ directory?: string
+ payload?: Event
+}
+
+type EventController = {
+ push: (event: GlobalEventEnvelope) => void
+ close: () => void
+}
+
+function createEventStream() {
+ const queue: GlobalEventEnvelope[] = []
+ const waiters: Array<(value: GlobalEventEnvelope | undefined) => void> = []
+ const state = { closed: false }
+
+ const push = (event: GlobalEventEnvelope) => {
+ const waiter = waiters.shift()
+ if (waiter) {
+ waiter(event)
+ return
+ }
+ queue.push(event)
+ }
+
+ const close = () => {
+ state.closed = true
+ for (const waiter of waiters.splice(0)) {
+ waiter(undefined)
+ }
+ }
+
+ const stream = async function* (signal?: AbortSignal) {
+ while (true) {
+ if (signal?.aborted) return
+ const next = queue.shift()
+ if (next) {
+ yield next
+ continue
+ }
+ if (state.closed) return
+ const value = await new Promise<GlobalEventEnvelope | undefined>((resolve) => {
+ waiters.push(resolve)
+ if (!signal) return
+ signal.addEventListener("abort", () => resolve(undefined), { once: true })
+ })
+ if (!value) return
+ yield value
+ }
+ }
+
+ return { controller: { push, close } satisfies EventController, stream }
+}
+
+function createFakeAgent() {
+ const updates = new Map<string, string[]>()
+ const chunks = new Map<string, string>()
+ const record = (sessionId: string, type: string) => {
+ const list = updates.get(sessionId) ?? []
+ list.push(type)
+ updates.set(sessionId, list)
+ }
+
+ const connection = {
+ async sessionUpdate(params: SessionUpdateParams) {
+ const update = params.update
+ const type = update?.sessionUpdate ?? "unknown"
+ record(params.sessionId, type)
+ if (update?.sessionUpdate === "agent_message_chunk") {
+ const content = update.content
+ if (content?.type !== "text") return
+ if (typeof content.text !== "string") return
+ chunks.set(params.sessionId, (chunks.get(params.sessionId) ?? "") + content.text)
+ }
+ },
+ async requestPermission(_params: RequestPermissionParams): Promise<RequestPermissionResult> {
+ return { outcome: { outcome: "selected", optionId: "once" } } as RequestPermissionResult
+ },
+ } as unknown as AgentSideConnection
+
+ const { controller, stream } = createEventStream()
+ const calls = {
+ eventSubscribe: 0,
+ sessionCreate: 0,
+ }
+
+ const sdk = {
+ global: {
+ event: async (opts?: { signal?: AbortSignal }) => {
+ calls.eventSubscribe++
+ return { stream: stream(opts?.signal) }
+ },
+ },
+ session: {
+ create: async (_params?: any) => {
+ calls.sessionCreate++
+ return {
+ data: {
+ id: `ses_${calls.sessionCreate}`,
+ time: { created: new Date().toISOString() },
+ },
+ }
+ },
+ get: async (_params?: any) => {
+ return {
+ data: {
+ id: "ses_1",
+ time: { created: new Date().toISOString() },
+ },
+ }
+ },
+ messages: async () => {
+ return { data: [] }
+ },
+ message: async () => {
+ return {
+ data: {
+ info: {
+ role: "assistant",
+ },
+ },
+ }
+ },
+ },
+ permission: {
+ respond: async () => {
+ return { data: true }
+ },
+ },
+ config: {
+ providers: async () => {
+ return {
+ data: {
+ providers: [
+ {
+ id: "opencode",
+ name: "opencode",
+ models: {
+ "big-pickle": { id: "big-pickle", name: "big-pickle" },
+ },
+ },
+ ],
+ },
+ }
+ },
+ },
+ app: {
+ agents: async () => {
+ return {
+ data: [
+ {
+ name: "build",
+ description: "build",
+ mode: "agent",
+ },
+ ],
+ }
+ },
+ },
+ command: {
+ list: async () => {
+ return { data: [] }
+ },
+ },
+ mcp: {
+ add: async () => {
+ return { data: true }
+ },
+ },
+ } as any
+
+ const agent = new ACP.Agent(connection, {
+ sdk,
+ defaultModel: { providerID: "opencode", modelID: "big-pickle" },
+ } as any)
+
+ const stop = () => {
+ controller.close()
+ ;(agent as any).eventAbort.abort()
+ }
+
+ return { agent, controller, calls, updates, chunks, stop, sdk, connection }
+}
+
+describe("acp.agent event subscription", () => {
+ test("routes message.part.updated by the event sessionID (no cross-session pollution)", async () => {
+ await using tmp = await tmpdir()
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const { agent, controller, updates, stop } = createFakeAgent()
+ const cwd = "/tmp/opencode-acp-test"
+
+ const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+ const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+
+ controller.push({
+ directory: cwd,
+ payload: {
+ type: "message.part.updated",
+ properties: {
+ part: {
+ sessionID: sessionB,
+ messageID: "msg_1",
+ type: "text",
+ synthetic: false,
+ },
+ delta: "hello",
+ },
+ },
+ } as any)
+
+ await new Promise((r) => setTimeout(r, 10))
+
+ expect((updates.get(sessionA) ?? []).includes("agent_message_chunk")).toBe(false)
+ expect((updates.get(sessionB) ?? []).includes("agent_message_chunk")).toBe(true)
+
+ stop()
+ },
+ })
+ })
+
+ test("keeps concurrent sessions isolated when message.part.updated events are interleaved", async () => {
+ await using tmp = await tmpdir()
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const { agent, controller, chunks, stop } = createFakeAgent()
+ const cwd = "/tmp/opencode-acp-test"
+
+ const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+ const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+
+ const tokenA = ["ALPHA_", "111", "_X"]
+ const tokenB = ["BETA_", "222", "_Y"]
+
+ const push = (sessionId: string, messageID: string, delta: string) => {
+ controller.push({
+ directory: cwd,
+ payload: {
+ type: "message.part.updated",
+ properties: {
+ part: {
+ sessionID: sessionId,
+ messageID,
+ type: "text",
+ synthetic: false,
+ },
+ delta,
+ },
+ },
+ } as any)
+ }
+
+ push(sessionA, "msg_a", tokenA[0])
+ push(sessionB, "msg_b", tokenB[0])
+ push(sessionA, "msg_a", tokenA[1])
+ push(sessionB, "msg_b", tokenB[1])
+ push(sessionA, "msg_a", tokenA[2])
+ push(sessionB, "msg_b", tokenB[2])
+
+ await new Promise((r) => setTimeout(r, 20))
+
+ const a = chunks.get(sessionA) ?? ""
+ const b = chunks.get(sessionB) ?? ""
+
+ expect(a).toContain(tokenA.join(""))
+ expect(b).toContain(tokenB.join(""))
+ for (const part of tokenB) expect(a).not.toContain(part)
+ for (const part of tokenA) expect(b).not.toContain(part)
+
+ stop()
+ },
+ })
+ })
+
+ test("does not create additional event subscriptions on repeated loadSession()", async () => {
+ await using tmp = await tmpdir()
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const { agent, calls, stop } = createFakeAgent()
+ const cwd = "/tmp/opencode-acp-test"
+
+ const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+
+ await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
+ await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
+ await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
+ await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any)
+
+ expect(calls.eventSubscribe).toBe(1)
+
+ stop()
+ },
+ })
+ })
+
+ test("permission.asked events are handled and replied", async () => {
+ await using tmp = await tmpdir()
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const permissionReplies: string[] = []
+ const { agent, controller, stop, sdk } = createFakeAgent()
+ sdk.permission.reply = async (params: any) => {
+ permissionReplies.push(params.requestID)
+ return { data: true }
+ }
+ const cwd = "/tmp/opencode-acp-test"
+
+ const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+
+ controller.push({
+ directory: cwd,
+ payload: {
+ type: "permission.asked",
+ properties: {
+ id: "perm_1",
+ sessionID: sessionA,
+ permission: "bash",
+ patterns: ["*"],
+ metadata: {},
+ always: [],
+ },
+ },
+ } as any)
+
+ await new Promise((r) => setTimeout(r, 20))
+
+ expect(permissionReplies).toContain("perm_1")
+
+ stop()
+ },
+ })
+ })
+
+ test("permission prompt on session A does not block message updates for session B", async () => {
+ await using tmp = await tmpdir()
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const permissionReplies: string[] = []
+ let resolvePermissionA: (() => void) | undefined
+ const permissionABlocking = new Promise<void>((r) => {
+ resolvePermissionA = r
+ })
+
+ const { agent, controller, chunks, stop, sdk, connection } = createFakeAgent()
+
+ // Make permission request for session A block until we release it
+ const originalRequestPermission = connection.requestPermission.bind(connection)
+ let permissionCalls = 0
+ connection.requestPermission = async (params: RequestPermissionParams) => {
+ permissionCalls++
+ if (params.sessionId.endsWith("1")) {
+ await permissionABlocking
+ }
+ return originalRequestPermission(params)
+ }
+
+ sdk.permission.reply = async (params: any) => {
+ permissionReplies.push(params.requestID)
+ return { data: true }
+ }
+
+ const cwd = "/tmp/opencode-acp-test"
+
+ const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+ const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)
+
+ // Push permission.asked for session A (will block)
+ controller.push({
+ directory: cwd,
+ payload: {
+ type: "permission.asked",
+ properties: {
+ id: "perm_a",
+ sessionID: sessionA,
+ permission: "bash",
+ patterns: ["*"],
+ metadata: {},
+ always: [],
+ },
+ },
+ } as any)
+
+ // Give time for permission handling to start
+ await new Promise((r) => setTimeout(r, 10))
+
+ // Push message for session B while A's permission is pending
+ controller.push({
+ directory: cwd,
+ payload: {
+ type: "message.part.updated",
+ properties: {
+ part: {
+ sessionID: sessionB,
+ messageID: "msg_b",
+ type: "text",
+ synthetic: false,
+ },
+ delta: "session_b_message",
+ },
+ },
+ } as any)
+
+ // Wait for session B's message to be processed
+ await new Promise((r) => setTimeout(r, 20))
+
+ // Session B should have received message even though A's permission is still pending
+ expect(chunks.get(sessionB) ?? "").toContain("session_b_message")
+ expect(permissionReplies).not.toContain("perm_a")
+
+ // Release session A's permission
+ resolvePermissionA!()
+ await new Promise((r) => setTimeout(r, 20))
+
+ // Now session A's permission should be replied
+ expect(permissionReplies).toContain("perm_a")
+
+ stop()
+ },
+ })
+ })
+})