summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorKit Langton <[email protected]>2026-03-21 21:17:13 -0400
committerGitHub <[email protected]>2026-03-22 01:17:13 +0000
commit13bac9c91a908f560f74f19a49c7c865e4bfd5ec (patch)
treec9cea7c7531c5d57e4a09ac162099c47089b3ed5
parentfe53af48195446dbb6f080fd93245ed3ca72caed (diff)
downloadopencode-13bac9c91a908f560f74f19a49c7c865e4bfd5ec.tar.gz
opencode-13bac9c91a908f560f74f19a49c7c865e4bfd5ec.zip
effectify Pty service (#18572)
-rw-r--r--packages/opencode/src/pty/index.ts482
-rw-r--r--packages/opencode/src/server/routes/pty.ts24
2 files changed, 296 insertions, 210 deletions
diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts
index 7436abec9..f866b18ad 100644
--- a/packages/opencode/src/pty/index.ts
+++ b/packages/opencode/src/pty/index.ts
@@ -1,13 +1,16 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
+import { InstanceState } from "@/effect/instance-state"
+import { makeRunPromise } from "@/effect/run-service"
+import { Instance } from "@/project/instance"
import { type IPty } from "bun-pty"
import z from "zod"
import { Log } from "../util/log"
-import { Instance } from "../project/instance"
import { lazy } from "@opencode-ai/util/lazy"
import { Shell } from "@/shell/shell"
import { Plugin } from "@/plugin"
import { PtyID } from "./schema"
+import { Effect, Layer, ServiceMap } from "effect"
export namespace Pty {
const log = Log.create({ service: "pty" })
@@ -23,6 +26,20 @@ export namespace Pty {
close: (code?: number, reason?: string) => void
}
+ type Active = {
+ info: Info
+ process: IPty
+ buffer: string
+ bufferCursor: number
+ cursor: number
+ subscribers: Map<unknown, Socket>
+ }
+
+ type State = {
+ dir: string
+ sessions: Map<PtyID, Active>
+ }
+
// WebSocket control frame: 0x00 + UTF-8 JSON.
const meta = (cursor: number) => {
const json = JSON.stringify({ cursor })
@@ -81,241 +98,300 @@ export namespace Pty {
Deleted: BusEvent.define("pty.deleted", z.object({ id: PtyID.zod })),
}
- interface ActiveSession {
- info: Info
- process: IPty
- buffer: string
- bufferCursor: number
- cursor: number
- subscribers: Map<unknown, Socket>
+ export interface Interface {
+ readonly list: () => Effect.Effect<Info[]>
+ readonly get: (id: PtyID) => Effect.Effect<Info | undefined>
+ readonly create: (input: CreateInput) => Effect.Effect<Info>
+ readonly update: (id: PtyID, input: UpdateInput) => Effect.Effect<Info | undefined>
+ readonly remove: (id: PtyID) => Effect.Effect<void>
+ readonly resize: (id: PtyID, cols: number, rows: number) => Effect.Effect<void>
+ readonly write: (id: PtyID, data: string) => Effect.Effect<void>
+ readonly connect: (
+ id: PtyID,
+ ws: Socket,
+ cursor?: number,
+ ) => Effect.Effect<{ onMessage: (message: string | ArrayBuffer) => void; onClose: () => void } | undefined>
}
- const state = Instance.state(
- () => new Map<PtyID, ActiveSession>(),
- async (sessions) => {
- for (const session of sessions.values()) {
+ export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Pty") {}
+
+ export const layer = Layer.effect(
+ Service,
+ Effect.gen(function* () {
+ function teardown(session: Active) {
try {
session.process.kill()
} catch {}
for (const [key, ws] of session.subscribers.entries()) {
try {
if (ws.data === key) ws.close()
- } catch {
- // ignore
- }
+ } catch {}
}
+ session.subscribers.clear()
}
- sessions.clear()
- },
- )
- export function list() {
- return Array.from(state().values()).map((s) => s.info)
- }
+ const cache = yield* InstanceState.make<State>(
+ Effect.fn("Pty.state")(function* (ctx) {
+ const state = {
+ dir: ctx.directory,
+ sessions: new Map<PtyID, Active>(),
+ }
- export function get(id: PtyID) {
- return state().get(id)?.info
- }
+ yield* Effect.addFinalizer(() =>
+ Effect.sync(() => {
+ for (const session of state.sessions.values()) {
+ teardown(session)
+ }
+ state.sessions.clear()
+ }),
+ )
+
+ return state
+ }),
+ )
+
+ const remove = Effect.fn("Pty.remove")(function* (id: PtyID) {
+ const state = yield* InstanceState.get(cache)
+ const session = state.sessions.get(id)
+ if (!session) return
+ state.sessions.delete(id)
+ log.info("removing session", { id })
+ teardown(session)
+ void Bus.publish(Event.Deleted, { id: session.info.id })
+ })
- export async function create(input: CreateInput) {
- const id = PtyID.ascending()
- const command = input.command || Shell.preferred()
- const args = input.args || []
- if (command.endsWith("sh")) {
- args.push("-l")
- }
-
- const cwd = input.cwd || Instance.directory
- const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} })
- const env = {
- ...process.env,
- ...input.env,
- ...shellEnv.env,
- TERM: "xterm-256color",
- OPENCODE_TERMINAL: "1",
- } as Record<string, string>
-
- if (process.platform === "win32") {
- env.LC_ALL = "C.UTF-8"
- env.LC_CTYPE = "C.UTF-8"
- env.LANG = "C.UTF-8"
- }
- log.info("creating session", { id, cmd: command, args, cwd })
-
- const spawn = await pty()
- const ptyProcess = spawn(command, args, {
- name: "xterm-256color",
- cwd,
- env,
- })
+ const list = Effect.fn("Pty.list")(function* () {
+ const state = yield* InstanceState.get(cache)
+ return Array.from(state.sessions.values()).map((session) => session.info)
+ })
- const info = {
- id,
- title: input.title || `Terminal ${id.slice(-4)}`,
- command,
- args,
- cwd,
- status: "running",
- pid: ptyProcess.pid,
- } as const
- const session: ActiveSession = {
- info,
- process: ptyProcess,
- buffer: "",
- bufferCursor: 0,
- cursor: 0,
- subscribers: new Map(),
- }
- state().set(id, session)
- ptyProcess.onData(
- Instance.bind((chunk) => {
- session.cursor += chunk.length
+ const get = Effect.fn("Pty.get")(function* (id: PtyID) {
+ const state = yield* InstanceState.get(cache)
+ return state.sessions.get(id)?.info
+ })
- for (const [key, ws] of session.subscribers.entries()) {
- if (ws.readyState !== 1) {
- session.subscribers.delete(key)
- continue
+ const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
+ const state = yield* InstanceState.get(cache)
+ return yield* Effect.promise(async () => {
+ const id = PtyID.ascending()
+ const command = input.command || Shell.preferred()
+ const args = input.args || []
+ if (command.endsWith("sh")) {
+ args.push("-l")
}
- if (ws.data !== key) {
- session.subscribers.delete(key)
- continue
+ const cwd = input.cwd || state.dir
+ const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} })
+ const env = {
+ ...process.env,
+ ...input.env,
+ ...shellEnv.env,
+ TERM: "xterm-256color",
+ OPENCODE_TERMINAL: "1",
+ } as Record<string, string>
+
+ if (process.platform === "win32") {
+ env.LC_ALL = "C.UTF-8"
+ env.LC_CTYPE = "C.UTF-8"
+ env.LANG = "C.UTF-8"
}
+ log.info("creating session", { id, cmd: command, args, cwd })
+
+ const spawn = await pty()
+ const proc = spawn(command, args, {
+ name: "xterm-256color",
+ cwd,
+ env,
+ })
+
+ const info = {
+ id,
+ title: input.title || `Terminal ${id.slice(-4)}`,
+ command,
+ args,
+ cwd,
+ status: "running",
+ pid: proc.pid,
+ } as const
+ const session: Active = {
+ info,
+ process: proc,
+ buffer: "",
+ bufferCursor: 0,
+ cursor: 0,
+ subscribers: new Map(),
+ }
+ state.sessions.set(id, session)
+ proc.onData(
+ Instance.bind((chunk) => {
+ session.cursor += chunk.length
+
+ for (const [key, ws] of session.subscribers.entries()) {
+ if (ws.readyState !== 1) {
+ session.subscribers.delete(key)
+ continue
+ }
+ if (ws.data !== key) {
+ session.subscribers.delete(key)
+ continue
+ }
+ try {
+ ws.send(chunk)
+ } catch {
+ session.subscribers.delete(key)
+ }
+ }
+
+ session.buffer += chunk
+ if (session.buffer.length <= BUFFER_LIMIT) return
+ const excess = session.buffer.length - BUFFER_LIMIT
+ session.buffer = session.buffer.slice(excess)
+ session.bufferCursor += excess
+ }),
+ )
+ proc.onExit(
+ Instance.bind(({ exitCode }) => {
+ if (session.info.status === "exited") return
+ log.info("session exited", { id, exitCode })
+ session.info.status = "exited"
+ void Bus.publish(Event.Exited, { id, exitCode })
+ Effect.runFork(remove(id))
+ }),
+ )
+ await Bus.publish(Event.Created, { info })
+ return info
+ })
+ })
+
+ const update = Effect.fn("Pty.update")(function* (id: PtyID, input: UpdateInput) {
+ const state = yield* InstanceState.get(cache)
+ const session = state.sessions.get(id)
+ if (!session) return
+ if (input.title) {
+ session.info.title = input.title
+ }
+ if (input.size) {
+ session.process.resize(input.size.cols, input.size.rows)
+ }
+ yield* Effect.promise(() => Bus.publish(Event.Updated, { info: session.info }))
+ return session.info
+ })
+ const resize = Effect.fn("Pty.resize")(function* (id: PtyID, cols: number, rows: number) {
+ const state = yield* InstanceState.get(cache)
+ const session = state.sessions.get(id)
+ if (session && session.info.status === "running") {
+ session.process.resize(cols, rows)
+ }
+ })
+
+ const write = Effect.fn("Pty.write")(function* (id: PtyID, data: string) {
+ const state = yield* InstanceState.get(cache)
+ const session = state.sessions.get(id)
+ if (session && session.info.status === "running") {
+ session.process.write(data)
+ }
+ })
+
+ const connect = Effect.fn("Pty.connect")(function* (id: PtyID, ws: Socket, cursor?: number) {
+ const state = yield* InstanceState.get(cache)
+ const session = state.sessions.get(id)
+ if (!session) {
+ ws.close()
+ return
+ }
+ log.info("client connected to session", { id })
+
+ // Use ws.data as the unique key for this connection lifecycle.
+ // If ws.data is undefined, fallback to ws object.
+ const key = ws.data && typeof ws.data === "object" ? ws.data : ws
+ // Optionally cleanup if the key somehow exists
+ session.subscribers.delete(key)
+ session.subscribers.set(key, ws)
+
+ const cleanup = () => {
+ session.subscribers.delete(key)
+ }
+
+ const start = session.bufferCursor
+ const end = session.cursor
+ const from =
+ cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
+
+ const data = (() => {
+ if (!session.buffer) return ""
+ if (from >= end) return ""
+ const offset = Math.max(0, from - start)
+ if (offset >= session.buffer.length) return ""
+ return session.buffer.slice(offset)
+ })()
+
+ if (data) {
try {
- ws.send(chunk)
+ for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
+ ws.send(data.slice(i, i + BUFFER_CHUNK))
+ }
} catch {
- session.subscribers.delete(key)
+ cleanup()
+ ws.close()
+ return
}
}
- session.buffer += chunk
- if (session.buffer.length <= BUFFER_LIMIT) return
- const excess = session.buffer.length - BUFFER_LIMIT
- session.buffer = session.buffer.slice(excess)
- session.bufferCursor += excess
- }),
- )
- ptyProcess.onExit(
- Instance.bind(({ exitCode }) => {
- if (session.info.status === "exited") return
- log.info("session exited", { id, exitCode })
- session.info.status = "exited"
- Bus.publish(Event.Exited, { id, exitCode })
- remove(id)
- }),
- )
- Bus.publish(Event.Created, { info })
- return info
+ try {
+ ws.send(meta(end))
+ } catch {
+ cleanup()
+ ws.close()
+ return
+ }
+
+ return {
+ onMessage: (message: string | ArrayBuffer) => {
+ session.process.write(String(message))
+ },
+ onClose: () => {
+ log.info("client disconnected from session", { id })
+ cleanup()
+ },
+ }
+ })
+
+ return Service.of({ list, get, create, update, remove, resize, write, connect })
+ }),
+ )
+
+ const runPromise = makeRunPromise(Service, layer)
+
+ export async function list() {
+ return runPromise((svc) => svc.list())
}
- export async function update(id: PtyID, input: UpdateInput) {
- const session = state().get(id)
- if (!session) return
- if (input.title) {
- session.info.title = input.title
- }
- if (input.size) {
- session.process.resize(input.size.cols, input.size.rows)
- }
- Bus.publish(Event.Updated, { info: session.info })
- return session.info
+ export async function get(id: PtyID) {
+ return runPromise((svc) => svc.get(id))
}
- export async function remove(id: PtyID) {
- const session = state().get(id)
- if (!session) return
- state().delete(id)
- log.info("removing session", { id })
- try {
- session.process.kill()
- } catch {}
- for (const [key, ws] of session.subscribers.entries()) {
- try {
- if (ws.data === key) ws.close()
- } catch {
- // ignore
- }
- }
- session.subscribers.clear()
- Bus.publish(Event.Deleted, { id: session.info.id })
+ export async function resize(id: PtyID, cols: number, rows: number) {
+ return runPromise((svc) => svc.resize(id, cols, rows))
}
- export function resize(id: PtyID, cols: number, rows: number) {
- const session = state().get(id)
- if (session && session.info.status === "running") {
- session.process.resize(cols, rows)
- }
+ export async function write(id: PtyID, data: string) {
+ return runPromise((svc) => svc.write(id, data))
}
- export function write(id: PtyID, data: string) {
- const session = state().get(id)
- if (session && session.info.status === "running") {
- session.process.write(data)
- }
+ export async function connect(id: PtyID, ws: Socket, cursor?: number) {
+ return runPromise((svc) => svc.connect(id, ws, cursor))
}
- export function connect(id: PtyID, ws: Socket, cursor?: number) {
- const session = state().get(id)
- if (!session) {
- ws.close()
- return
- }
- log.info("client connected to session", { id })
-
- // Use ws.data as the unique key for this connection lifecycle.
- // If ws.data is undefined, fallback to ws object.
- const connectionKey = ws.data && typeof ws.data === "object" ? ws.data : ws
-
- // Optionally cleanup if the key somehow exists
- session.subscribers.delete(connectionKey)
- session.subscribers.set(connectionKey, ws)
-
- const cleanup = () => {
- session.subscribers.delete(connectionKey)
- }
-
- const start = session.bufferCursor
- const end = session.cursor
-
- const from =
- cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
-
- const data = (() => {
- if (!session.buffer) return ""
- if (from >= end) return ""
- const offset = Math.max(0, from - start)
- if (offset >= session.buffer.length) return ""
- return session.buffer.slice(offset)
- })()
-
- if (data) {
- try {
- for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
- ws.send(data.slice(i, i + BUFFER_CHUNK))
- }
- } catch {
- cleanup()
- ws.close()
- return
- }
- }
-
- try {
- ws.send(meta(end))
- } catch {
- cleanup()
- ws.close()
- return
- }
- return {
- onMessage: (message: string | ArrayBuffer) => {
- session.process.write(String(message))
- },
- onClose: () => {
- log.info("client disconnected from session", { id })
- cleanup()
- },
- }
+ export async function create(input: CreateInput) {
+ return runPromise((svc) => svc.create(input))
+ }
+
+ export async function update(id: PtyID, input: UpdateInput) {
+ return runPromise((svc) => svc.update(id, input))
+ }
+
+ export async function remove(id: PtyID) {
+ return runPromise((svc) => svc.remove(id))
}
}
diff --git a/packages/opencode/src/server/routes/pty.ts b/packages/opencode/src/server/routes/pty.ts
index 0f9151ba4..de79801e2 100644
--- a/packages/opencode/src/server/routes/pty.ts
+++ b/packages/opencode/src/server/routes/pty.ts
@@ -28,7 +28,7 @@ export const PtyRoutes = lazy(() =>
},
}),
async (c) => {
- return c.json(Pty.list())
+ return c.json(await Pty.list())
},
)
.post(
@@ -75,7 +75,7 @@ export const PtyRoutes = lazy(() =>
}),
validator("param", z.object({ ptyID: PtyID.zod })),
async (c) => {
- const info = Pty.get(c.req.valid("param").ptyID)
+ const info = await Pty.get(c.req.valid("param").ptyID)
if (!info) {
throw new NotFoundError({ message: "Session not found" })
}
@@ -150,7 +150,7 @@ export const PtyRoutes = lazy(() =>
},
}),
validator("param", z.object({ ptyID: PtyID.zod })),
- upgradeWebSocket((c) => {
+ upgradeWebSocket(async (c) => {
const id = PtyID.zod.parse(c.req.param("ptyID"))
const cursor = (() => {
const value = c.req.query("cursor")
@@ -159,8 +159,8 @@ export const PtyRoutes = lazy(() =>
if (!Number.isSafeInteger(parsed) || parsed < -1) return
return parsed
})()
- let handler: ReturnType<typeof Pty.connect>
- if (!Pty.get(id)) throw new Error("Session not found")
+ let handler: Awaited<ReturnType<typeof Pty.connect>>
+ if (!(await Pty.get(id))) throw new Error("Session not found")
type Socket = {
readyState: number
@@ -176,17 +176,27 @@ export const PtyRoutes = lazy(() =>
return typeof (value as { readyState?: unknown }).readyState === "number"
}
+ const pending: string[] = []
+ let ready = false
+
return {
- onOpen(_event, ws) {
+ async onOpen(_event, ws) {
const socket = ws.raw
if (!isSocket(socket)) {
ws.close()
return
}
- handler = Pty.connect(id, socket, cursor)
+ handler = await Pty.connect(id, socket, cursor)
+ ready = true
+ for (const msg of pending) handler?.onMessage(msg)
+ pending.length = 0
},
onMessage(event) {
if (typeof event.data !== "string") return
+ if (!ready) {
+ pending.push(event.data)
+ return
+ }
handler?.onMessage(event.data)
},
onClose() {