summaryrefslogtreecommitdiffhomepage
path: root/packages
diff options
context:
space:
mode:
authorKit Langton <[email protected]>2026-04-30 14:24:43 -0400
committerGitHub <[email protected]>2026-04-30 14:24:43 -0400
commitf4ce240a2ed419bba49dd1ff1326c036ba53b2ca (patch)
tree453d930cb07ad1d4f0875378bcb543f50431f45c /packages
parent320527a3e4c9c064d3a3e7ce28a138ad8976e830 (diff)
downloadopencode-f4ce240a2ed419bba49dd1ff1326c036ba53b2ca.tar.gz
opencode-f4ce240a2ed419bba49dd1ff1326c036ba53b2ca.zip
Use PTY service directly in HTTP routes (#25138)
Diffstat (limited to 'packages')
-rw-r--r--packages/opencode/src/pty/index.ts65
-rw-r--r--packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts112
-rw-r--r--packages/opencode/test/server/httpapi-pty.test.ts89
3 files changed, 175 insertions, 91 deletions
diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts
index 2518800ce..ade4b5d02 100644
--- a/packages/opencode/src/pty/index.ts
+++ b/packages/opencode/src/pty/index.ts
@@ -5,7 +5,6 @@ import { InstanceState } from "@/effect/instance-state"
import { EffectBridge } from "@/effect/bridge"
import { lazy } from "@opencode-ai/core/util/lazy"
import { Plugin } from "@/plugin"
-import { Instance } from "@/project/instance"
import { Shell } from "@/shell/shell"
import type { Proc } from "#pty"
import * as Log from "@opencode-ai/core/util/log"
@@ -229,42 +228,38 @@ export const layer = Layer.effect(
subscribers: new Map(),
}
s.sessions.set(id, session)
- proc.onData(
- Instance.bind((chunk) => {
- session.cursor += chunk.length
-
- for (const [key, ws] of session.subscribers.entries()) {
- if (ws.readyState !== 1) {
- session.subscribers.delete(key)
- continue
- }
- if (sock(ws) !== key) {
- session.subscribers.delete(key)
- continue
- }
- try {
- ws.send(chunk)
- } catch {
- session.subscribers.delete(key)
- }
+ proc.onData((chunk) => {
+ session.cursor += chunk.length
+
+ for (const [key, ws] of session.subscribers.entries()) {
+ if (ws.readyState !== 1) {
+ session.subscribers.delete(key)
+ continue
+ }
+ if (sock(ws) !== key) {
+ session.subscribers.delete(key)
+ continue
+ }
+ try {
+ ws.send(chunk)
+ } catch {
+ session.subscribers.delete(key)
}
+ }
- session.buffer += chunk
- if (session.buffer.length <= BUFFER_LIMIT) return
- const excess = session.buffer.length - BUFFER_LIMIT
- session.buffer = session.buffer.slice(excess)
- session.bufferCursor += excess
- }),
- )
- proc.onExit(
- Instance.bind(({ exitCode }) => {
- if (session.info.status === "exited") return
- log.info("session exited", { id, exitCode })
- session.info.status = "exited"
- bridge.fork(bus.publish(Event.Exited, { id, exitCode }))
- bridge.fork(remove(id))
- }),
- )
+ session.buffer += chunk
+ if (session.buffer.length <= BUFFER_LIMIT) return
+ const excess = session.buffer.length - BUFFER_LIMIT
+ session.buffer = session.buffer.slice(excess)
+ session.bufferCursor += excess
+ })
+ proc.onExit(({ exitCode }) => {
+ if (session.info.status === "exited") return
+ log.info("session exited", { id, exitCode })
+ session.info.status = "exited"
+ bridge.fork(bus.publish(Event.Exited, { id, exitCode }))
+ bridge.fork(remove(id))
+ })
yield* bus.publish(Event.Created, { info })
return info
})
diff --git a/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts b/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts
index aa151cece..cc7c385b3 100644
--- a/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts
+++ b/packages/opencode/src/server/routes/instance/httpapi/handlers/pty.ts
@@ -1,4 +1,3 @@
-import { EffectBridge } from "@/effect/bridge"
import { Pty } from "@/pty"
import { PtyID } from "@/pty/schema"
import { handlePtyInput } from "@/pty/input"
@@ -23,16 +22,11 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler
})
const create = Effect.fn("PtyHttpApi.create")(function* (ctx: { payload: typeof Pty.CreateInput.Type }) {
- const bridge = yield* EffectBridge.make()
- return yield* Effect.promise(() =>
- bridge.promise(
- pty.create({
- ...ctx.payload,
- args: ctx.payload.args ? [...ctx.payload.args] : undefined,
- env: ctx.payload.env ? { ...ctx.payload.env } : undefined,
- }),
- ),
- )
+ return yield* pty.create({
+ ...ctx.payload,
+ args: ctx.payload.args ? [...ctx.payload.args] : undefined,
+ env: ctx.payload.env ? { ...ctx.payload.env } : undefined,
+ })
})
const get = Effect.fn("PtyHttpApi.get")(function* (ctx: { params: { ptyID: PtyID } }) {
@@ -68,52 +62,60 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler
}),
)
-export const ptyConnectRoute = HttpRouter.add(
- "GET",
- PtyPaths.connect,
+export const ptyConnectRoute = HttpRouter.use((router) =>
Effect.gen(function* () {
const pty = yield* Pty.Service
- const params = yield* HttpRouter.schemaPathParams(Params)
- if (!(yield* pty.get(params.ptyID))) return HttpServerResponse.empty({ status: 404 })
-
- const query = yield* HttpServerRequest.schemaSearchParams(CursorQuery)
- const parsedCursor = query.cursor === undefined ? undefined : Number(query.cursor)
- const cursor =
- parsedCursor !== undefined && Number.isSafeInteger(parsedCursor) && parsedCursor >= -1 ? parsedCursor : undefined
- const socket = yield* Effect.orDie((yield* HttpServerRequest.HttpServerRequest).upgrade)
- const write = yield* socket.writer
- let closed = false
- const adapter = {
- get readyState() {
- return closed ? 3 : 1
- },
- send: (data: string | Uint8Array | ArrayBuffer) => {
- if (closed) return
- Effect.runFork(
- write(data instanceof ArrayBuffer ? new Uint8Array(data) : data).pipe(Effect.catch(() => Effect.void)),
- )
- },
- close: (code?: number, reason?: string) => {
- if (closed) return
- closed = true
- Effect.runFork(write(new Socket.CloseEvent(code, reason)).pipe(Effect.catch(() => Effect.void)))
- },
- }
- const handler = yield* pty.connect(params.ptyID, adapter, cursor)
- if (!handler) return HttpServerResponse.empty()
+ yield* router.add(
+ "GET",
+ PtyPaths.connect,
+ Effect.gen(function* () {
+ const params = yield* HttpRouter.schemaPathParams(Params)
+ if (!(yield* pty.get(params.ptyID))) return HttpServerResponse.empty({ status: 404 })
- yield* socket
- .runRaw((message) => handlePtyInput(handler, message))
- .pipe(
- Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void),
- Effect.ensuring(
- Effect.sync(() => {
+ const query = yield* HttpServerRequest.schemaSearchParams(CursorQuery)
+ const parsedCursor = query.cursor === undefined ? undefined : Number(query.cursor)
+ const cursor =
+ parsedCursor !== undefined && Number.isSafeInteger(parsedCursor) && parsedCursor >= -1
+ ? parsedCursor
+ : undefined
+ const socket = yield* Effect.orDie((yield* HttpServerRequest.HttpServerRequest).upgrade)
+ const write = yield* socket.writer
+ const services = yield* Effect.context()
+ const writeScoped = (effect: Effect.Effect<void, unknown>) => {
+ Effect.runForkWith(services)(effect.pipe(Effect.catch(() => Effect.void)))
+ }
+ let closed = false
+ const adapter = {
+ get readyState() {
+ return closed ? 3 : 1
+ },
+ send: (data: string | Uint8Array | ArrayBuffer) => {
+ if (closed) return
+ writeScoped(write(data instanceof ArrayBuffer ? new Uint8Array(data) : data))
+ },
+ close: (code?: number, reason?: string) => {
+ if (closed) return
closed = true
- handler.onClose()
- }),
- ),
- Effect.orDie,
- )
- return HttpServerResponse.empty()
- }).pipe(Effect.provide(Pty.defaultLayer)),
+ writeScoped(write(new Socket.CloseEvent(code, reason)))
+ },
+ }
+ const handler = yield* pty.connect(params.ptyID, adapter, cursor)
+ if (!handler) return HttpServerResponse.empty()
+
+ yield* socket
+ .runRaw((message) => handlePtyInput(handler, message))
+ .pipe(
+ Effect.catchReason("SocketError", "SocketCloseError", () => Effect.void),
+ Effect.ensuring(
+ Effect.sync(() => {
+ closed = true
+ handler.onClose()
+ }),
+ ),
+ Effect.orDie,
+ )
+ return HttpServerResponse.empty()
+ }),
+ )
+ }),
)
diff --git a/packages/opencode/test/server/httpapi-pty.test.ts b/packages/opencode/test/server/httpapi-pty.test.ts
index 37d2a4f64..e4d22427c 100644
--- a/packages/opencode/test/server/httpapi-pty.test.ts
+++ b/packages/opencode/test/server/httpapi-pty.test.ts
@@ -1,4 +1,5 @@
import { afterEach, describe, expect, test } from "bun:test"
+import { NodeHttpServer, NodeServices } from "@effect/platform-node"
import { Flag } from "@opencode-ai/core/flag/flag"
import { PtyID } from "../../src/pty/schema"
import { Instance } from "../../src/project/instance"
@@ -6,18 +7,60 @@ import { Server } from "../../src/server/server"
import { PtyPaths } from "../../src/server/routes/instance/httpapi/groups/pty"
import * as Log from "@opencode-ai/core/util/log"
import { resetDatabase } from "../fixture/db"
-import { tmpdir } from "../fixture/fixture"
+import { tmpdir, tmpdirScoped } from "../fixture/fixture"
+import { Config, Effect, Layer, Queue, Schema } from "effect"
+import { HttpClient, HttpClientRequest, HttpRouter, HttpServer } from "effect/unstable/http"
+import * as Socket from "effect/unstable/socket/Socket"
+import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
+import { Pty } from "../../src/pty"
+import { testEffect } from "../lib/effect"
void Log.init({ print: false })
const original = Flag.OPENCODE_EXPERIMENTAL_HTTPAPI
const testPty = process.platform === "win32" ? test.skip : test
+const testStateLayer = Layer.effectDiscard(
+ Effect.gen(function* () {
+ Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = true
+ yield* Effect.promise(() => resetDatabase())
+ yield* Effect.addFinalizer(() =>
+ Effect.promise(async () => {
+ Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = original
+ await resetDatabase()
+ }),
+ )
+ }),
+)
+
+const servedRoutes: Layer.Layer<never, Config.ConfigError, HttpServer.HttpServer> = HttpRouter.serve(
+ ExperimentalHttpApiServer.routes,
+ { disableListenLog: true, disableLogger: true },
+)
+
+const effectIt = testEffect(
+ Layer.mergeAll(
+ testStateLayer,
+ Socket.layerWebSocketConstructorGlobal,
+ servedRoutes.pipe(
+ Layer.provide(Socket.layerWebSocketConstructorGlobal),
+ Layer.provideMerge(NodeHttpServer.layerTest),
+ Layer.provideMerge(NodeServices.layer),
+ ),
+ ),
+)
+
function app() {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = true
return Server.Default().app
}
+function serverUrl() {
+ return HttpServer.HttpServer.use((server) => Effect.succeed(HttpServer.formatAddress(server.address)))
+}
+
+const directoryHeader = (dir: string) => HttpClientRequest.setHeader("x-opencode-directory", dir)
+
afterEach(async () => {
Flag.OPENCODE_EXPERIMENTAL_HTTPAPI = original
await Instance.disposeAll()
@@ -85,4 +128,48 @@ describe("pty HttpApi bridge", () => {
})
expect(response.status).toBe(404)
})
+ ;(process.platform === "win32" ? effectIt.live.skip : effectIt.live)(
+ "serves PTY websocket output and input through Effect routes",
+ () =>
+ Effect.gen(function* () {
+ const dir = yield* tmpdirScoped({ git: true, config: { formatter: false, lsp: false } })
+ const created = yield* HttpClientRequest.post(PtyPaths.create).pipe(
+ directoryHeader(dir),
+ HttpClientRequest.bodyJson({ command: "/bin/cat", title: "websocket" }),
+ Effect.flatMap(HttpClient.execute),
+ )
+ expect(created.status).toBe(200)
+ const info = yield* Schema.decodeUnknownEffect(Pty.Info)(yield* created.json)
+
+ const socket = yield* Socket.makeWebSocket(
+ `${(yield* serverUrl()).replace(/^http/, "ws")}${PtyPaths.connect.replace(":ptyID", info.id)}?cursor=-1&directory=${encodeURIComponent(dir)}`,
+ { closeCodeIsError: () => false },
+ )
+ const messages = yield* Queue.unbounded<string>()
+ yield* socket
+ .runRaw((message) =>
+ Queue.offer(messages, typeof message === "string" ? message : new TextDecoder().decode(message)),
+ )
+ .pipe(Effect.catch(() => Effect.void))
+ .pipe(Effect.forkScoped)
+ const write = yield* socket.writer
+
+ const takeUntil = (expected: string, seen = ""): Effect.Effect<string, unknown> =>
+ Effect.gen(function* () {
+ const next = seen + (yield* Queue.take(messages).pipe(Effect.timeout("5 seconds")))
+ if (next.includes(expected)) return next
+ return yield* takeUntil(expected, next)
+ })
+
+ yield* write("ping-route\n")
+ expect(yield* takeUntil("ping-route")).toContain("ping-route")
+ yield* write(new Socket.CloseEvent(1000, "done")).pipe(Effect.catch(() => Effect.void))
+
+ const removed = yield* HttpClientRequest.delete(PtyPaths.remove.replace(":ptyID", info.id)).pipe(
+ directoryHeader(dir),
+ HttpClient.execute,
+ )
+ expect(removed.status).toBe(200)
+ }),
+ )
})