summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJames Long <[email protected]>2026-04-15 21:04:37 -0400
committerGitHub <[email protected]>2026-04-15 21:04:37 -0400
commit074ef032eef2cb6a9a9b8dde5626ad5c0080d808 (patch)
tree8a9a51ab3166c02ce8390f2826fedc84e876103d
parent4ca809ef4e71ee6d62990c815c82c7ee57395a8b (diff)
downloadopencode-074ef032eef2cb6a9a9b8dde5626ad5c0080d808.tar.gz
opencode-074ef032eef2cb6a9a9b8dde5626ad5c0080d808.zip
feat(core): add fence to make all methods strongly consistent when syncing (#22679)
-rw-r--r--packages/opencode/src/bus/global.ts16
-rw-r--r--packages/opencode/src/control-plane/util.ts37
-rw-r--r--packages/opencode/src/control-plane/workspace.ts101
-rw-r--r--packages/opencode/src/flag/flag.ts4
-rw-r--r--packages/opencode/src/server/fence.ts81
-rw-r--r--packages/opencode/src/server/instance/middleware.ts9
-rw-r--r--packages/opencode/src/server/proxy.ts47
-rw-r--r--packages/opencode/src/server/server.ts18
-rw-r--r--packages/opencode/test/plugin/workspace-adaptor.test.ts13
9 files changed, 288 insertions, 38 deletions
diff --git a/packages/opencode/src/bus/global.ts b/packages/opencode/src/bus/global.ts
index e751b59fa..b5392a81b 100644
--- a/packages/opencode/src/bus/global.ts
+++ b/packages/opencode/src/bus/global.ts
@@ -1,12 +1,12 @@
import { EventEmitter } from "events"
+export type GlobalEvent = {
+ directory?: string
+ project?: string
+ workspace?: string
+ payload: any
+}
+
export const GlobalBus = new EventEmitter<{
- event: [
- {
- directory?: string
- project?: string
- workspace?: string
- payload: any
- },
- ]
+ event: [GlobalEvent]
}>()
diff --git a/packages/opencode/src/control-plane/util.ts b/packages/opencode/src/control-plane/util.ts
new file mode 100644
index 000000000..023c2ae15
--- /dev/null
+++ b/packages/opencode/src/control-plane/util.ts
@@ -0,0 +1,37 @@
+import { GlobalBus, type GlobalEvent } from "@/bus/global"
+
+export function waitEvent(input: { timeout: number; signal?: AbortSignal; fn: (event: GlobalEvent) => boolean }) {
+ if (input.signal?.aborted) return Promise.reject(input.signal.reason ?? new Error("Request aborted"))
+
+ return new Promise<void>((resolve, reject) => {
+ const abort = () => {
+ cleanup()
+ reject(input.signal?.reason ?? new Error("Request aborted"))
+ }
+
+ const handler = (event: GlobalEvent) => {
+ try {
+ if (!input.fn(event)) return
+ cleanup()
+ resolve()
+ } catch (error) {
+ cleanup()
+ reject(error)
+ }
+ }
+
+ const cleanup = () => {
+ clearTimeout(timeout)
+ GlobalBus.off("event", handler)
+ input.signal?.removeEventListener("abort", abort)
+ }
+
+ const timeout = setTimeout(() => {
+ cleanup()
+ reject(new Error("Timed out waiting for global event"))
+ }, input.timeout)
+
+ GlobalBus.on("event", handler)
+ input.signal?.addEventListener("abort", abort, { once: true })
+ })
+}
diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts
index b9ac0a6b4..67583107f 100644
--- a/packages/opencode/src/control-plane/workspace.ts
+++ b/packages/opencode/src/control-plane/workspace.ts
@@ -1,7 +1,7 @@
import z from "zod"
import { setTimeout as sleep } from "node:timers/promises"
import { fn } from "@/util/fn"
-import { Database, asc, eq } from "@/storage/db"
+import { Database, asc, eq, inArray } from "@/storage/db"
import { Project } from "@/project/project"
import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
@@ -22,6 +22,8 @@ import { SessionTable } from "@/session/session.sql"
import { SessionID } from "@/session/schema"
import { errorData } from "@/util/error"
import { AppRuntime } from "@/effect/app-runtime"
+import { EventSequenceTable } from "@/sync/event.sql"
+import { waitEvent } from "./util"
export namespace Workspace {
export const Info = WorkspaceInfo.meta({
@@ -114,6 +116,17 @@ export namespace Workspace {
startSync(info)
+ await waitEvent({
+ timeout: TIMEOUT,
+ fn(event) {
+ if (event.workspace === info.id && event.payload.type === Event.Status.type) {
+ const { status } = event.payload.properties
+ return status === "error" || status === "connected"
+ }
+ return false
+ },
+ })
+
return info
})
@@ -285,10 +298,15 @@ export namespace Workspace {
return spaces
}
- export const get = fn(WorkspaceID.zod, async (id) => {
+ function lookup(id: WorkspaceID) {
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
if (!row) return
- const space = fromRow(row)
+ return fromRow(row)
+ }
+
+ export const get = fn(WorkspaceID.zod, async (id) => {
+ const space = lookup(id)
+ if (!space) return
startSync(space)
return space
})
@@ -320,12 +338,18 @@ export namespace Workspace {
const connections = new Map<WorkspaceID, ConnectionStatus>()
const aborts = new Map<WorkspaceID, AbortController>()
+ const TIMEOUT = 5000
function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
const prev = connections.get(id)
if (prev?.status === status && prev?.error === error) return
const next = { workspaceID: id, status, error }
connections.set(id, next)
+
+ if (status === "error") {
+ aborts.delete(id)
+ }
+
GlobalBus.emit("event", {
directory: "global",
workspace: id,
@@ -340,6 +364,52 @@ export namespace Workspace {
return [...connections.values()]
}
+ function synced(state: Record<string, number>) {
+ const ids = Object.keys(state)
+ if (ids.length === 0) return true
+
+ const done = Object.fromEntries(
+ Database.use((db) =>
+ db
+ .select({
+ id: EventSequenceTable.aggregate_id,
+ seq: EventSequenceTable.seq,
+ })
+ .from(EventSequenceTable)
+ .where(inArray(EventSequenceTable.aggregate_id, ids))
+ .all(),
+ ).map((row) => [row.id, row.seq]),
+ ) as Record<string, number>
+
+ return ids.every((id) => {
+ return (done[id] ?? -1) >= state[id]
+ })
+ }
+
+ export async function isSyncing(workspaceID: WorkspaceID) {
+ return aborts.has(workspaceID)
+ }
+
+ export async function waitForSync(workspaceID: WorkspaceID, state: Record<string, number>, signal?: AbortSignal) {
+ if (synced(state)) return
+
+ try {
+ await waitEvent({
+ timeout: TIMEOUT,
+ signal,
+ fn(event) {
+ if (event.workspace !== workspaceID && event.payload.type !== "sync") {
+ return false
+ }
+ return synced(state)
+ },
+ })
+ } catch (error) {
+ if (signal?.aborted) throw signal.reason ?? new Error("Request aborted")
+ throw new Error(`Timed out waiting for sync fence: ${JSON.stringify(state)}`)
+ }
+ }
+
const log = Log.create({ service: "workspace-sync" })
function route(url: string | URL, path: string) {
@@ -353,6 +423,7 @@ export namespace Workspace {
async function syncWorkspace(space: Info, signal: AbortSignal) {
while (!signal.aborted) {
log.info("connecting to global sync", { workspace: space.name })
+ setStatus(space.id, "connecting")
const adaptor = await getAdaptor(space.projectID, space.type)
const target = await adaptor.target(space)
@@ -364,7 +435,7 @@ export namespace Workspace {
headers: target.headers,
signal,
}).catch((err: unknown) => {
- setStatus(space.id, "error")
+ setStatus(space.id, "error", err instanceof Error ? err.message : String(err))
log.info("failed to connect to global sync", {
workspace: space.name,
@@ -374,8 +445,9 @@ export namespace Workspace {
})
if (!res || !res.ok || !res.body) {
- log.info("failed to connect to global sync", { workspace: space.name })
- setStatus(space.id, "error")
+ const error = !res ? "No response from global sync" : `Global sync HTTP ${res.status}`
+ log.info("failed to connect to global sync", { workspace: space.name, error })
+ setStatus(space.id, "error", error)
await sleep(1000)
continue
}
@@ -414,22 +486,29 @@ export namespace Workspace {
}
}
- function startSync(space: Info) {
+ async function startSync(space: Info) {
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return
- if (space.type === "worktree") {
- void Filesystem.exists(space.directory!).then((exists) => {
+ const adaptor = await getAdaptor(space.projectID, space.type)
+ const target = await adaptor.target(space)
+
+ if (target.type === "local") {
+ void Filesystem.exists(target.directory).then((exists) => {
setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
})
return
}
- if (aborts.has(space.id)) return
+ if (aborts.has(space.id)) return true
+
+ setStatus(space.id, "disconnected")
+
const abort = new AbortController()
aborts.set(space.id, abort)
- setStatus(space.id, "disconnected")
void syncWorkspace(space, abort.signal).catch((error) => {
+ aborts.delete(space.id)
+
setStatus(space.id, "error", String(error))
log.warn("workspace listener failed", {
workspaceID: space.id,
diff --git a/packages/opencode/src/flag/flag.ts b/packages/opencode/src/flag/flag.ts
index f091fa02a..a63f8d1c6 100644
--- a/packages/opencode/src/flag/flag.ts
+++ b/packages/opencode/src/flag/flag.ts
@@ -74,7 +74,6 @@ export namespace Flag {
Config.withDefault(false),
)
export const OPENCODE_EXPERIMENTAL_PLAN_MODE = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_PLAN_MODE")
- export const OPENCODE_EXPERIMENTAL_WORKSPACES = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES")
export const OPENCODE_EXPERIMENTAL_MARKDOWN = !falsy("OPENCODE_EXPERIMENTAL_MARKDOWN")
export const OPENCODE_MODELS_URL = process.env["OPENCODE_MODELS_URL"]
export const OPENCODE_MODELS_PATH = process.env["OPENCODE_MODELS_PATH"]
@@ -84,6 +83,9 @@ export namespace Flag {
export const OPENCODE_SKIP_MIGRATIONS = truthy("OPENCODE_SKIP_MIGRATIONS")
export const OPENCODE_STRICT_CONFIG_DEPS = truthy("OPENCODE_STRICT_CONFIG_DEPS")
+ export const OPENCODE_WORKSPACE_ID = process.env["OPENCODE_WORKSPACE_ID"]
+ export const OPENCODE_EXPERIMENTAL_WORKSPACES = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES")
+
function number(key: string) {
const value = process.env[key]
if (!value) return undefined
diff --git a/packages/opencode/src/server/fence.ts b/packages/opencode/src/server/fence.ts
new file mode 100644
index 000000000..bb41bd7a4
--- /dev/null
+++ b/packages/opencode/src/server/fence.ts
@@ -0,0 +1,81 @@
+import type { MiddlewareHandler } from "hono"
+import { Database, inArray } from "@/storage/db"
+import { EventSequenceTable } from "@/sync/event.sql"
+import { Workspace } from "@/control-plane/workspace"
+import type { WorkspaceID } from "@/control-plane/schema"
+import { Log } from "@/util/log"
+
+const HEADER = "x-opencode-sync"
+type State = Record<string, number>
+const log = Log.create({ service: "fence" })
+
+export function load(ids?: string[]) {
+ const rows = Database.use((db) => {
+ if (!ids?.length) {
+ return db.select().from(EventSequenceTable).all()
+ }
+
+ return db.select().from(EventSequenceTable).where(inArray(EventSequenceTable.aggregate_id, ids)).all()
+ })
+
+ return Object.fromEntries(rows.map((row) => [row.aggregate_id, row.seq])) as State
+}
+
+export function diff(prev: State, next: State) {
+ const ids = new Set([...Object.keys(prev), ...Object.keys(next)])
+ return Object.fromEntries(
+ [...ids]
+ .map((id) => [id, next[id] ?? -1] as const)
+ .filter(([id, seq]) => {
+ return (prev[id] ?? -1) !== seq
+ }),
+ ) as State
+}
+
+export function parse(headers: Headers) {
+ const raw = headers.get(HEADER)
+ if (!raw) return
+
+ let data
+
+ try {
+ data = JSON.parse(raw)
+ } catch (err) {
+ return
+ }
+
+ if (!data || typeof data !== "object") return
+
+ return Object.fromEntries(
+ Object.entries(data).filter(([id, seq]) => {
+ return typeof id === "string" && Number.isInteger(seq)
+ }),
+ ) as State
+}
+
+export async function wait(workspaceID: WorkspaceID, state: State, signal?: AbortSignal) {
+ log.info("waiting for state", {
+ workspaceID,
+ state,
+ })
+ await Workspace.waitForSync(workspaceID, state, signal)
+ log.info("state fully synced", {
+ workspaceID,
+ state,
+ })
+}
+
+export const FenceMiddleware: MiddlewareHandler = async (c, next) => {
+ if (c.req.method === "GET" || c.req.method === "HEAD" || c.req.method === "OPTIONS") return next()
+
+ const prev = load()
+ await next()
+ const current = diff(prev, load())
+
+ if (Object.keys(current).length > 0) {
+ log.info("header", {
+ diff: current,
+ })
+ c.res.headers.set(HEADER, JSON.stringify(current))
+ }
+}
diff --git a/packages/opencode/src/server/instance/middleware.ts b/packages/opencode/src/server/instance/middleware.ts
index 549fb38d5..0e29daa9e 100644
--- a/packages/opencode/src/server/instance/middleware.ts
+++ b/packages/opencode/src/server/instance/middleware.ts
@@ -6,6 +6,7 @@ import { Workspace } from "@/control-plane/workspace"
import { ServerProxy } from "../proxy"
import { Instance } from "@/project/instance"
import { InstanceBootstrap } from "@/project/bootstrap"
+import { Flag } from "@/flag/flag"
import { Session } from "@/session"
import { SessionID } from "@/session/schema"
import { WorkspaceContext } from "@/control-plane/workspace-context"
@@ -68,10 +69,10 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
const sessionWorkspaceID = await getSessionWorkspace(url)
const workspaceID = sessionWorkspaceID || url.searchParams.get("workspace")
- if (!workspaceID || url.pathname.startsWith("/console") || OPENCODE_WORKSPACE) {
- if (OPENCODE_WORKSPACE) {
+ if (!workspaceID || url.pathname.startsWith("/console") || Flag.OPENCODE_WORKSPACE_ID) {
+ if (Flag.OPENCODE_WORKSPACE_ID) {
return WorkspaceContext.provide({
- workspaceID: WorkspaceID.make(OPENCODE_WORKSPACE),
+ workspaceID: WorkspaceID.make(Flag.OPENCODE_WORKSPACE_ID),
async fn() {
return Instance.provide({
directory,
@@ -148,6 +149,6 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
headers.delete("x-opencode-workspace")
const req = new Request(c.req.raw, { headers })
- return ServerProxy.http(proxyURL, target.headers, req)
+ return ServerProxy.http(proxyURL, target.headers, req, workspace.id)
}
}
diff --git a/packages/opencode/src/server/proxy.ts b/packages/opencode/src/server/proxy.ts
index 0c0deba20..5effa5d05 100644
--- a/packages/opencode/src/server/proxy.ts
+++ b/packages/opencode/src/server/proxy.ts
@@ -1,6 +1,9 @@
import { Hono } from "hono"
import type { UpgradeWebSocket } from "hono/ws"
import { Log } from "@/util/log"
+import * as Fence from "./fence"
+import type { WorkspaceID } from "@/control-plane/schema"
+import { Workspace } from "@/control-plane/workspace"
const hop = new Set([
"connection",
@@ -101,12 +104,27 @@ const app = (upgrade: UpgradeWebSocket) =>
export namespace ServerProxy {
const log = Log.Default.clone().tag("service", "server-proxy")
- export function http(url: string | URL, extra: HeadersInit | undefined, req: Request) {
+ export async function http(
+ url: string | URL,
+ extra: HeadersInit | undefined,
+ req: Request,
+ workspaceID: WorkspaceID,
+ ) {
console.log("proxy http request", {
method: req.method,
request: req.url,
url: String(url),
})
+
+ if (!Workspace.isSyncing(workspaceID)) {
+ return new Response(`broken sync connection for workspace: ${workspaceID}`, {
+ status: 503,
+ headers: {
+ "content-type": "text/plain; charset=utf-8",
+ },
+ })
+ }
+
return fetch(
new Request(url, {
method: req.method,
@@ -116,21 +134,26 @@ export namespace ServerProxy {
signal: req.signal,
}),
).then((res) => {
+ const sync = Fence.parse(res.headers)
const next = new Headers(res.headers)
next.delete("content-encoding")
next.delete("content-length")
- console.log("proxy http response", {
- method: req.method,
- request: req.url,
- url: String(url),
- status: res.status,
- statusText: res.statusText,
- })
- return new Response(res.body, {
- status: res.status,
- statusText: res.statusText,
- headers: next,
+ const done = sync ? Fence.wait(workspaceID, sync, req.signal) : Promise.resolve()
+
+ return done.then(async () => {
+ console.log("proxy http response", {
+ method: req.method,
+ request: req.url,
+ url: String(url),
+ status: res.status,
+ statusText: res.statusText,
+ })
+ return new Response(res.body, {
+ status: res.status,
+ statusText: res.statusText,
+ headers: next,
+ })
})
})
}
diff --git a/packages/opencode/src/server/server.ts b/packages/opencode/src/server/server.ts
index 02ec7356e..c6c37ee43 100644
--- a/packages/opencode/src/server/server.ts
+++ b/packages/opencode/src/server/server.ts
@@ -4,9 +4,11 @@ import { adapter } from "#hono"
import { MDNS } from "./mdns"
import { lazy } from "@/util/lazy"
import { AuthMiddleware, CompressionMiddleware, CorsMiddleware, ErrorMiddleware, LoggerMiddleware } from "./middleware"
+import { FenceMiddleware } from "./fence"
import { InstanceRoutes } from "./instance"
import { initProjectors } from "./projectors"
import { Log } from "@/util/log"
+import { Flag } from "@/flag/flag"
import { ControlPlaneRoutes } from "./control"
import { UIRoutes } from "./ui"
@@ -30,6 +32,22 @@ export namespace Server {
function create(opts: { cors?: string[] }) {
const app = new Hono()
const runtime = adapter.create(app)
+
+ if (Flag.OPENCODE_WORKSPACE_ID) {
+ return {
+ app: app
+ .onError(ErrorMiddleware)
+ .use(AuthMiddleware)
+ .use(LoggerMiddleware)
+ .use(CompressionMiddleware)
+ .use(CorsMiddleware(opts))
+ .use(FenceMiddleware)
+ .route("/", ControlPlaneRoutes())
+ .route("/", InstanceRoutes(runtime.upgradeWebSocket)),
+ runtime,
+ }
+ }
+
return {
app: app
.onError(ErrorMiddleware)
diff --git a/packages/opencode/test/plugin/workspace-adaptor.test.ts b/packages/opencode/test/plugin/workspace-adaptor.test.ts
index 669a822a2..ff8df7490 100644
--- a/packages/opencode/test/plugin/workspace-adaptor.test.ts
+++ b/packages/opencode/test/plugin/workspace-adaptor.test.ts
@@ -7,10 +7,16 @@ import { tmpdir } from "../fixture/fixture"
const disableDefault = process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS
process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS = "1"
+const { Flag } = await import("../../src/flag/flag")
const { Plugin } = await import("../../src/plugin/index")
const { Workspace } = await import("../../src/control-plane/workspace")
const { Instance } = await import("../../src/project/instance")
+const experimental = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
+
+// @ts-expect-error tests override the flag directly
+Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
+
afterEach(async () => {
await Instance.disposeAll()
})
@@ -18,9 +24,12 @@ afterEach(async () => {
afterAll(() => {
if (disableDefault === undefined) {
delete process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS
- return
+ } else {
+ process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS = disableDefault
}
- process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS = disableDefault
+
+ // @ts-expect-error restore original test flag value
+ Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = experimental
})
describe("plugin.workspace", () => {