summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJames Long <[email protected]>2026-03-19 13:51:14 -0400
committerGitHub <[email protected]>2026-03-19 13:51:14 -0400
commit05407518973a494785b85259ad4ec29abf4f8158 (patch)
tree5c586e1c0ee52c5b88c08d0907be3d76cb558b11
parentbaa204193c02f1a4699ea2790df19b48e40440db (diff)
downloadopencode-05407518973a494785b85259ad4ec29abf4f8158.tar.gz
opencode-05407518973a494785b85259ad4ec29abf4f8158.zip
fix(core): use a queue to process events in event routes (#18259)
-rw-r--r--packages/opencode/src/bus/index.ts4
-rw-r--r--packages/opencode/src/server/routes/event.ts85
-rw-r--r--packages/opencode/src/server/routes/global.ts54
-rw-r--r--packages/opencode/src/server/server.ts63
4 files changed, 123 insertions, 83 deletions
diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts
index edb093f19..625f29662 100644
--- a/packages/opencode/src/bus/index.ts
+++ b/packages/opencode/src/bus/index.ts
@@ -51,8 +51,8 @@ export namespace Bus {
})
const pending = []
for (const key of [def.type, "*"]) {
- const match = state().subscriptions.get(key)
- for (const sub of match ?? []) {
+ const match = [...(state().subscriptions.get(key) ?? [])]
+ for (const sub of match) {
pending.push(sub(payload))
}
}
diff --git a/packages/opencode/src/server/routes/event.ts b/packages/opencode/src/server/routes/event.ts
new file mode 100644
index 000000000..f34ff0566
--- /dev/null
+++ b/packages/opencode/src/server/routes/event.ts
@@ -0,0 +1,85 @@
+import { Hono } from "hono"
+import { describeRoute, resolver } from "hono-openapi"
+import { streamSSE } from "hono/streaming"
+import { Log } from "@/util/log"
+import { BusEvent } from "@/bus/bus-event"
+import { Bus } from "@/bus"
+import { lazy } from "../../util/lazy"
+import { AsyncQueue } from "../../util/queue"
+import { Instance } from "@/project/instance"
+
+const log = Log.create({ service: "server" })
+
+export const EventRoutes = lazy(() =>
+ new Hono().get(
+ "/event",
+ describeRoute({
+ summary: "Subscribe to events",
+ description: "Get events",
+ operationId: "event.subscribe",
+ responses: {
+ 200: {
+ description: "Event stream",
+ content: {
+ "text/event-stream": {
+ schema: resolver(BusEvent.payloads()),
+ },
+ },
+ },
+ },
+ }),
+ async (c) => {
+ log.info("event connected")
+ c.header("X-Accel-Buffering", "no")
+ c.header("X-Content-Type-Options", "nosniff")
+ return streamSSE(c, async (stream) => {
+ const q = new AsyncQueue<string | null>()
+ let done = false
+
+ q.push(
+ JSON.stringify({
+ type: "server.connected",
+ properties: {},
+ }),
+ )
+
+ // Send heartbeat every 10s to prevent stalled proxy streams.
+ const heartbeat = setInterval(() => {
+ q.push(
+ JSON.stringify({
+ type: "server.heartbeat",
+ properties: {},
+ }),
+ )
+ }, 10_000)
+
+ const unsub = Bus.subscribeAll((event) => {
+ q.push(JSON.stringify(event))
+ if (event.type === Bus.InstanceDisposed.type) {
+ stop()
+ }
+ })
+
+ const stop = () => {
+ if (done) return
+ done = true
+ clearInterval(heartbeat)
+ unsub()
+ q.push(null)
+ log.info("event disconnected")
+ }
+
+ stream.onAbort(stop)
+
+ try {
+ for await (const data of q) {
+ if (data === null) return
+ await stream.writeSSE({ data })
+ }
+ } finally {
+ stop()
+ }
+ })
+ },
+ ),
+)
diff --git a/packages/opencode/src/server/routes/global.ts b/packages/opencode/src/server/routes/global.ts
index 4d019f6a7..4a6a3ebc7 100644
--- a/packages/opencode/src/server/routes/global.ts
+++ b/packages/opencode/src/server/routes/global.ts
@@ -4,6 +4,7 @@ import { streamSSE } from "hono/streaming"
import z from "zod"
import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
+import { AsyncQueue } from "@/util/queue"
import { Instance } from "../../project/instance"
import { Installation } from "@/installation"
import { Log } from "../../util/log"
@@ -69,41 +70,54 @@ export const GlobalRoutes = lazy(() =>
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamSSE(c, async (stream) => {
- stream.writeSSE({
- data: JSON.stringify({
+ const q = new AsyncQueue<string | null>()
+ let done = false
+
+ q.push(
+ JSON.stringify({
payload: {
type: "server.connected",
properties: {},
},
}),
- })
- async function handler(event: any) {
- await stream.writeSSE({
- data: JSON.stringify(event),
- })
- }
- GlobalBus.on("event", handler)
+ )
// Send heartbeat every 10s to prevent stalled proxy streams.
const heartbeat = setInterval(() => {
- stream.writeSSE({
- data: JSON.stringify({
+ q.push(
+ JSON.stringify({
payload: {
type: "server.heartbeat",
properties: {},
},
}),
- })
+ )
}, 10_000)
- await new Promise<void>((resolve) => {
- stream.onAbort(() => {
- clearInterval(heartbeat)
- GlobalBus.off("event", handler)
- resolve()
- log.info("global event disconnected")
- })
- })
+ async function handler(event: any) {
+ q.push(JSON.stringify(event))
+ }
+ GlobalBus.on("event", handler)
+
+ const stop = () => {
+ if (done) return
+ done = true
+ clearInterval(heartbeat)
+ GlobalBus.off("event", handler)
+ q.push(null)
+ log.info("event disconnected")
+ }
+
+ stream.onAbort(stop)
+
+ try {
+ for await (const data of q) {
+ if (data === null) return
+ await stream.writeSSE({ data })
+ }
+ } finally {
+ stop()
+ }
})
},
)
diff --git a/packages/opencode/src/server/server.ts b/packages/opencode/src/server/server.ts
index c485654fd..a68becb1f 100644
--- a/packages/opencode/src/server/server.ts
+++ b/packages/opencode/src/server/server.ts
@@ -1,10 +1,7 @@
-import { BusEvent } from "@/bus/bus-event"
-import { Bus } from "@/bus"
import { Log } from "../util/log"
import { describeRoute, generateSpecs, validator, resolver, openAPIRouteHandler } from "hono-openapi"
import { Hono } from "hono"
import { cors } from "hono/cors"
-import { streamSSE } from "hono/streaming"
import { proxy } from "hono/proxy"
import { basicAuth } from "hono/basic-auth"
import z from "zod"
@@ -34,6 +31,7 @@ import { FileRoutes } from "./routes/file"
import { ConfigRoutes } from "./routes/config"
import { ExperimentalRoutes } from "./routes/experimental"
import { ProviderRoutes } from "./routes/provider"
+import { EventRoutes } from "./routes/event"
import { InstanceBootstrap } from "../project/bootstrap"
import { NotFoundError } from "../storage/db"
import type { ContentfulStatusCode } from "hono/utils/http-status"
@@ -251,6 +249,7 @@ export namespace Server {
.route("/question", QuestionRoutes())
.route("/provider", ProviderRoutes())
.route("/", FileRoutes())
+ .route("/", EventRoutes())
.route("/mcp", McpRoutes())
.route("/tui", TuiRoutes())
.post(
@@ -498,64 +497,6 @@ export namespace Server {
return c.json(await Format.status())
},
)
- .get(
- "/event",
- describeRoute({
- summary: "Subscribe to events",
- description: "Get events",
- operationId: "event.subscribe",
- responses: {
- 200: {
- description: "Event stream",
- content: {
- "text/event-stream": {
- schema: resolver(BusEvent.payloads()),
- },
- },
- },
- },
- }),
- async (c) => {
- log.info("event connected")
- c.header("X-Accel-Buffering", "no")
- c.header("X-Content-Type-Options", "nosniff")
- return streamSSE(c, async (stream) => {
- stream.writeSSE({
- data: JSON.stringify({
- type: "server.connected",
- properties: {},
- }),
- })
- const unsub = Bus.subscribeAll(async (event) => {
- await stream.writeSSE({
- data: JSON.stringify(event),
- })
- if (event.type === Bus.InstanceDisposed.type) {
- stream.close()
- }
- })
-
- // Send heartbeat every 10s to prevent stalled proxy streams.
- const heartbeat = setInterval(() => {
- stream.writeSSE({
- data: JSON.stringify({
- type: "server.heartbeat",
- properties: {},
- }),
- })
- }, 10_000)
-
- await new Promise<void>((resolve) => {
- stream.onAbort(() => {
- clearInterval(heartbeat)
- unsub()
- resolve()
- log.info("event disconnected")
- })
- })
- })
- },
- )
.all("/*", async (c) => {
const path = c.req.path