summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJames Long <[email protected]>2026-04-30 11:44:58 -0400
committerGitHub <[email protected]>2026-04-30 11:44:58 -0400
commit53e9cac383859f7bb33f771db3ac6967cf4a98a7 (patch)
treeba7be22855bfef1cde2479d7429457a8ead52481
parentfe0c182747bfd28b10c58cf6abb8345c66fcebe4 (diff)
downloadopencode-53e9cac383859f7bb33f771db3ac6967cf4a98a7.tar.gz
opencode-53e9cac383859f7bb33f771db3ac6967cf4a98a7.zip
refactor(core): convert control-plane workspace to Effect (#25018)
-rw-r--r--packages/opencode/src/control-plane/adaptors/index.ts13
-rw-r--r--packages/opencode/src/control-plane/adaptors/worktree.ts18
-rw-r--r--packages/opencode/src/control-plane/dev/README.md20
-rw-r--r--packages/opencode/src/control-plane/sse.ts66
-rw-r--r--packages/opencode/src/control-plane/util.ts14
-rw-r--r--packages/opencode/src/control-plane/workspace.ts1153
-rw-r--r--packages/opencode/src/effect/app-runtime.ts2
-rw-r--r--packages/opencode/src/server/routes/instance/httpapi/middleware/workspace-routing.ts4
-rw-r--r--packages/opencode/test/control-plane/sse.test.ts56
-rw-r--r--packages/opencode/test/control-plane/workspace.test.ts1391
-rw-r--r--packages/opencode/test/workspace/workspace-restore.test.ts283
11 files changed, 2155 insertions, 865 deletions
diff --git a/packages/opencode/src/control-plane/adaptors/index.ts b/packages/opencode/src/control-plane/adaptors/index.ts
index 651d09cc2..c91f534b5 100644
--- a/packages/opencode/src/control-plane/adaptors/index.ts
+++ b/packages/opencode/src/control-plane/adaptors/index.ts
@@ -1,27 +1,26 @@
-import { lazy } from "@/util/lazy"
import type { ProjectID } from "@/project/schema"
import type { WorkspaceAdaptor, WorkspaceAdaptorEntry } from "../types"
+import { WorktreeAdaptor } from "./worktree"
-const BUILTIN: Record<string, () => Promise<WorkspaceAdaptor>> = {
- worktree: lazy(async () => (await import("./worktree")).WorktreeAdaptor),
+const BUILTIN: Record<string, WorkspaceAdaptor> = {
+ worktree: WorktreeAdaptor,
}
const state = new Map<ProjectID, Map<string, WorkspaceAdaptor>>()
-export async function getAdaptor(projectID: ProjectID, type: string): Promise<WorkspaceAdaptor> {
+export function getAdaptor(projectID: ProjectID, type: string): WorkspaceAdaptor {
const custom = state.get(projectID)?.get(type)
if (custom) return custom
const builtin = BUILTIN[type]
- if (builtin) return builtin()
+ if (builtin) return builtin
throw new Error(`Unknown workspace adaptor: ${type}`)
}
export async function listAdaptors(projectID: ProjectID): Promise<WorkspaceAdaptorEntry[]> {
const builtin = await Promise.all(
- Object.entries(BUILTIN).map(async ([type, init]) => {
- const adaptor = await init()
+ Object.entries(BUILTIN).map(async ([type, adaptor]) => {
return {
type,
name: adaptor.name,
diff --git a/packages/opencode/src/control-plane/adaptors/worktree.ts b/packages/opencode/src/control-plane/adaptors/worktree.ts
index 9c080daa3..de9618d30 100644
--- a/packages/opencode/src/control-plane/adaptors/worktree.ts
+++ b/packages/opencode/src/control-plane/adaptors/worktree.ts
@@ -1,6 +1,4 @@
import { Schema } from "effect"
-import { AppRuntime } from "@/effect/app-runtime"
-import { Worktree } from "@/worktree"
import { type WorkspaceAdaptor, WorkspaceInfo } from "../types"
const WorktreeConfig = Schema.Struct({
@@ -10,19 +8,26 @@ const WorktreeConfig = Schema.Struct({
})
const decodeWorktreeConfig = Schema.decodeUnknownSync(WorktreeConfig)
+async function loadWorktree() {
+ const [{ AppRuntime }, { Worktree }] = await Promise.all([import("@/effect/app-runtime"), import("@/worktree")])
+ return { AppRuntime, Worktree }
+}
+
export const WorktreeAdaptor: WorkspaceAdaptor = {
name: "Worktree",
description: "Create a git worktree",
async configure(info) {
- const worktree = await AppRuntime.runPromise(Worktree.Service.use((svc) => svc.makeWorktreeInfo()))
+ const { AppRuntime, Worktree } = await loadWorktree()
+ const next = await AppRuntime.runPromise(Worktree.Service.use((svc) => svc.makeWorktreeInfo()))
return {
...info,
- name: worktree.name,
- branch: worktree.branch,
- directory: worktree.directory,
+ name: next.name,
+ branch: next.branch,
+ directory: next.directory,
}
},
async create(info) {
+ const { AppRuntime, Worktree } = await loadWorktree()
const config = decodeWorktreeConfig(info)
await AppRuntime.runPromise(
Worktree.Service.use((svc) =>
@@ -35,6 +40,7 @@ export const WorktreeAdaptor: WorkspaceAdaptor = {
)
},
async remove(info) {
+ const { AppRuntime, Worktree } = await loadWorktree()
const config = decodeWorktreeConfig(info)
await AppRuntime.runPromise(Worktree.Service.use((svc) => svc.remove({ directory: config.directory })))
},
diff --git a/packages/opencode/src/control-plane/dev/README.md b/packages/opencode/src/control-plane/dev/README.md
new file mode 100644
index 000000000..dbd62c0b1
--- /dev/null
+++ b/packages/opencode/src/control-plane/dev/README.md
@@ -0,0 +1,20 @@
+
+This is a plugin to simulate a remote environment locally. Add this to `.opencode/opencode.jsonc`:
+
+```json
+ "plugin": ["../packages/opencode/src/control-plane/dev/debug-workspace-plugin.ts"],
+```
+
+In a separate terminal, run a separate OpenCode server. This will act like a remote server and the local instance will proxy all requests to it:
+
+```
+./packages/opencode/script/run-workspace-server
+```
+
+With the plugin install, you can now run OpenCode and create a `debug` workspace type. This will create a "remote" workspace which talks to the second workspace server started above.
+
+How this works:
+
+* The workspace server needs to know the workspace id and port to run. It waits for this information to be written to a file and starts the server when the data is written.
+* The debug plugin writes this information in the `create` call to the workspace. So create a `debug` workspace will always kick off a new external server.
+* The server script watches for file changes, so whenver you create a new `debug` workspace it will restart with the new information. This means that there is only ever one working `debug` workspace at a time; when you create a new one all previous sessions will show that it can't connect because previous debug workspaces do not exist. \ No newline at end of file
diff --git a/packages/opencode/src/control-plane/sse.ts b/packages/opencode/src/control-plane/sse.ts
deleted file mode 100644
index 003093a00..000000000
--- a/packages/opencode/src/control-plane/sse.ts
+++ /dev/null
@@ -1,66 +0,0 @@
-export async function parseSSE(
- body: ReadableStream<Uint8Array>,
- signal: AbortSignal,
- onEvent: (event: unknown) => void,
-) {
- const reader = body.getReader()
- const decoder = new TextDecoder()
- let buf = ""
- let last = ""
- let retry = 1000
-
- const abort = () => {
- void reader.cancel().catch(() => undefined)
- }
-
- signal.addEventListener("abort", abort)
-
- try {
- while (!signal.aborted) {
- const chunk = await reader.read().catch(() => ({ done: true, value: undefined as Uint8Array | undefined }))
- if (chunk.done) break
-
- buf += decoder.decode(chunk.value, { stream: true })
- buf = buf.replace(/\r\n/g, "\n").replace(/\r/g, "\n")
-
- const chunks = buf.split("\n\n")
- buf = chunks.pop() ?? ""
-
- chunks.forEach((chunk) => {
- const data: string[] = []
- chunk.split("\n").forEach((line) => {
- if (line.startsWith("data:")) {
- data.push(line.replace(/^data:\s*/, ""))
- return
- }
- if (line.startsWith("id:")) {
- last = line.replace(/^id:\s*/, "")
- return
- }
- if (line.startsWith("retry:")) {
- const parsed = Number.parseInt(line.replace(/^retry:\s*/, ""), 10)
- if (!Number.isNaN(parsed)) retry = parsed
- }
- })
-
- if (!data.length) return
- const raw = data.join("\n")
- try {
- onEvent(JSON.parse(raw))
- } catch {
- onEvent({
- type: "sse.message",
- properties: {
- data: raw,
- id: last || undefined,
- retry,
- },
- })
- }
- })
- }
- } finally {
- signal.removeEventListener("abort", abort)
- reader.releaseLock()
- }
-}
diff --git a/packages/opencode/src/control-plane/util.ts b/packages/opencode/src/control-plane/util.ts
index 023c2ae15..35bc87163 100644
--- a/packages/opencode/src/control-plane/util.ts
+++ b/packages/opencode/src/control-plane/util.ts
@@ -1,22 +1,23 @@
import { GlobalBus, type GlobalEvent } from "@/bus/global"
+import { Effect } from "effect"
export function waitEvent(input: { timeout: number; signal?: AbortSignal; fn: (event: GlobalEvent) => boolean }) {
- if (input.signal?.aborted) return Promise.reject(input.signal.reason ?? new Error("Request aborted"))
+ if (input.signal?.aborted) return Effect.fail(input.signal.reason ?? new Error("Request aborted"))
- return new Promise<void>((resolve, reject) => {
+ return Effect.callback<void, unknown>((resume) => {
const abort = () => {
cleanup()
- reject(input.signal?.reason ?? new Error("Request aborted"))
+ resume(Effect.fail(input.signal?.reason ?? new Error("Request aborted")))
}
const handler = (event: GlobalEvent) => {
try {
if (!input.fn(event)) return
cleanup()
- resolve()
+ resume(Effect.void)
} catch (error) {
cleanup()
- reject(error)
+ resume(Effect.fail(error))
}
}
@@ -28,10 +29,11 @@ export function waitEvent(input: { timeout: number; signal?: AbortSignal; fn: (e
const timeout = setTimeout(() => {
cleanup()
- reject(new Error("Timed out waiting for global event"))
+ resume(Effect.fail(new Error("Timed out waiting for global event")))
}, input.timeout)
GlobalBus.on("event", handler)
input.signal?.addEventListener("abort", abort, { once: true })
+ return Effect.sync(cleanup)
})
}
diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts
index c56ff2631..2d8c57044 100644
--- a/packages/opencode/src/control-plane/workspace.ts
+++ b/packages/opencode/src/control-plane/workspace.ts
@@ -1,5 +1,5 @@
-import { Schema } from "effect"
-import { setTimeout as sleep } from "node:timers/promises"
+import { Context, Effect, FiberMap, Layer, Schema, Stream } from "effect"
+import { FetchHttpClient, HttpBody, HttpClient, HttpClientError, HttpClientRequest } from "effect/unstable/http"
import { fn } from "@/util/fn"
import { Database } from "@/storage/db"
import { asc } from "drizzle-orm"
@@ -20,12 +20,11 @@ import { WorkspaceTable } from "./workspace.sql"
import { getAdaptor } from "./adaptors"
import { type WorkspaceInfo, WorkspaceInfo as WorkspaceInfoSchema } from "./types"
import { WorkspaceID } from "./schema"
-import { parseSSE } from "./sse"
import { Session } from "@/session/session"
import { SessionTable } from "@/session/session.sql"
import { SessionID } from "@/session/schema"
import { errorData } from "@/util/error"
-import { AppRuntime } from "@/effect/app-runtime"
+import { makeRuntime } from "@/effect/run-service"
import { waitEvent } from "./util"
import { WorkspaceContext } from "./workspace-context"
import { NonNegativeInt, withStatics } from "@/util/schema"
@@ -76,6 +75,11 @@ function fromRow(row: typeof WorkspaceTable.$inferSelect): Info {
}
}
+const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
+ Effect.sync(() => Database.use(fn))
+
+const log = Log.create({ service: "workspace-sync" })
+
export const CreateInput = Schema.Struct({
id: Schema.optional(WorkspaceID),
type: Info.fields.type,
@@ -85,286 +89,739 @@ export const CreateInput = Schema.Struct({
}).pipe(withStatics((s) => ({ zod: effectZod(s), zodObject: zodObject(s) })))
export type CreateInput = Schema.Schema.Type<typeof CreateInput>
-export const create = fn(CreateInput.zod, async (input) => {
- const id = WorkspaceID.ascending(input.id)
- const adaptor = await getAdaptor(input.projectID, input.type)
+export const SessionRestoreInput = Schema.Struct({
+ workspaceID: WorkspaceID,
+ sessionID: SessionID,
+}).pipe(withStatics((s) => ({ zod: effectZod(s), zodObject: zodObject(s) })))
+export type SessionRestoreInput = Schema.Schema.Type<typeof SessionRestoreInput>
- const config = await adaptor.configure({ ...input, id, name: Slug.create(), directory: null })
+export class SyncHttpError extends Schema.TaggedErrorClass<SyncHttpError>()("WorkspaceSyncHttpError", {
+ message: Schema.String,
+ status: Schema.Number,
+ body: Schema.optional(Schema.String),
+}) {}
- const info: Info = {
- id,
- type: config.type,
- branch: config.branch ?? null,
- name: config.name ?? null,
- directory: config.directory ?? null,
- extra: config.extra ?? null,
- projectID: input.projectID,
- }
+export class WorkspaceNotFoundError extends Schema.TaggedErrorClass<WorkspaceNotFoundError>()("WorkspaceNotFoundError", {
+ message: Schema.String,
+ workspaceID: WorkspaceID,
+}) {}
+
+export class SessionEventsNotFoundError extends Schema.TaggedErrorClass<SessionEventsNotFoundError>()(
+ "WorkspaceSessionEventsNotFoundError",
+ {
+ message: Schema.String,
+ sessionID: SessionID,
+ },
+) {}
+
+export class SessionRestoreHttpError extends Schema.TaggedErrorClass<SessionRestoreHttpError>()(
+ "WorkspaceSessionRestoreHttpError",
+ {
+ message: Schema.String,
+ workspaceID: WorkspaceID,
+ sessionID: SessionID,
+ status: Schema.Number,
+ body: Schema.String,
+ },
+) {}
+
+export class SyncTimeoutError extends Schema.TaggedErrorClass<SyncTimeoutError>()("WorkspaceSyncTimeoutError", {
+ message: Schema.String,
+ state: Schema.Record(Schema.String, Schema.Number),
+}) {}
+
+export class SyncAbortedError extends Schema.TaggedErrorClass<SyncAbortedError>()("WorkspaceSyncAbortedError", {
+ message: Schema.String,
+ cause: Schema.optional(Schema.Defect),
+}) {}
+
+type CreateError = Auth.AuthError
+type SessionRestoreError =
+ | WorkspaceNotFoundError
+ | SessionEventsNotFoundError
+ | SessionRestoreHttpError
+ | HttpClientError.HttpClientError
+type WaitForSyncError = SyncTimeoutError | SyncAbortedError
+type SyncLoopError = SyncHttpError | HttpClientError.HttpClientError
+
+export interface Interface {
+ readonly create: (input: CreateInput) => Effect.Effect<Info, CreateError>
+ readonly sessionRestore: (input: SessionRestoreInput) => Effect.Effect<{ total: number }, SessionRestoreError>
+ readonly list: (project: Project.Info) => Effect.Effect<Info[]>
+ readonly get: (id: WorkspaceID) => Effect.Effect<Info | undefined>
+ readonly remove: (id: WorkspaceID) => Effect.Effect<Info | undefined>
+ readonly status: () => Effect.Effect<ConnectionStatus[]>
+ readonly isSyncing: (workspaceID: WorkspaceID) => Effect.Effect<boolean>
+ readonly waitForSync: (
+ workspaceID: WorkspaceID,
+ state: Record<string, number>,
+ signal?: AbortSignal,
+ ) => Effect.Effect<void, WaitForSyncError>
+ readonly startWorkspaceSyncing: (projectID: ProjectID) => Effect.Effect<void>
+}
+
+export class Service extends Context.Service<Service, Interface>()("@opencode/Workspace") {}
- Database.use((db) => {
- db.insert(WorkspaceTable)
- .values({
- id: info.id,
- type: info.type,
- branch: info.branch,
- name: info.name,
- directory: info.directory,
- extra: info.extra,
- project_id: info.projectID,
+export const layer = Layer.effect(
+ Service,
+ Effect.gen(function* () {
+ const auth = yield* Auth.Service
+ const session = yield* Session.Service
+ const http = yield* HttpClient.HttpClient
+ const connections = new Map<WorkspaceID, ConnectionStatus>()
+ const syncFibers = yield* FiberMap.make<WorkspaceID, void, SyncLoopError>()
+
+ const setStatus = (id: WorkspaceID, status: ConnectionStatus["status"]) => {
+ const prev = connections.get(id)
+ if (prev?.status === status) return
+ const next = { workspaceID: id, status }
+ connections.set(id, next)
+
+ GlobalBus.emit("event", {
+ directory: "global",
+ workspace: id,
+ payload: {
+ type: Event.Status.type,
+ properties: next,
+ },
})
- .run()
- })
+ }
- const env = {
- OPENCODE_AUTH_CONTENT: JSON.stringify(await AppRuntime.runPromise(Auth.Service.use((auth) => auth.all()))),
- OPENCODE_WORKSPACE_ID: config.id,
- OPENCODE_EXPERIMENTAL_WORKSPACES: "true",
- OTEL_EXPORTER_OTLP_HEADERS: process.env.OTEL_EXPORTER_OTLP_HEADERS,
- OTEL_EXPORTER_OTLP_ENDPOINT: process.env.OTEL_EXPORTER_OTLP_ENDPOINT,
- OTEL_RESOURCE_ATTRIBUTES: process.env.OTEL_RESOURCE_ATTRIBUTES,
- }
- await adaptor.create(config, env)
+ const connectSSE = Effect.fn("Workspace.connectSSE")(function* (
+ url: URL | string,
+ headers: HeadersInit | undefined,
+ ) {
+ const response = yield* http.execute(
+ HttpClientRequest.get(route(url, "/global/event"), {
+ headers: new Headers(headers),
+ accept: "text/event-stream",
+ }),
+ )
+ if (response.status < 200 || response.status >= 300) {
+ return yield* new SyncHttpError({
+ message: `Workspace sync HTTP failure: ${response.status}`,
+ status: response.status,
+ })
+ }
+ return response.stream
+ })
+
+ const parseSSE = Effect.fn("Workspace.parseSSE")(function* (
+ stream: Stream.Stream<Uint8Array, unknown>,
+ onEvent: (event: unknown) => Effect.Effect<void>,
+ ) {
+ yield* stream.pipe(
+ Stream.decodeText(),
+ Stream.splitLines,
+ Stream.mapAccum(
+ () => ({ data: [] as string[], id: undefined as string | undefined, retry: 1000 }),
+ (state, line) => {
+ if (line === "") {
+ if (!state.data.length) return [state, []]
+ return [{ ...state, data: [] }, [{ data: state.data.join("\n"), id: state.id, retry: state.retry }]]
+ }
+
+ const index = line.indexOf(":")
+ const field = index === -1 ? line : line.slice(0, index)
+ const value = index === -1 ? "" : line.slice(index + (line[index + 1] === " " ? 2 : 1))
+
+ if (field === "data") return [{ ...state, data: [...state.data, value] }, []]
+ if (field === "id") return [{ ...state, id: value }, []]
+ if (field === "retry") {
+ const retry = Number.parseInt(value, 10)
+ return [Number.isNaN(retry) ? state : { ...state, retry }, []]
+ }
+ return [state, []]
+ },
+ {
+ onHalt: (state) =>
+ state.data.length ? [{ data: state.data.join("\n"), id: state.id, retry: state.retry }] : [],
+ },
+ ),
+ Stream.map((event) => {
+ try {
+ return JSON.parse(event.data) as unknown
+ } catch {
+ return {
+ type: "sse.message",
+ properties: {
+ data: event.data,
+ id: event.id || undefined,
+ retry: event.retry,
+ },
+ }
+ }
+ }),
+ Stream.runForEach(onEvent),
+ )
+ })
+
+ const syncHistory = Effect.fn("Workspace.syncHistory")(function* (
+ space: Info,
+ url: URL | string,
+ headers: HeadersInit | undefined,
+ ) {
+ const sessionIDs = yield* db((db) =>
+ db
+ .select({ id: SessionTable.id })
+ .from(SessionTable)
+ .where(eq(SessionTable.workspace_id, space.id))
+ .all()
+ .map((row) => row.id),
+ )
+ const state = sessionIDs.length
+ ? Object.fromEntries(
+ (yield* db((db) =>
+ db.select().from(EventSequenceTable).where(inArray(EventSequenceTable.aggregate_id, sessionIDs)).all(),
+ )).map((row) => [row.aggregate_id, row.seq]),
+ )
+ : {}
- startSync(info)
+ log.info("syncing workspace history", {
+ workspaceID: space.id,
+ sessions: sessionIDs.length,
+ known: Object.keys(state).length,
+ })
- await waitEvent({
- timeout: TIMEOUT,
- fn(event) {
- if (event.workspace === info.id && event.payload.type === Event.Status.type) {
- const { status } = event.payload.properties
- return status === "error" || status === "connected"
+ const response = yield* http.execute(
+ HttpClientRequest.post(route(url, "/sync/history"), {
+ headers: new Headers(headers),
+ body: HttpBody.jsonUnsafe(state),
+ }),
+ )
+
+ if (response.status < 200 || response.status >= 300) {
+ const body = yield* response.text
+ return yield* new SyncHttpError({
+ message: `Workspace history HTTP failure: ${response.status} ${body}`,
+ status: response.status,
+ body,
+ })
}
- return false
- },
- })
- return info
-})
+ const events = (yield* response.json) as HistoryEvent[]
-export const SessionRestoreInput = Schema.Struct({
- workspaceID: WorkspaceID,
- sessionID: SessionID,
-}).pipe(withStatics((s) => ({ zod: effectZod(s), zodObject: zodObject(s) })))
-export type SessionRestoreInput = Schema.Schema.Type<typeof SessionRestoreInput>
+ log.info("workspace history synced", {
+ workspaceID: space.id,
+ events: events.length,
+ })
-export const sessionRestore = fn(SessionRestoreInput.zod, async (input) => {
- log.info("session restore requested", {
- workspaceID: input.workspaceID,
- sessionID: input.sessionID,
- })
- try {
- const space = await get(input.workspaceID)
- if (!space) throw new Error(`Workspace not found: ${input.workspaceID}`)
-
- const adaptor = await getAdaptor(space.projectID, space.type)
- const target = await adaptor.target(space)
-
- // Need to switch the workspace of the session
- SyncEvent.run(Session.Event.Updated, {
- sessionID: input.sessionID,
- info: {
- workspaceID: input.workspaceID,
- },
+ yield* Effect.sync(() =>
+ WorkspaceContext.provide({
+ workspaceID: space.id,
+ fn: () => {
+ for (const event of events) {
+ SyncEvent.replay(
+ {
+ id: event.id,
+ aggregateID: event.aggregate_id,
+ seq: event.seq,
+ type: event.type,
+ data: event.data,
+ },
+ { publish: true },
+ )
+ }
+ },
+ }),
+ )
})
- const rows = Database.use((db) =>
- db
- .select({
- id: EventTable.id,
- aggregateID: EventTable.aggregate_id,
- seq: EventTable.seq,
- type: EventTable.type,
- data: EventTable.data,
- })
- .from(EventTable)
- .where(eq(EventTable.aggregate_id, input.sessionID))
- .orderBy(asc(EventTable.seq))
- .all(),
- )
- if (rows.length === 0) throw new Error(`No events found for session: ${input.sessionID}`)
-
- const all = rows
-
- const size = 10
- const sets = Array.from({ length: Math.ceil(all.length / size) }, (_, i) => all.slice(i * size, (i + 1) * size))
- const total = sets.length
- log.info("session restore prepared", {
- workspaceID: input.workspaceID,
- sessionID: input.sessionID,
- workspaceType: space.type,
- directory: space.directory,
- target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
- events: all.length,
- batches: total,
- first: all[0]?.seq,
- last: all.at(-1)?.seq,
+ const syncWorkspaceLoop = Effect.fn("Workspace.syncWorkspaceLoop")(function* (space: Info) {
+ const adaptor = getAdaptor(space.projectID, space.type)
+ const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space)))
+
+ if (target.type === "local") return
+
+ let attempt = 0
+
+ while (true) {
+ log.info("connecting to global sync", { workspace: space.name })
+ setStatus(space.id, "connecting")
+
+ const stream = yield* connectSSE(target.url, target.headers).pipe(
+ Effect.tap(() => syncHistory(space, target.url, target.headers)),
+ Effect.catch((err) =>
+ Effect.sync(() => {
+ setStatus(space.id, "error")
+ log.info("failed to connect to global sync", {
+ workspace: space.name,
+ err,
+ })
+ return null
+ }),
+ ),
+ )
+
+ if (stream) {
+ attempt = 0
+
+ log.info("global sync connected", { workspace: space.name })
+ setStatus(space.id, "connected")
+
+ yield* parseSSE(stream, (evt) =>
+ Effect.sync(() => {
+ try {
+ if (!evt || typeof evt !== "object" || !("payload" in evt)) return
+ const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent }
+ if (payload.type === "server.heartbeat") return
+
+ if (payload.type === "sync" && payload.syncEvent) {
+ SyncEvent.replay(payload.syncEvent)
+ }
+
+ const event = evt as { directory?: string; project?: string; payload: unknown }
+ GlobalBus.emit("event", {
+ directory: event.directory,
+ project: event.project,
+ workspace: space.id,
+ payload: event.payload,
+ })
+ } catch (err) {
+ log.info("failed to replay global event", {
+ workspaceID: space.id,
+ error: err,
+ })
+ }
+ }),
+ )
+
+ log.info("disconnected from global sync: " + space.id)
+ setStatus(space.id, "disconnected")
+ }
+
+ // Back off reconnect attempts up to 2 minutes while the workspace
+ // stays unavailable.
+ yield* Effect.sleep(`${Math.min(120_000, 1_000 * 2 ** attempt)} millis`)
+ attempt += 1
+ }
})
- GlobalBus.emit("event", {
- directory: "global",
- workspace: input.workspaceID,
- payload: {
- type: Event.Restore.type,
- properties: {
- workspaceID: input.workspaceID,
- sessionID: input.sessionID,
- total,
- step: 0,
- },
- },
+
+ const startSync = Effect.fn("Workspace.startSync")(function* (space: Info) {
+ if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return
+
+ const adaptor = getAdaptor(space.projectID, space.type)
+ const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space)))
+
+ if (target.type === "local") {
+ setStatus(space.id, (yield* Effect.promise(() => Filesystem.exists(target.directory))) ? "connected" : "error")
+ return
+ }
+
+ const exists = yield* FiberMap.has(syncFibers, space.id)
+ if (exists && connections.get(space.id)?.status !== "error") return
+
+ setStatus(space.id, "disconnected")
+
+ yield* FiberMap.run(
+ syncFibers,
+ space.id,
+ // TODO: look into `tapError` to set the status but still
+ // allow the fiber to fail and automatically get removed
+ syncWorkspaceLoop(space).pipe(
+ Effect.catch((error) =>
+ Effect.sync(() => {
+ setStatus(space.id, "error")
+ log.warn("workspace listener failed", {
+ workspaceID: space.id,
+ error,
+ })
+ }),
+ ),
+ ),
+ )
})
- for (const [i, events] of sets.entries()) {
- log.info("session restore batch starting", {
- workspaceID: input.workspaceID,
- sessionID: input.sessionID,
- step: i + 1,
- total,
- events: events.length,
- first: events[0]?.seq,
- last: events.at(-1)?.seq,
- target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
+
+ const stopSync = Effect.fn("Workspace.stopSync")(function* (id: WorkspaceID) {
+ yield* FiberMap.remove(syncFibers, id)
+ connections.delete(id)
+ })
+
+ const create = Effect.fn("Workspace.create")(function* (input: CreateInput) {
+ const id = WorkspaceID.ascending(input.id)
+ const adaptor = getAdaptor(input.projectID, input.type)
+ const config = yield* Effect.promise(() =>
+ Promise.resolve(adaptor.configure({ ...input, id, name: Slug.create(), directory: null })),
+ )
+
+ const info: Info = {
+ id,
+ type: config.type,
+ branch: config.branch ?? null,
+ name: config.name ?? null,
+ directory: config.directory ?? null,
+ extra: config.extra ?? null,
+ projectID: input.projectID,
+ }
+
+ yield* db((db) => {
+ db.insert(WorkspaceTable)
+ .values({
+ id: info.id,
+ type: info.type,
+ branch: info.branch,
+ name: info.name,
+ directory: info.directory,
+ extra: info.extra,
+ project_id: info.projectID,
+ })
+ .run()
})
- if (target.type === "local") {
- SyncEvent.replayAll(events)
- log.info("session restore batch replayed locally", {
+
+ const env = {
+ OPENCODE_AUTH_CONTENT: JSON.stringify(yield* auth.all()),
+ OPENCODE_WORKSPACE_ID: config.id,
+ OPENCODE_EXPERIMENTAL_WORKSPACES: "true",
+ OTEL_EXPORTER_OTLP_HEADERS: process.env.OTEL_EXPORTER_OTLP_HEADERS,
+ OTEL_EXPORTER_OTLP_ENDPOINT: process.env.OTEL_EXPORTER_OTLP_ENDPOINT,
+ OTEL_RESOURCE_ATTRIBUTES: process.env.OTEL_RESOURCE_ATTRIBUTES,
+ }
+
+ yield* Effect.promise(() => adaptor.create(config, env))
+ yield* Effect.all(
+ [
+ waitEvent({
+ timeout: TIMEOUT,
+ fn(event) {
+ if (event.workspace === info.id && event.payload.type === Event.Status.type) {
+ const { status } = event.payload.properties
+ return status === "error" || status === "connected"
+ }
+ return false
+ },
+ }),
+ startSync(info),
+ ],
+ { concurrency: 2, discard: true },
+ )
+
+ return info
+ })
+
+ const sessionRestore = Effect.fn("Workspace.sessionRestore")(function* (input: SessionRestoreInput) {
+ return yield* Effect.gen(function* () {
+ log.info("session restore requested", {
workspaceID: input.workspaceID,
sessionID: input.sessionID,
- step: i + 1,
- total,
- events: events.length,
})
- } else {
- const url = route(target.url, "/sync/replay")
- const headers = new Headers(target.headers)
- headers.set("content-type", "application/json")
- const res = await fetch(url, {
- method: "POST",
- headers,
- body: JSON.stringify({
- directory: space.directory ?? "",
- events,
+
+ const space = yield* get(input.workspaceID)
+ if (!space)
+ return yield* new WorkspaceNotFoundError({
+ message: `Workspace not found: ${input.workspaceID}`,
+ workspaceID: input.workspaceID,
+ })
+
+ const adaptor = getAdaptor(space.projectID, space.type)
+ const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space)))
+
+ yield* Effect.sync(() =>
+ SyncEvent.run(Session.Event.Updated, {
+ sessionID: input.sessionID,
+ info: {
+ workspaceID: input.workspaceID,
+ },
}),
+ )
+
+ const rows = yield* db((db) =>
+ db
+ .select({
+ id: EventTable.id,
+ aggregateID: EventTable.aggregate_id,
+ seq: EventTable.seq,
+ type: EventTable.type,
+ data: EventTable.data,
+ })
+ .from(EventTable)
+ .where(eq(EventTable.aggregate_id, input.sessionID))
+ .orderBy(asc(EventTable.seq))
+ .all(),
+ )
+ if (rows.length === 0)
+ return yield* new SessionEventsNotFoundError({
+ message: `No events found for session: ${input.sessionID}`,
+ sessionID: input.sessionID,
+ })
+
+ const size = 10
+ // TODO: look into using effect APIs to process this in chunks
+ const sets = Array.from({ length: Math.ceil(rows.length / size) }, (_, i) =>
+ rows.slice(i * size, (i + 1) * size),
+ )
+ const total = sets.length
+
+ log.info("session restore prepared", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ workspaceType: space.type,
+ directory: space.directory,
+ target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
+ events: rows.length,
+ batches: total,
+ first: rows[0]?.seq,
+ last: rows.at(-1)?.seq,
})
- if (!res.ok) {
- const body = await res.text()
- log.error("session restore batch failed", {
+
+ yield* Effect.sync(() =>
+ GlobalBus.emit("event", {
+ directory: "global",
+ workspace: input.workspaceID,
+ payload: {
+ type: Event.Restore.type,
+ properties: {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ total,
+ step: 0,
+ },
+ },
+ }),
+ )
+
+ for (const [i, events] of sets.entries()) {
+ log.info("session restore batch starting", {
workspaceID: input.workspaceID,
sessionID: input.sessionID,
step: i + 1,
total,
- status: res.status,
- body,
+ events: events.length,
+ first: events[0]?.seq,
+ last: events.at(-1)?.seq,
+ target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
})
- throw new Error(
- `Failed to replay session ${input.sessionID} into workspace ${input.workspaceID}: HTTP ${res.status} ${body}`,
+
+ if (target.type === "local") {
+ SyncEvent.replayAll(events)
+ log.info("session restore batch replayed locally", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ step: i + 1,
+ total,
+ events: events.length,
+ })
+ } else {
+ const url = route(target.url, "/sync/replay")
+ const res = yield* http.execute(
+ HttpClientRequest.post(url, {
+ headers: new Headers(target.headers),
+ body: HttpBody.jsonUnsafe({
+ directory: space.directory ?? "",
+ events,
+ }),
+ }),
+ )
+
+ if (res.status < 200 || res.status >= 300) {
+ const body = yield* res.text
+ log.error("session restore batch failed", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ step: i + 1,
+ total,
+ status: res.status,
+ body,
+ })
+ return yield* new SessionRestoreHttpError({
+ message: `Failed to replay session ${input.sessionID} into workspace ${input.workspaceID}: HTTP ${res.status} ${body}`,
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ status: res.status,
+ body,
+ })
+ }
+
+ log.info("session restore batch posted", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ step: i + 1,
+ total,
+ status: res.status,
+ })
+ }
+
+ yield* Effect.sync(() =>
+ GlobalBus.emit("event", {
+ directory: "global",
+ workspace: input.workspaceID,
+ payload: {
+ type: Event.Restore.type,
+ properties: {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ total,
+ step: i + 1,
+ },
+ },
+ }),
)
}
- log.info("session restore batch posted", {
+
+ log.info("session restore complete", {
workspaceID: input.workspaceID,
sessionID: input.sessionID,
- step: i + 1,
- total,
- status: res.status,
+ batches: total,
})
- }
- GlobalBus.emit("event", {
- directory: "global",
- workspace: input.workspaceID,
- payload: {
- type: Event.Restore.type,
- properties: {
- workspaceID: input.workspaceID,
- sessionID: input.sessionID,
- total,
- step: i + 1,
- },
- },
- })
- }
- log.info("session restore complete", {
- workspaceID: input.workspaceID,
- sessionID: input.sessionID,
- batches: total,
+ return { total }
+ }).pipe(
+ Effect.tapError((err) =>
+ Effect.sync(() =>
+ log.error("session restore failed", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ error: errorData(err),
+ }),
+ ),
+ ),
+ )
})
- return {
- total,
- }
- } catch (err) {
- log.error("session restore failed", {
- workspaceID: input.workspaceID,
- sessionID: input.sessionID,
- error: errorData(err),
+ const list = Effect.fn("Workspace.list")(function* (project: Project.Info) {
+ return yield* db((db) =>
+ db
+ .select()
+ .from(WorkspaceTable)
+ .where(eq(WorkspaceTable.project_id, project.id))
+ .all()
+ .map(fromRow)
+ .sort((a, b) => a.id.localeCompare(b.id)),
+ )
})
- throw err
- }
-})
-export function list(project: Project.Info) {
- const rows = Database.use((db) =>
- db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
- )
- const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
- return spaces
-}
+ const get = Effect.fn("Workspace.get")(function* (id: WorkspaceID) {
+ const row = yield* db((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
+ if (!row) return
+ return fromRow(row)
+ })
-export const get = fn(WorkspaceID.zod, async (id) => {
- const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
- if (!row) return
- return fromRow(row)
-})
+ const remove = Effect.fn("Workspace.remove")(function* (id: WorkspaceID) {
+ const sessions = yield* db((db) =>
+ db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, id)).all(),
+ )
+ yield* Effect.forEach(sessions, (sessionInfo) => session.remove(sessionInfo.id), { discard: true })
+
+ const row = yield* db((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
+ if (!row) return
+
+ yield* stopSync(id)
+
+ const info = fromRow(row)
+ yield* Effect.catch(
+ Effect.gen(function* () {
+ const adaptor = getAdaptor(info.projectID, row.type)
+ yield* Effect.tryPromise(() => Promise.resolve(adaptor.remove(info)))
+ }),
+ () =>
+ Effect.sync(() => {
+ log.error("adaptor not available when removing workspace", { type: row.type })
+ }),
+ )
-export const remove = fn(WorkspaceID.zod, async (id) => {
- const sessions = Database.use((db) =>
- db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, id)).all(),
- )
- for (const session of sessions) {
- await AppRuntime.runPromise(Session.Service.use((svc) => svc.remove(session.id)))
- }
+ yield* db((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
+ return info
+ })
- const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
+ const status = Effect.fn("Workspace.status")(function* () {
+ return [...connections.values()]
+ })
- if (row) {
- stopSync(id)
+ const isSyncing = Effect.fn("Workspace.isSyncing")(function* (workspaceID: WorkspaceID) {
+ const exists = yield* FiberMap.has(syncFibers, workspaceID)
+ return exists && connections.get(workspaceID)?.status !== "error"
+ })
- const info = fromRow(row)
- try {
- const adaptor = await getAdaptor(info.projectID, row.type)
- await adaptor.remove(info)
- } catch {
- log.error("adaptor not available when removing workspace", { type: row.type })
- }
- Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
- return info
- }
-})
+ const waitForSync = Effect.fn("Workspace.waitForSync")(function* (
+ workspaceID: WorkspaceID,
+ state: Record<string, number>,
+ signal?: AbortSignal,
+ ) {
+ if (synced(state)) return
+
+ yield* Effect.catch(
+ waitEvent({
+ timeout: TIMEOUT,
+ signal,
+ fn(event) {
+ if (event.workspace !== workspaceID && event.payload.type !== "sync") {
+ return false
+ }
+ return synced(state)
+ },
+ }),
+ (): Effect.Effect<never, WaitForSyncError> =>
+ signal?.aborted
+ ? Effect.fail(
+ new SyncAbortedError({
+ message: signal.reason instanceof Error ? signal.reason.message : "Request aborted",
+ cause: signal.reason,
+ }),
+ )
+ : Effect.fail(
+ new SyncTimeoutError({
+ message: `Timed out waiting for sync fence: ${JSON.stringify(state)}`,
+ state,
+ }),
+ ),
+ )
+ })
-const connections = new Map<WorkspaceID, ConnectionStatus>()
-const aborts = new Map<WorkspaceID, AbortController>()
-const TIMEOUT = 5000
+ const startWorkspaceSyncing = Effect.fn("Workspace.startWorkspaceSyncing")(function* (projectID: ProjectID) {
+ // This session table join makes this query only return
+ // workspaces that have sessions
+ const rows = yield* db((db) =>
+ db
+ .selectDistinct({ workspace: WorkspaceTable })
+ .from(WorkspaceTable)
+ .innerJoin(SessionTable, eq(SessionTable.workspace_id, WorkspaceTable.id))
+ .where(eq(WorkspaceTable.project_id, projectID))
+ .all(),
+ )
-function setStatus(id: WorkspaceID, status: ConnectionStatus["status"]) {
- const prev = connections.get(id)
- if (prev?.status === status) return
- const next = { workspaceID: id, status }
- connections.set(id, next)
+ for (const { workspace } of rows) {
+ yield* startSync(fromRow(workspace)).pipe(
+ Effect.catch((error) =>
+ Effect.sync(() => {
+ setStatus(workspace.id, "error")
+ log.warn("workspace sync failed to start", {
+ workspaceID: workspace.id,
+ error,
+ })
+ }),
+ ),
+ Effect.forkDetach,
+ )
+ }
+ })
- if (status === "error") {
- aborts.delete(id)
- }
+ return Service.of({
+ create,
+ sessionRestore,
+ list,
+ get,
+ remove,
+ status,
+ isSyncing,
+ waitForSync,
+ startWorkspaceSyncing,
+ })
+ }),
+)
- GlobalBus.emit("event", {
- directory: "global",
- workspace: id,
- payload: {
- type: Event.Status.type,
- properties: next,
- },
- })
-}
+export const defaultLayer = layer.pipe(
+ Layer.provide(Auth.defaultLayer),
+ Layer.provide(Session.defaultLayer),
+ Layer.provide(FetchHttpClient.layer),
+)
-export function status(): ConnectionStatus[] {
- return [...connections.values()]
+const TIMEOUT = 5000
+
+type HistoryEvent = {
+ id: string
+ aggregate_id: string
+ seq: number
+ type: string
+ data: Record<string, unknown>
}
function synced(state: Record<string, number>) {
@@ -389,32 +846,6 @@ function synced(state: Record<string, number>) {
})
}
-export async function isSyncing(workspaceID: WorkspaceID) {
- return aborts.has(workspaceID)
-}
-
-export async function waitForSync(workspaceID: WorkspaceID, state: Record<string, number>, signal?: AbortSignal) {
- if (synced(state)) return
-
- try {
- await waitEvent({
- timeout: TIMEOUT,
- signal,
- fn(event) {
- if (event.workspace !== workspaceID && event.payload.type !== "sync") {
- return false
- }
- return synced(state)
- },
- })
- } catch {
- if (signal?.aborted) throw signal.reason ?? new Error("Request aborted")
- throw new Error(`Timed out waiting for sync fence: ${JSON.stringify(state)}`)
- }
-}
-
-const log = Log.create({ service: "workspace-sync" })
-
function route(url: string | URL, path: string) {
const next = new URL(url)
next.pathname = `${next.pathname.replace(/\/$/, "")}${path}`
@@ -423,198 +854,42 @@ function route(url: string | URL, path: string) {
return next
}
-async function connectSSE(url: URL | string, headers: HeadersInit | undefined, signal: AbortSignal) {
- const res = await fetch(route(url, "/global/event"), {
- method: "GET",
- headers,
- signal,
- })
+const { runPromise, runSync } = makeRuntime(Service, defaultLayer)
- if (!res.ok) throw new Error(`Workspace sync HTTP failure: ${res.status}`)
- if (!res.body) throw new Error("No response body from global sync")
+export const create = fn(CreateInput.zod, (input) => runPromise((svc) => svc.create(input)))
- return res.body
-}
+export const sessionRestore = fn(SessionRestoreInput.zod, (input) => runPromise((svc) => svc.sessionRestore(input)))
-async function syncHistory(space: Info, url: URL | string, headers: HeadersInit | undefined, signal: AbortSignal) {
- const sessionIDs = Database.use((db) =>
+export function list(project: Project.Info) {
+ return Database.use((db) =>
db
- .select({ id: SessionTable.id })
- .from(SessionTable)
- .where(eq(SessionTable.workspace_id, space.id))
+ .select()
+ .from(WorkspaceTable)
+ .where(eq(WorkspaceTable.project_id, project.id))
.all()
- .map((row) => row.id),
+ .map(fromRow)
+ .sort((a, b) => a.id.localeCompare(b.id)),
)
- const state = sessionIDs.length
- ? Object.fromEntries(
- Database.use((db) =>
- db.select().from(EventSequenceTable).where(inArray(EventSequenceTable.aggregate_id, sessionIDs)).all(),
- ).map((row) => [row.aggregate_id, row.seq]),
- )
- : {}
-
- log.info("syncing workspace history", {
- workspaceID: space.id,
- sessions: sessionIDs.length,
- known: Object.keys(state).length,
- })
-
- const requestHeaders = new Headers(headers)
- requestHeaders.set("content-type", "application/json")
-
- const res = await fetch(route(url, "/sync/history"), {
- method: "POST",
- headers: requestHeaders,
- body: JSON.stringify(state),
- signal,
- })
-
- if (!res.ok) {
- const body = await res.text()
- throw new Error(`Workspace history HTTP failure: ${res.status} ${body}`)
- }
-
- const events = await res.json()
-
- return WorkspaceContext.provide({
- workspaceID: space.id,
- fn: () => {
- for (const event of events) {
- SyncEvent.replay(
- {
- id: event.id,
- aggregateID: event.aggregate_id,
- seq: event.seq,
- type: event.type,
- data: event.data,
- },
- { publish: true },
- )
- }
- },
- })
-
- log.info("workspace history synced", {
- workspaceID: space.id,
- events: events.length,
- })
}
-async function syncWorkspaceLoop(space: Info, signal: AbortSignal) {
- const adaptor = await getAdaptor(space.projectID, space.type)
- const target = await adaptor.target(space)
-
- if (target.type === "local") return null
-
- let attempt = 0
-
- while (!signal.aborted) {
- log.info("connecting to global sync", { workspace: space.name })
- setStatus(space.id, "connecting")
-
- let stream
- try {
- stream = await connectSSE(target.url, target.headers, signal)
- await syncHistory(space, target.url, target.headers, signal)
- } catch (err) {
- stream = null
- setStatus(space.id, "error")
- log.info("failed to connect to global sync", {
- workspace: space.name,
- err,
- })
- }
-
- if (stream) {
- attempt = 0
+export const get = fn(WorkspaceID.zod, (id) => runPromise((svc) => svc.get(id)))
- log.info("global sync connected", { workspace: space.name })
- setStatus(space.id, "connected")
+export const remove = fn(WorkspaceID.zod, (id) => runPromise((svc) => svc.remove(id)))
- await parseSSE(stream, signal, (evt: any) => {
- try {
- if (!("payload" in evt)) return
- if (evt.payload.type === "server.heartbeat") return
-
- if (evt.payload.type === "sync") {
- SyncEvent.replay(evt.payload.syncEvent as SyncEvent.SerializedEvent)
- }
-
- GlobalBus.emit("event", {
- directory: evt.directory,
- project: evt.project,
- workspace: space.id,
- payload: evt.payload,
- })
- } catch (err) {
- log.info("failed to replay global event", {
- workspaceID: space.id,
- error: err,
- })
- }
- })
-
- log.info("disconnected from global sync: " + space.id)
- setStatus(space.id, "disconnected")
- }
-
- // Back off reconnect attempts up to 2 minutes while the workspace
- // stays unavailable.
- await sleep(Math.min(120_000, 1_000 * 2 ** attempt))
- attempt += 1
- }
+export function status() {
+ return runSync((svc) => svc.status())
}
-async function startSync(space: Info) {
- if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return
-
- const adaptor = await getAdaptor(space.projectID, space.type)
- const target = await adaptor.target(space)
-
- if (target.type === "local") {
- void Filesystem.exists(target.directory).then((exists) => {
- setStatus(space.id, exists ? "connected" : "error")
- })
- return
- }
-
- if (aborts.has(space.id)) return true
-
- setStatus(space.id, "disconnected")
-
- const abort = new AbortController()
- aborts.set(space.id, abort)
-
- void syncWorkspaceLoop(space, abort.signal).catch((error) => {
- aborts.delete(space.id)
-
- setStatus(space.id, "error")
- log.warn("workspace listener failed", {
- workspaceID: space.id,
- error,
- })
- })
+export function isSyncing(workspaceID: WorkspaceID) {
+ return runSync((svc) => svc.isSyncing(workspaceID))
}
-function stopSync(id: WorkspaceID) {
- aborts.get(id)?.abort()
- aborts.delete(id)
- connections.delete(id)
+export function waitForSync(workspaceID: WorkspaceID, state: Record<string, number>, signal?: AbortSignal) {
+ return runPromise((svc) => svc.waitForSync(workspaceID, state, signal))
}
export function startWorkspaceSyncing(projectID: ProjectID) {
- const spaces = Database.use((db) =>
- db
- .select({ workspace: WorkspaceTable })
- .from(WorkspaceTable)
- .innerJoin(SessionTable, eq(SessionTable.workspace_id, WorkspaceTable.id))
- .where(eq(WorkspaceTable.project_id, projectID))
- .all(),
- )
-
- for (const row of new Map(spaces.map((row) => [row.workspace.id, row.workspace])).values()) {
- void startSync(fromRow(row))
- }
+ void runPromise((svc) => svc.startWorkspaceSyncing(projectID))
}
export * as Workspace from "./workspace"
diff --git a/packages/opencode/src/effect/app-runtime.ts b/packages/opencode/src/effect/app-runtime.ts
index fdd305362..84be17068 100644
--- a/packages/opencode/src/effect/app-runtime.ts
+++ b/packages/opencode/src/effect/app-runtime.ts
@@ -41,6 +41,7 @@ import { ToolRegistry } from "@/tool/registry"
import { Format } from "@/format"
import { Project } from "@/project/project"
import { Vcs } from "@/project/vcs"
+import { Workspace } from "@/control-plane/workspace"
import { Worktree } from "@/worktree"
import { Pty } from "@/pty"
import { Installation } from "@/installation"
@@ -90,6 +91,7 @@ export const AppLayer = Layer.mergeAll(
Format.defaultLayer,
Project.defaultLayer,
Vcs.defaultLayer,
+ Workspace.defaultLayer,
Worktree.defaultLayer,
Pty.defaultLayer,
Installation.defaultLayer,
diff --git a/packages/opencode/src/server/routes/instance/httpapi/middleware/workspace-routing.ts b/packages/opencode/src/server/routes/instance/httpapi/middleware/workspace-routing.ts
index 9318dbfe5..68dc0b9d7 100644
--- a/packages/opencode/src/server/routes/instance/httpapi/middleware/workspace-routing.ts
+++ b/packages/opencode/src/server/routes/instance/httpapi/middleware/workspace-routing.ts
@@ -89,7 +89,7 @@ function missingWorkspaceResponse(id: WorkspaceID): HttpServerResponse.HttpServe
function resolveTarget(workspace: Workspace.Info): Effect.Effect<Target> {
return Effect.gen(function* () {
- const adaptor = yield* Effect.promise(() => getAdaptor(workspace.projectID, workspace.type))
+ const adaptor = yield* Effect.sync(() => getAdaptor(workspace.projectID, workspace.type))
return yield* Effect.promise(() => Promise.resolve(adaptor.target(workspace)))
})
}
@@ -101,7 +101,7 @@ function proxyRemote(
url: URL,
): Effect.Effect<HttpServerResponse.HttpServerResponse, never, Socket.WebSocketConstructor> {
return Effect.gen(function* () {
- const syncing = yield* Effect.promise(() => Workspace.isSyncing(workspace.id))
+ const syncing = yield* Effect.sync(() => Workspace.isSyncing(workspace.id))
if (!syncing) {
return HttpServerResponse.text(`broken sync connection for workspace: ${workspace.id}`, {
status: 503,
diff --git a/packages/opencode/test/control-plane/sse.test.ts b/packages/opencode/test/control-plane/sse.test.ts
deleted file mode 100644
index 78a8341c0..000000000
--- a/packages/opencode/test/control-plane/sse.test.ts
+++ /dev/null
@@ -1,56 +0,0 @@
-import { afterEach, describe, expect, test } from "bun:test"
-import { parseSSE } from "../../src/control-plane/sse"
-import { resetDatabase } from "../fixture/db"
-
-afterEach(async () => {
- await resetDatabase()
-})
-
-function stream(chunks: string[]) {
- return new ReadableStream<Uint8Array>({
- start(controller) {
- const encoder = new TextEncoder()
- chunks.forEach((chunk) => controller.enqueue(encoder.encode(chunk)))
- controller.close()
- },
- })
-}
-
-describe("control-plane/sse", () => {
- test("parses JSON events with CRLF and multiline data blocks", async () => {
- const events: unknown[] = []
- const stop = new AbortController()
-
- await parseSSE(
- stream([
- 'data: {"type":"one","properties":{"ok":true}}\r\n\r\n',
- 'data: {"type":"two",\r\ndata: "properties":{"n":2}}\r\n\r\n',
- ]),
- stop.signal,
- (event) => events.push(event),
- )
-
- expect(events).toEqual([
- { type: "one", properties: { ok: true } },
- { type: "two", properties: { n: 2 } },
- ])
- })
-
- test("falls back to sse.message for non-json payload", async () => {
- const events: unknown[] = []
- const stop = new AbortController()
-
- await parseSSE(stream(["id: abc\nretry: 1500\ndata: hello world\n\n"]), stop.signal, (event) => events.push(event))
-
- expect(events).toEqual([
- {
- type: "sse.message",
- properties: {
- data: "hello world",
- id: "abc",
- retry: 1500,
- },
- },
- ])
- })
-})
diff --git a/packages/opencode/test/control-plane/workspace.test.ts b/packages/opencode/test/control-plane/workspace.test.ts
new file mode 100644
index 000000000..c94d3f9a3
--- /dev/null
+++ b/packages/opencode/test/control-plane/workspace.test.ts
@@ -0,0 +1,1391 @@
+import { afterEach, beforeEach, describe, expect, mock, spyOn, test } from "bun:test"
+import fs from "node:fs/promises"
+import Http from "node:http"
+import path from "node:path"
+import { setTimeout as delay } from "node:timers/promises"
+import { NodeHttpServer } from "@effect/platform-node"
+import { Effect, Layer } from "effect"
+import { HttpServer, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
+import { asc, eq } from "drizzle-orm"
+import * as Log from "@opencode-ai/core/util/log"
+import { Flag } from "@opencode-ai/core/flag/flag"
+import { GlobalBus, type GlobalEvent } from "@/bus/global"
+import { Database } from "@/storage/db"
+import { ProjectID } from "@/project/schema"
+import { ProjectTable } from "@/project/project.sql"
+import { Instance } from "@/project/instance"
+import { Session as SessionNs } from "@/session/session"
+import { SessionID, MessageID, PartID } from "@/session/schema"
+import { SessionTable } from "@/session/session.sql"
+import { ModelID, ProviderID } from "@/provider/schema"
+import { SyncEvent } from "@/sync"
+import { EventSequenceTable, EventTable } from "@/sync/event.sql"
+import { resetDatabase } from "../fixture/db"
+import { provideTmpdirInstance, tmpdir } from "../fixture/fixture"
+import { testEffect } from "../lib/effect"
+import { registerAdaptor } from "../../src/control-plane/adaptors"
+import { WorkspaceID } from "../../src/control-plane/schema"
+import { WorkspaceTable } from "../../src/control-plane/workspace.sql"
+import type { Target, WorkspaceAdaptor, WorkspaceInfo } from "../../src/control-plane/types"
+import * as WorkspaceOld from "../../src/control-plane/workspace"
+import { AppRuntime } from "@/effect/app-runtime"
+
+void Log.init({ print: false })
+
+const testServerLayer = Layer.mergeAll(
+ NodeHttpServer.layer(Http.createServer, { host: "127.0.0.1", port: 0 }),
+ WorkspaceOld.defaultLayer,
+ SessionNs.defaultLayer,
+)
+const it = testEffect(testServerLayer)
+
+const originalWorkspacesFlag = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
+const originalEnv = {
+ OPENCODE_AUTH_CONTENT: process.env.OPENCODE_AUTH_CONTENT,
+ OTEL_EXPORTER_OTLP_HEADERS: process.env.OTEL_EXPORTER_OTLP_HEADERS,
+ OTEL_EXPORTER_OTLP_ENDPOINT: process.env.OTEL_EXPORTER_OTLP_ENDPOINT,
+ OTEL_RESOURCE_ATTRIBUTES: process.env.OTEL_RESOURCE_ATTRIBUTES,
+}
+
+type RecordedCreate = {
+ info: WorkspaceInfo
+ env: Record<string, string | undefined>
+ from?: WorkspaceInfo
+}
+
+type RecordedAdaptor = {
+ adaptor: WorkspaceAdaptor
+ calls: {
+ configure: WorkspaceInfo[]
+ create: RecordedCreate[]
+ remove: WorkspaceInfo[]
+ target: WorkspaceInfo[]
+ }
+}
+
+type FetchCall = {
+ url: URL
+ method: string
+ headers: Headers
+ bodyText?: string
+ json?: unknown
+}
+
+function unique(prefix: string) {
+ return `${prefix}-${Math.random().toString(36).slice(2)}`
+}
+
+function restoreEnv() {
+ Object.entries(originalEnv).forEach(([key, value]) => {
+ if (value === undefined) {
+ delete process.env[key]
+ return
+ }
+ process.env[key] = value
+ })
+}
+
+beforeEach(() => {
+ Database.close()
+ Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
+ restoreEnv()
+})
+
+afterEach(async () => {
+ mock.restore()
+ await Instance.disposeAll()
+ Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = originalWorkspacesFlag
+ restoreEnv()
+ await resetDatabase()
+})
+
+async function withInstance<T>(fn: (dir: string) => T | Promise<T>) {
+ await using tmp = await tmpdir({ git: true })
+ return Instance.provide({
+ directory: tmp.path,
+ fn: () => fn(tmp.path),
+ })
+}
+
+function captureGlobalEvents() {
+ const events: GlobalEvent[] = []
+ const handler = (event: GlobalEvent) => events.push(event)
+ GlobalBus.on("event", handler)
+ return {
+ events,
+ dispose() {
+ GlobalBus.off("event", handler)
+ },
+ }
+}
+
+async function eventually<T>(fn: () => T | Promise<T>, timeout = 1500) {
+ const started = Date.now()
+ let last: unknown
+ while (Date.now() - started < timeout) {
+ try {
+ return await fn()
+ } catch (err) {
+ last = err
+ await delay(10)
+ }
+ }
+ throw last ?? new Error("Timed out waiting for condition")
+}
+
+function eventuallyEffect(effect: Effect.Effect<void>, timeout = 1500) {
+ return Effect.gen(function* () {
+ const started = Date.now()
+ let last: unknown
+ while (Date.now() - started < timeout) {
+ const exit = yield* Effect.exit(effect)
+ if (exit._tag === "Success") return
+ last = exit.cause
+ yield* Effect.sleep("10 millis")
+ }
+ throw last ?? new Error("Timed out waiting for condition")
+ })
+}
+
+function recordedAdaptor(input: {
+ target: (info: WorkspaceInfo) => Target | Promise<Target>
+ configure?: (info: WorkspaceInfo) => WorkspaceInfo | Promise<WorkspaceInfo>
+ create?: (info: WorkspaceInfo, env: Record<string, string | undefined>, from?: WorkspaceInfo) => Promise<void>
+ remove?: (info: WorkspaceInfo) => Promise<void>
+}): RecordedAdaptor {
+ const calls: RecordedAdaptor["calls"] = {
+ configure: [],
+ create: [],
+ remove: [],
+ target: [],
+ }
+
+ return {
+ calls,
+ adaptor: {
+ name: "recorded",
+ description: "recorded",
+ configure(info) {
+ calls.configure.push(structuredClone(info))
+ return input.configure?.(info) ?? info
+ },
+ async create(info, env, from) {
+ calls.create.push({ info: structuredClone(info), env: { ...env }, from: from ? structuredClone(from) : undefined })
+ await input.create?.(info, env, from)
+ },
+ async remove(info) {
+ calls.remove.push(structuredClone(info))
+ await input.remove?.(info)
+ },
+ target(info) {
+ calls.target.push(structuredClone(info))
+ return input.target(info)
+ },
+ },
+ }
+}
+
+function localAdaptor(dir: string, input?: { createDir?: boolean; remove?: (info: WorkspaceInfo) => Promise<void> }) {
+ return recordedAdaptor({
+ configure(info) {
+ return { ...info, directory: dir }
+ },
+ async create() {
+ if (input?.createDir === false) return
+ await fs.mkdir(dir, { recursive: true })
+ },
+ remove: input?.remove,
+ target() {
+ return { type: "local", directory: dir }
+ },
+ })
+}
+
+function remoteAdaptor(url: string, input?: { directory?: string | null; headers?: HeadersInit }) {
+ return recordedAdaptor({
+ configure(info) {
+ return { ...info, directory: input?.directory ?? info.directory }
+ },
+ target() {
+ return { type: "remote", url, headers: input?.headers }
+ },
+ })
+}
+
+function eventStreamResponse(events: unknown[] = [], keepOpen = true) {
+ const encoder = new TextEncoder()
+ return new Response(
+ new ReadableStream<Uint8Array>({
+ start(controller) {
+ if (keepOpen) controller.enqueue(encoder.encode(":\n\n"))
+ events.forEach((event) => controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`)))
+ if (!keepOpen) controller.close()
+ },
+ }),
+ { status: 200, headers: { "content-type": "text/event-stream" } },
+ )
+}
+
+function serverUrl() {
+ return Effect.gen(function* () {
+ return HttpServer.formatAddress((yield* HttpServer.HttpServer).address)
+ })
+}
+
+function workspaceInfo(projectID: ProjectID, type: string, input?: Partial<WorkspaceInfo>): WorkspaceInfo {
+ return {
+ id: input?.id ?? WorkspaceID.ascending(),
+ type,
+ name: input?.name ?? unique("workspace"),
+ branch: input?.branch ?? null,
+ directory: input?.directory ?? null,
+ extra: input?.extra ?? null,
+ projectID,
+ }
+}
+
+function insertWorkspace(info: WorkspaceInfo) {
+ Database.use((db) =>
+ db
+ .insert(WorkspaceTable)
+ .values({
+ id: info.id,
+ type: info.type,
+ branch: info.branch,
+ name: info.name,
+ directory: info.directory,
+ extra: info.extra,
+ project_id: info.projectID,
+ })
+ .run(),
+ )
+}
+
+function insertProject(id: ProjectID, worktree: string) {
+ Database.use((db) =>
+ db
+ .insert(ProjectTable)
+ .values({
+ id,
+ worktree,
+ vcs: null,
+ name: null,
+ time_created: Date.now(),
+ time_updated: Date.now(),
+ sandboxes: [],
+ })
+ .run(),
+ )
+}
+
+function attachSessionToWorkspace(sessionID: SessionID, workspaceID: WorkspaceID) {
+ Database.use((db) => db.update(SessionTable).set({ workspace_id: workspaceID }).where(eq(SessionTable.id, sessionID)).run())
+}
+
+function sessionSequence(sessionID: SessionID) {
+ return Database.use((db) =>
+ db.select({ seq: EventSequenceTable.seq }).from(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, sessionID)).get(),
+ )?.seq
+}
+
+function eventRows(sessionID: SessionID) {
+ return Database.use((db) =>
+ db
+ .select({ seq: EventTable.seq, type: EventTable.type, data: EventTable.data })
+ .from(EventTable)
+ .where(eq(EventTable.aggregate_id, sessionID))
+ .orderBy(asc(EventTable.seq))
+ .all(),
+ )
+}
+
+function sessionUpdatedType() {
+ return SyncEvent.versionedType(SessionNs.Event.Updated.type, SessionNs.Event.Updated.version)
+}
+
+function replaceSessionEvents(sessionID: SessionID, count: number) {
+ Database.use((db) => {
+ db.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, sessionID)).run()
+ if (count === 0) return
+
+ db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: count - 1 }).run()
+ db.insert(EventTable)
+ .values(
+ Array.from({ length: count }, (_, i) => ({
+ id: `evt_${unique(`manual-${i}`)}`,
+ aggregate_id: sessionID,
+ seq: i,
+ type: sessionUpdatedType(),
+ data: { sessionID, info: { title: `manual ${i}` } },
+ })),
+ )
+ .run()
+ })
+}
+
+describe("workspace-old schemas and exports", () => {
+ test("keeps the historical event type names", () => {
+ expect(WorkspaceOld.Event.Ready.type).toBe("workspace.ready")
+ expect(WorkspaceOld.Event.Failed.type).toBe("workspace.failed")
+ expect(WorkspaceOld.Event.Restore.type).toBe("workspace.restore")
+ expect(WorkspaceOld.Event.Status.type).toBe("workspace.status")
+ })
+
+ test("validates create input with workspace id, project id, branch, type, and extra", () => {
+ const input = {
+ id: WorkspaceID.ascending("wrk_schema_create"),
+ type: "worktree",
+ branch: "feature/schema",
+ projectID: ProjectID.make("project-schema"),
+ extra: { nested: true },
+ }
+
+ expect(WorkspaceOld.CreateInput.zod.parse(input)).toEqual(input)
+ expect(() => WorkspaceOld.CreateInput.zod.parse({ ...input, id: "bad" })).toThrow()
+ expect(() => WorkspaceOld.CreateInput.zod.parse({ ...input, branch: 1 })).toThrow()
+ })
+
+ test("validates session restore input", () => {
+ const input = {
+ workspaceID: WorkspaceID.ascending("wrk_schema_restore"),
+ sessionID: SessionID.descending("ses_schema_restore"),
+ }
+
+ expect(WorkspaceOld.SessionRestoreInput.zod.parse(input)).toEqual(input)
+ expect(() => WorkspaceOld.SessionRestoreInput.zod.parse({ ...input, workspaceID: "bad" })).toThrow()
+ expect(() => WorkspaceOld.SessionRestoreInput.zod.parse({ ...input, sessionID: "bad" })).toThrow()
+ })
+})
+
+describe("workspace-old CRUD", () => {
+ test("get returns undefined for a missing workspace", async () => {
+ await withInstance(async () => {
+ expect(await WorkspaceOld.get(WorkspaceID.ascending("wrk_missing_get"))).toBeUndefined()
+ })
+ })
+
+ test("list maps database rows, filters by project, and sorts by id", async () => {
+ await withInstance(() => {
+ const otherProjectID = ProjectID.make("project-other")
+ insertProject(otherProjectID, "/tmp/other")
+ const a = workspaceInfo(Instance.project.id, "manual", {
+ id: WorkspaceID.ascending("wrk_a_list"),
+ branch: "a",
+ directory: "/a",
+ extra: { a: true },
+ })
+ const b = workspaceInfo(Instance.project.id, "manual", {
+ id: WorkspaceID.ascending("wrk_b_list"),
+ branch: "b",
+ directory: "/b",
+ extra: ["b"],
+ })
+ const other = workspaceInfo(otherProjectID, "manual", { id: WorkspaceID.ascending("wrk_c_list") })
+ insertWorkspace(b)
+ insertWorkspace(other)
+ insertWorkspace(a)
+
+ expect(WorkspaceOld.list(Instance.project)).toEqual([a, b])
+ })
+ })
+
+ test("create configures, persists, creates, starts local sync, and passes environment", async () => {
+ await withInstance(async (dir) => {
+ process.env.OPENCODE_AUTH_CONTENT = JSON.stringify({ test: { type: "api", key: "secret" } })
+ process.env.OTEL_EXPORTER_OTLP_HEADERS = "authorization=otel"
+ process.env.OTEL_EXPORTER_OTLP_ENDPOINT = "https://otel.test"
+ process.env.OTEL_RESOURCE_ATTRIBUTES = "service.name=opencode-test"
+
+ const workspaceID = WorkspaceID.ascending("wrk_create_local")
+ const type = unique("create-local")
+ const targetDir = path.join(dir, "created-local")
+ const recorded = recordedAdaptor({
+ configure(info) {
+ return {
+ ...info,
+ branch: "configured-branch",
+ name: "Configured Name",
+ directory: targetDir,
+ extra: { configured: true },
+ }
+ },
+ async create() {
+ await fs.mkdir(targetDir, { recursive: true })
+ },
+ target() {
+ return { type: "local", directory: targetDir }
+ },
+ })
+ registerAdaptor(Instance.project.id, type, recorded.adaptor)
+
+ const info = await WorkspaceOld.create({
+ id: workspaceID,
+ type,
+ branch: null,
+ projectID: Instance.project.id,
+ extra: null,
+ })
+
+ expect(info).toEqual({
+ id: workspaceID,
+ type,
+ branch: "configured-branch",
+ name: "Configured Name",
+ directory: targetDir,
+ extra: { configured: true },
+ projectID: Instance.project.id,
+ })
+ expect(await WorkspaceOld.get(workspaceID)).toEqual(info)
+ expect(WorkspaceOld.list(Instance.project)).toEqual([info])
+ expect(recorded.calls.configure).toHaveLength(1)
+ expect(recorded.calls.configure[0]).toMatchObject({ id: workspaceID, type, directory: null })
+ expect(recorded.calls.create).toHaveLength(1)
+ expect(recorded.calls.create[0].info).toEqual(info)
+ expect(JSON.parse(recorded.calls.create[0].env.OPENCODE_AUTH_CONTENT ?? "{}")).toEqual({
+ test: { type: "api", key: "secret" },
+ })
+ expect(recorded.calls.create[0].env.OPENCODE_WORKSPACE_ID).toBe(workspaceID)
+ expect(recorded.calls.create[0].env.OPENCODE_EXPERIMENTAL_WORKSPACES).toBe("true")
+ expect(recorded.calls.create[0].env.OTEL_EXPORTER_OTLP_HEADERS).toBe("authorization=otel")
+ expect(recorded.calls.create[0].env.OTEL_EXPORTER_OTLP_ENDPOINT).toBe("https://otel.test")
+ expect(recorded.calls.create[0].env.OTEL_RESOURCE_ATTRIBUTES).toBe("service.name=opencode-test")
+ expect(WorkspaceOld.status().find((item) => item.workspaceID === workspaceID)?.status).toBe("connected")
+
+ await WorkspaceOld.remove(workspaceID)
+ expect(WorkspaceOld.status().find((item) => item.workspaceID === workspaceID)?.status).toBeUndefined()
+ })
+ })
+
+ test("create propagates configure failures and does not insert a workspace", async () => {
+ await withInstance(async () => {
+ const type = unique("configure-failure")
+ registerAdaptor(
+ Instance.project.id,
+ type,
+ recordedAdaptor({
+ configure() {
+ throw new Error("configure exploded")
+ },
+ target() {
+ return { type: "local", directory: "/unused" }
+ },
+ }).adaptor,
+ )
+
+ await expect(
+ WorkspaceOld.create({ type, branch: null, projectID: Instance.project.id, extra: null }),
+ ).rejects.toThrow("configure exploded")
+ expect(WorkspaceOld.list(Instance.project)).toEqual([])
+ })
+ })
+
+ test("create leaves the inserted row when adaptor create fails", async () => {
+ await withInstance(async () => {
+ const type = unique("create-failure")
+ const recorded = recordedAdaptor({
+ async create() {
+ throw new Error("create exploded")
+ },
+ target() {
+ return { type: "local", directory: "/unused" }
+ },
+ })
+ registerAdaptor(Instance.project.id, type, recorded.adaptor)
+
+ await expect(
+ WorkspaceOld.create({ type, branch: "branch", projectID: Instance.project.id, extra: { x: 1 } }),
+ ).rejects.toThrow("create exploded")
+
+ const rows = WorkspaceOld.list(Instance.project)
+ expect(rows).toHaveLength(1)
+ expect(rows[0]).toMatchObject({ type, branch: "branch", extra: { x: 1 } })
+ expect(recorded.calls.target).toHaveLength(0)
+ await WorkspaceOld.remove(rows[0].id)
+ })
+ })
+
+ test("create returns after a local workspace reports error", async () => {
+ await withInstance(async (dir) => {
+ const type = unique("local-error")
+ const missing = path.join(dir, "missing-local-target")
+ const recorded = localAdaptor(missing, { createDir: false })
+ registerAdaptor(Instance.project.id, type, recorded.adaptor)
+
+ const info = await WorkspaceOld.create({ type, branch: null, projectID: Instance.project.id, extra: null })
+
+ expect(info.directory).toBe(missing)
+ expect(WorkspaceOld.status().find((item) => item.workspaceID === info.id)?.status).toBe("error")
+ await WorkspaceOld.remove(info.id)
+ })
+ })
+
+ it.live("remote create connects to routed event and history endpoints", () => {
+ const calls: FetchCall[] = []
+ return Effect.gen(function* () {
+ yield* HttpServer.serveEffect()(Effect.gen(function* () {
+ const req = yield* HttpServerRequest.HttpServerRequest
+ const bodyText = yield* req.text
+ const call = {
+ url: new URL(req.url, "http://localhost"),
+ method: req.method,
+ headers: new Headers(req.headers),
+ bodyText,
+ json: bodyText ? JSON.parse(bodyText) : undefined,
+ }
+ calls.push(call)
+ if (call.url.pathname === "/base/global/event") return HttpServerResponse.fromWeb(eventStreamResponse([], false))
+ if (call.url.pathname === "/base/sync/history") return yield* HttpServerResponse.json([])
+ return HttpServerResponse.text("unexpected", { status: 500 })
+ }))
+ const url = yield* serverUrl()
+ yield* provideTmpdirInstance((dir) =>
+ Effect.gen(function* () {
+ const workspace = yield* WorkspaceOld.Service
+ const type = unique("remote-create")
+ const recorded = remoteAdaptor(`${url}/base/?ignored=1#hash`, { directory: dir })
+ registerAdaptor(Instance.project.id, type, recorded.adaptor)
+
+ const info = yield* workspace.create({ type, branch: null, projectID: Instance.project.id, extra: null })
+
+ expect(calls.map((call) => `${call.method} ${call.url.pathname}${call.url.search}${call.url.hash}`)).toEqual([
+ "GET /base/global/event",
+ "POST /base/sync/history",
+ ])
+ expect(calls[1].json).toEqual({})
+ expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe("connected")
+ expect(yield* workspace.isSyncing(info.id)).toBe(true)
+
+ yield* workspace.remove(info.id)
+ expect(yield* workspace.isSyncing(info.id)).toBe(false)
+ expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBeUndefined()
+ }),
+ { git: true },
+ )
+ })
+ })
+
+ test("remove returns undefined for a missing workspace", async () => {
+ await withInstance(async () => {
+ expect(await WorkspaceOld.remove(WorkspaceID.ascending("wrk_missing_remove"))).toBeUndefined()
+ })
+ })
+
+ test("remove deletes the workspace, associated sessions, adaptor resources, and status", async () => {
+ await withInstance(async (dir) => {
+ const type = unique("remove-local")
+ const recorded = localAdaptor(path.join(dir, "remove-local"))
+ registerAdaptor(Instance.project.id, type, recorded.adaptor)
+ const info = await WorkspaceOld.create({ type, branch: null, projectID: Instance.project.id, extra: null })
+ const one = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))
+ const two = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))
+ attachSessionToWorkspace(one.id, info.id)
+ attachSessionToWorkspace(two.id, info.id)
+
+ const removed = await WorkspaceOld.remove(info.id)
+
+ expect(removed).toEqual(info)
+ expect(await WorkspaceOld.get(info.id)).toBeUndefined()
+ expect(recorded.calls.remove).toEqual([info])
+ expect(WorkspaceOld.status().find((item) => item.workspaceID === info.id)?.status).toBeUndefined()
+ expect(
+ Database.use((db) =>
+ db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, info.id)).all(),
+ ),
+ ).toEqual([])
+ })
+ })
+
+ test("remove still deletes the row when the adaptor cannot remove resources", async () => {
+ await withInstance(async () => {
+ const type = unique("remove-throws")
+ const info = workspaceInfo(Instance.project.id, type, { id: WorkspaceID.ascending("wrk_remove_throws") })
+ registerAdaptor(
+ Instance.project.id,
+ type,
+ recordedAdaptor({
+ async remove() {
+ throw new Error("remove exploded")
+ },
+ target() {
+ return { type: "local", directory: "/unused" }
+ },
+ }).adaptor,
+ )
+ insertWorkspace(info)
+
+ expect(await WorkspaceOld.remove(info.id)).toEqual(info)
+ expect(await WorkspaceOld.get(info.id)).toBeUndefined()
+ })
+ })
+})
+
+describe("workspace-old sync state", () => {
+ test("startWorkspaceSyncing is disabled by the experimental workspace flag", async () => {
+ await withInstance(async (dir) => {
+ Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = false
+ const type = unique("flag-disabled")
+ const info = workspaceInfo(Instance.project.id, type)
+ const session = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))
+ attachSessionToWorkspace(session.id, info.id)
+ insertWorkspace(info)
+ registerAdaptor(Instance.project.id, type, localAdaptor(path.join(dir, "flag-disabled")).adaptor)
+
+ WorkspaceOld.startWorkspaceSyncing(Instance.project.id)
+ await delay(25)
+
+ expect(WorkspaceOld.status().find((item) => item.workspaceID === info.id)?.status).toBeUndefined()
+ })
+ })
+
+ test("startWorkspaceSyncing starts only workspaces with sessions", async () => {
+ await withInstance(async (dir) => {
+ const withSessionType = unique("with-session")
+ const withoutSessionType = unique("without-session")
+ const withSession = workspaceInfo(Instance.project.id, withSessionType)
+ const withoutSession = workspaceInfo(Instance.project.id, withoutSessionType)
+ const withSessionDir = path.join(dir, "with-session")
+ const withoutSessionDir = path.join(dir, "without-session")
+ await fs.mkdir(withSessionDir, { recursive: true })
+ await fs.mkdir(withoutSessionDir, { recursive: true })
+ insertWorkspace(withSession)
+ insertWorkspace(withoutSession)
+ registerAdaptor(Instance.project.id, withSessionType, localAdaptor(withSessionDir).adaptor)
+ registerAdaptor(Instance.project.id, withoutSessionType, localAdaptor(withoutSessionDir).adaptor)
+ attachSessionToWorkspace((await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))).id, withSession.id)
+
+ WorkspaceOld.startWorkspaceSyncing(Instance.project.id)
+
+ await eventually(() => expect(WorkspaceOld.status().find((item) => item.workspaceID === withSession.id)?.status).toBe("connected"))
+ expect(WorkspaceOld.status().find((item) => item.workspaceID === withoutSession.id)?.status).toBeUndefined()
+ await WorkspaceOld.remove(withSession.id)
+ await WorkspaceOld.remove(withoutSession.id)
+ })
+ })
+
+ test("local start reports error when the target directory is missing", async () => {
+ await withInstance(async (dir) => {
+ const type = unique("missing-local")
+ const info = workspaceInfo(Instance.project.id, type)
+ insertWorkspace(info)
+ registerAdaptor(Instance.project.id, type, localAdaptor(path.join(dir, "missing-target"), { createDir: false }).adaptor)
+ attachSessionToWorkspace((await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))).id, info.id)
+
+ WorkspaceOld.startWorkspaceSyncing(Instance.project.id)
+
+ await eventually(() => expect(WorkspaceOld.status().find((item) => item.workspaceID === info.id)?.status).toBe("error"))
+ expect(await WorkspaceOld.isSyncing(info.id)).toBe(false)
+ await WorkspaceOld.remove(info.id)
+ })
+ })
+
+ test("duplicate local status updates are suppressed", async () => {
+ await withInstance(async (dir) => {
+ const captured = captureGlobalEvents()
+ try {
+ const type = unique("dedupe-local")
+ const info = workspaceInfo(Instance.project.id, type)
+ const target = path.join(dir, "dedupe-local")
+ await fs.mkdir(target, { recursive: true })
+ insertWorkspace(info)
+ registerAdaptor(Instance.project.id, type, localAdaptor(target).adaptor)
+ attachSessionToWorkspace((await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({})))).id, info.id)
+
+ WorkspaceOld.startWorkspaceSyncing(Instance.project.id)
+ WorkspaceOld.startWorkspaceSyncing(Instance.project.id)
+
+ await eventually(() => expect(WorkspaceOld.status().find((item) => item.workspaceID === info.id)?.status).toBe("connected"))
+ expect(
+ captured.events.filter(
+ (event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Status.type,
+ ),
+ ).toHaveLength(1)
+ await WorkspaceOld.remove(info.id)
+ } finally {
+ captured.dispose()
+ }
+ })
+ })
+
+ it.live("remote start emits disconnected, connecting, and connected then refuses duplicate listeners", () => {
+ const calls: FetchCall[] = []
+ return Effect.gen(function* () {
+ yield* HttpServer.serveEffect()(Effect.gen(function* () {
+ const req = yield* HttpServerRequest.HttpServerRequest
+ const bodyText = yield* req.text
+ const call = {
+ url: new URL(req.url, "http://localhost"),
+ method: req.method,
+ headers: new Headers(req.headers),
+ bodyText,
+ json: bodyText ? JSON.parse(bodyText) : undefined,
+ }
+ calls.push(call)
+ if (call.url.pathname === "/sync/global/event") return HttpServerResponse.fromWeb(eventStreamResponse())
+ if (call.url.pathname === "/sync/sync/history") return HttpServerResponse.fromWeb(Response.json([]))
+ return HttpServerResponse.text("unexpected", { status: 500 })
+ }))
+ const url = yield* serverUrl()
+ yield* provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const workspace = yield* WorkspaceOld.Service
+ const sessionSvc = yield* SessionNs.Service
+ const captured = captureGlobalEvents()
+ try {
+ const type = unique("remote-start")
+ const info = workspaceInfo(Instance.project.id, type)
+ insertWorkspace(info)
+ registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/sync`).adaptor)
+ attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id)
+
+ yield* workspace.startWorkspaceSyncing(Instance.project.id)
+ yield* eventuallyEffect(Effect.gen(function* () {
+ expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe("connected")
+ }))
+ yield* workspace.startWorkspaceSyncing(Instance.project.id)
+ yield* Effect.sleep("25 millis")
+
+ expect(
+ captured.events
+ .filter((event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Status.type)
+ .map((event) => event.payload.properties.status),
+ ).toEqual(["disconnected", "connecting", "connected"])
+ expect(calls.filter((call) => call.url.pathname === "/sync/global/event")).toHaveLength(1)
+ expect(calls.filter((call) => call.url.pathname === "/sync/sync/history")).toHaveLength(1)
+ expect(yield* workspace.isSyncing(info.id)).toBe(true)
+
+ yield* workspace.remove(info.id)
+ expect(yield* workspace.isSyncing(info.id)).toBe(false)
+ } finally {
+ captured.dispose()
+ }
+ }),
+ { git: true },
+ )
+ })
+ })
+
+ it.live("remote connection HTTP failures set error and clear syncing", () =>
+ Effect.gen(function* () {
+ yield* HttpServer.serveEffect()(Effect.gen(function* () {
+ const req = yield* HttpServerRequest.HttpServerRequest
+ if (new URL(req.url, "http://localhost").pathname === "/failed/global/event")
+ return HttpServerResponse.text("nope", { status: 503 })
+ return HttpServerResponse.fromWeb(Response.json([]))
+ }))
+ const url = yield* serverUrl()
+ yield* provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const workspace = yield* WorkspaceOld.Service
+ const sessionSvc = yield* SessionNs.Service
+ const type = unique("remote-connect-fail")
+ const info = workspaceInfo(Instance.project.id, type)
+ insertWorkspace(info)
+ registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/failed`).adaptor)
+ attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id)
+
+ yield* workspace.startWorkspaceSyncing(Instance.project.id)
+
+ yield* eventuallyEffect(Effect.gen(function* () {
+ expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe("error")
+ }))
+ expect(yield* workspace.isSyncing(info.id)).toBe(false)
+ yield* workspace.remove(info.id)
+ }),
+ { git: true },
+ )
+ }),
+ )
+
+ it.live("remote history HTTP failures set error", () =>
+ Effect.gen(function* () {
+ yield* HttpServer.serveEffect()(Effect.gen(function* () {
+ const req = yield* HttpServerRequest.HttpServerRequest
+ const url = new URL(req.url, "http://localhost")
+ if (url.pathname === "/history-failed/global/event") return HttpServerResponse.fromWeb(eventStreamResponse([], false))
+ if (url.pathname === "/history-failed/sync/history") return HttpServerResponse.text("history failed", { status: 500 })
+ return HttpServerResponse.fromWeb(Response.json([]))
+ }))
+ const url = yield* serverUrl()
+ yield* provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const workspace = yield* WorkspaceOld.Service
+ const sessionSvc = yield* SessionNs.Service
+ const type = unique("remote-history-fail")
+ const info = workspaceInfo(Instance.project.id, type)
+ insertWorkspace(info)
+ registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/history-failed`).adaptor)
+ attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id)
+
+ yield* workspace.startWorkspaceSyncing(Instance.project.id)
+
+ yield* eventuallyEffect(Effect.gen(function* () {
+ expect((yield* workspace.status()).find((item) => item.workspaceID === info.id)?.status).toBe("error")
+ }))
+ expect(yield* workspace.isSyncing(info.id)).toBe(false)
+ yield* workspace.remove(info.id)
+ }),
+ { git: true },
+ )
+ }),
+ )
+
+ it.live("sync history sends the local sequence fence and replays returned events in workspace context", () => {
+ const historyBodies: unknown[] = []
+ let historySessionID: SessionID | undefined
+ let historyNextSeq = 0
+ return Effect.gen(function* () {
+ yield* HttpServer.serveEffect()(Effect.gen(function* () {
+ const req = yield* HttpServerRequest.HttpServerRequest
+ const bodyText = yield* req.text
+ const url = new URL(req.url, "http://localhost")
+ if (url.pathname === "/history/global/event") return HttpServerResponse.fromWeb(eventStreamResponse())
+ if (url.pathname === "/history/sync/history") {
+ historyBodies.push(bodyText ? JSON.parse(bodyText) : undefined)
+ return HttpServerResponse.fromWeb(
+ Response.json([
+ {
+ id: `evt_${unique("history")}`,
+ aggregate_id: historySessionID!,
+ seq: historyNextSeq,
+ type: sessionUpdatedType(),
+ data: { sessionID: historySessionID!, info: { title: "from history" } },
+ },
+ ]),
+ )
+ }
+ return HttpServerResponse.text("unexpected", { status: 500 })
+ }))
+ const url = yield* serverUrl()
+ yield* provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const workspace = yield* WorkspaceOld.Service
+ const sessionSvc = yield* SessionNs.Service
+ const captured = captureGlobalEvents()
+ try {
+ const type = unique("history-replay")
+ const info = workspaceInfo(Instance.project.id, type)
+ insertWorkspace(info)
+ registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/history`).adaptor)
+ const session = yield* sessionSvc.create({ title: "before history" })
+ attachSessionToWorkspace(session.id, info.id)
+ historySessionID = session.id
+ historyNextSeq = (sessionSequence(session.id) ?? -1) + 1
+
+ yield* workspace.startWorkspaceSyncing(Instance.project.id)
+
+ yield* eventuallyEffect(Effect.gen(function* () {
+ expect((yield* sessionSvc.get(session.id)).title).toBe("from history")
+ }))
+ expect(historyBodies).toEqual([{ [session.id]: historyNextSeq - 1 }])
+ expect(
+ captured.events.some(
+ (event) =>
+ event.workspace === info.id && event.payload.type === "sync" && event.payload.syncEvent.seq === historyNextSeq,
+ ),
+ ).toBe(true)
+ yield* workspace.remove(info.id)
+ } finally {
+ captured.dispose()
+ }
+ }),
+ { git: true },
+ )
+ })
+ })
+
+ it.live("SSE forwards non-heartbeat events and ignores heartbeats", () =>
+ Effect.gen(function* () {
+ yield* HttpServer.serveEffect()(Effect.gen(function* () {
+ const req = yield* HttpServerRequest.HttpServerRequest
+ const url = new URL(req.url, "http://localhost")
+ if (url.pathname === "/sse-forward/global/event")
+ return HttpServerResponse.fromWeb(
+ eventStreamResponse(
+ [
+ { directory: "remote-dir", project: "remote-project", payload: { type: "server.heartbeat" } },
+ {
+ directory: "remote-dir",
+ project: "remote-project",
+ payload: { type: "custom.remote", properties: { ok: true } },
+ },
+ ],
+ false,
+ ),
+ )
+ if (url.pathname === "/sse-forward/sync/history") return HttpServerResponse.fromWeb(Response.json([]))
+ return HttpServerResponse.text("unexpected", { status: 500 })
+ }))
+ const url = yield* serverUrl()
+ yield* provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const workspace = yield* WorkspaceOld.Service
+ const sessionSvc = yield* SessionNs.Service
+ const captured = captureGlobalEvents()
+ try {
+ const type = unique("sse-forward")
+ const info = workspaceInfo(Instance.project.id, type)
+ insertWorkspace(info)
+ registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/sse-forward`).adaptor)
+ attachSessionToWorkspace((yield* sessionSvc.create({})).id, info.id)
+
+ yield* workspace.startWorkspaceSyncing(Instance.project.id)
+
+ yield* eventuallyEffect(Effect.sync(() =>
+ expect(captured.events.some((event) => event.workspace === info.id && event.payload.type === "custom.remote"))
+ .toBe(true),
+ ))
+ expect(captured.events.some((event) => event.workspace === info.id && event.payload.type === "server.heartbeat")).toBe(
+ false,
+ )
+ expect(
+ captured.events.find((event) => event.workspace === info.id && event.payload.type === "custom.remote"),
+ ).toMatchObject({ directory: "remote-dir", project: "remote-project", payload: { properties: { ok: true } } })
+ yield* workspace.remove(info.id)
+ } finally {
+ captured.dispose()
+ }
+ }),
+ { git: true },
+ )
+ }),
+ )
+
+ it.live("SSE sync events are replayed and forwarded", () => {
+ let sseSessionID: SessionID | undefined
+ let sseNextSeq = 0
+ return Effect.gen(function* () {
+ yield* HttpServer.serveEffect()(Effect.gen(function* () {
+ const req = yield* HttpServerRequest.HttpServerRequest
+ const url = new URL(req.url, "http://localhost")
+ if (url.pathname === "/sse-sync/global/event")
+ return HttpServerResponse.fromWeb(
+ eventStreamResponse(
+ [
+ {
+ directory: "remote-dir",
+ project: "remote-project",
+ payload: {
+ type: "sync",
+ syncEvent: {
+ id: `evt_${unique("sse")}`,
+ aggregateID: sseSessionID!,
+ seq: sseNextSeq,
+ type: sessionUpdatedType(),
+ data: { sessionID: sseSessionID!, info: { title: "from sse" } },
+ },
+ },
+ },
+ ],
+ false,
+ ),
+ )
+ if (url.pathname === "/sse-sync/sync/history") return HttpServerResponse.fromWeb(Response.json([]))
+ return HttpServerResponse.text("unexpected", { status: 500 })
+ }))
+ const url = yield* serverUrl()
+ yield* provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const workspace = yield* WorkspaceOld.Service
+ const sessionSvc = yield* SessionNs.Service
+ const captured = captureGlobalEvents()
+ try {
+ const type = unique("sse-sync")
+ const info = workspaceInfo(Instance.project.id, type)
+ insertWorkspace(info)
+ registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/sse-sync`).adaptor)
+ const session = yield* sessionSvc.create({ title: "before sse" })
+ attachSessionToWorkspace(session.id, info.id)
+ sseSessionID = session.id
+ sseNextSeq = (sessionSequence(session.id) ?? -1) + 1
+
+ yield* workspace.startWorkspaceSyncing(Instance.project.id)
+
+ yield* eventuallyEffect(Effect.gen(function* () {
+ expect((yield* sessionSvc.get(session.id)).title).toBe("from sse")
+ }))
+ expect(
+ captured.events.some(
+ (event) => event.workspace === info.id && event.payload.type === "sync" && event.payload.syncEvent.seq === sseNextSeq,
+ ),
+ ).toBe(true)
+ yield* workspace.remove(info.id)
+ } finally {
+ captured.dispose()
+ }
+ }),
+ { git: true },
+ )
+ })
+ })
+})
+
+describe("workspace-old waitForSync", () => {
+ test("returns immediately for an empty fence", async () => {
+ await withInstance(async () => {
+ await expect(WorkspaceOld.waitForSync(WorkspaceID.ascending("wrk_wait_empty"), {})).resolves.toBeUndefined()
+ })
+ })
+
+ test("returns immediately when the stored sequence already satisfies the fence", async () => {
+ await withInstance(async () => {
+ const sessionID = SessionID.descending("ses_wait_done")
+ Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 4 }).run())
+
+ await expect(WorkspaceOld.waitForSync(WorkspaceID.ascending("wrk_wait_done"), { [sessionID]: 4 })).resolves.toBeUndefined()
+ await expect(WorkspaceOld.waitForSync(WorkspaceID.ascending("wrk_wait_done_2"), { [sessionID]: 3 })).resolves.toBeUndefined()
+ })
+ })
+
+ test("waits until the database reaches the requested sequence and a workspace event arrives", async () => {
+ await withInstance(async () => {
+ const workspaceID = WorkspaceID.ascending("wrk_wait_event")
+ const sessionID = SessionID.descending("ses_wait_event")
+ Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 1 }).run())
+
+ const waited = WorkspaceOld.waitForSync(workspaceID, { [sessionID]: 2 })
+ await delay(10)
+ Database.use((db) =>
+ db.update(EventSequenceTable).set({ seq: 2 }).where(eq(EventSequenceTable.aggregate_id, sessionID)).run(),
+ )
+ GlobalBus.emit("event", { workspace: workspaceID, payload: { type: "anything" } })
+
+ await expect(waited).resolves.toBeUndefined()
+ })
+ })
+
+ test("a sync event for a different workspace can also release the fence", async () => {
+ await withInstance(async () => {
+ const workspaceID = WorkspaceID.ascending("wrk_wait_sync_any")
+ const sessionID = SessionID.descending("ses_wait_sync_any")
+ Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 0 }).run())
+
+ const waited = WorkspaceOld.waitForSync(workspaceID, { [sessionID]: 1 })
+ await delay(10)
+ Database.use((db) =>
+ db.update(EventSequenceTable).set({ seq: 1 }).where(eq(EventSequenceTable.aggregate_id, sessionID)).run(),
+ )
+ GlobalBus.emit("event", {
+ workspace: WorkspaceID.ascending("wrk_other_workspace"),
+ payload: { type: "sync" },
+ })
+
+ await expect(waited).resolves.toBeUndefined()
+ })
+ })
+
+ test("rejects with the abort reason when aborted", async () => {
+ await withInstance(async () => {
+ const abort = new AbortController()
+ const reason = new Error("caller aborted")
+ const waited = WorkspaceOld.waitForSync(
+ WorkspaceID.ascending("wrk_wait_abort"),
+ { [SessionID.descending("ses_wait_abort")]: 1 },
+ abort.signal,
+ )
+ abort.abort(reason)
+
+ await expect(waited).rejects.toMatchObject({ _tag: "WorkspaceSyncAbortedError", message: reason.message, cause: reason })
+ })
+ })
+
+ test(
+ "times out with the requested fence in the error message",
+ async () => {
+ await withInstance(async () => {
+ const sessionID = SessionID.descending("ses_wait_timeout")
+
+ await expect(
+ WorkspaceOld.waitForSync(WorkspaceID.ascending("wrk_wait_timeout"), { [sessionID]: 1 }),
+ ).rejects.toThrow(`Timed out waiting for sync fence: {"${sessionID}":1}`)
+ })
+ },
+ 7000,
+ )
+})
+
+describe("workspace-old sessionRestore", () => {
+ test("throws when the workspace is missing", async () => {
+ await withInstance(async () => {
+ await expect(
+ WorkspaceOld.sessionRestore({
+ workspaceID: WorkspaceID.ascending("wrk_restore_missing"),
+ sessionID: SessionID.descending("ses_restore_missing_workspace"),
+ }),
+ ).rejects.toThrow("Workspace not found: wrk_restore_missing")
+ })
+ })
+
+ test("throws when switching a missing session fails", async () => {
+ await withInstance(async (dir) => {
+ const type = unique("restore-missing-session")
+ const info = workspaceInfo(Instance.project.id, type, { directory: dir })
+ insertWorkspace(info)
+ registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor)
+
+ await expect(
+ WorkspaceOld.sessionRestore({ workspaceID: info.id, sessionID: SessionID.descending("ses_missing_restore") }),
+ ).rejects.toThrow("NotFoundError")
+ await WorkspaceOld.remove(info.id)
+ })
+ })
+
+ it.live("posts remote replay batches of 10, emits progress, and includes the workspace update event", () => {
+ const replay: FetchCall[] = []
+ return Effect.gen(function* () {
+ yield* HttpServer.serveEffect()(Effect.gen(function* () {
+ const req = yield* HttpServerRequest.HttpServerRequest
+ const bodyText = yield* req.text
+ const call = {
+ url: new URL(req.url, "http://localhost"),
+ method: req.method,
+ headers: new Headers(req.headers),
+ bodyText,
+ json: bodyText ? JSON.parse(bodyText) : undefined,
+ }
+ if (call.url.pathname === "/restore/sync/replay") {
+ replay.push(call)
+ return HttpServerResponse.fromWeb(Response.json({ ok: true }))
+ }
+ return HttpServerResponse.text("unexpected", { status: 500 })
+ }))
+ const url = yield* serverUrl()
+ yield* provideTmpdirInstance((dir) =>
+ Effect.gen(function* () {
+ const workspace = yield* WorkspaceOld.Service
+ const sessionSvc = yield* SessionNs.Service
+ const captured = captureGlobalEvents()
+ try {
+ const type = unique("restore-remote")
+ const info = workspaceInfo(Instance.project.id, type, { directory: dir })
+ insertWorkspace(info)
+ registerAdaptor(
+ Instance.project.id,
+ type,
+ remoteAdaptor(`${url}/restore/?ignored=1#hash`, {
+ directory: dir,
+ headers: { authorization: "Bearer restore" },
+ }).adaptor,
+ )
+ const session = yield* sessionSvc.create({ title: "restore remote" })
+ replaceSessionEvents(session.id, 24)
+
+ const result = yield* workspace.sessionRestore({ workspaceID: info.id, sessionID: session.id })
+
+ expect(result).toEqual({ total: 3 })
+ expect(replay).toHaveLength(3)
+ expect(replay.map((call) => call.url.pathname + call.url.search + call.url.hash)).toEqual([
+ "/restore/sync/replay",
+ "/restore/sync/replay",
+ "/restore/sync/replay",
+ ])
+ expect(replay.every((call) => call.headers.get("authorization") === "Bearer restore")).toBe(true)
+ expect(replay.every((call) => call.headers.get("content-type") === "application/json")).toBe(true)
+ expect(replay.map((call) => (call.json as { events: unknown[] }).events.length)).toEqual([10, 10, 5])
+ expect(replay.map((call) => (call.json as { directory: string }).directory)).toEqual([dir, dir, dir])
+ expect(
+ replay.flatMap((call) => (call.json as { events: Array<{ seq: number }> }).events.map((event) => event.seq)),
+ ).toEqual(Array.from({ length: 25 }, (_, i) => i))
+ expect((replay[2].json as { events: Array<{ seq: number; type: string; data: unknown }> }).events.at(-1)).toMatchObject({
+ seq: 24,
+ type: sessionUpdatedType(),
+ data: { sessionID: session.id, info: { workspaceID: info.id } },
+ })
+ expect((yield* sessionSvc.get(session.id)).workspaceID).toBe(info.id)
+ expect(
+ captured.events
+ .filter((event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type)
+ .map((event) => event.payload.properties.step),
+ ).toEqual([0, 1, 2, 3])
+ yield* workspace.remove(info.id)
+ } finally {
+ captured.dispose()
+ }
+ }),
+ { git: true },
+ )
+ })
+ })
+
+ it.live("remote restore sends an empty directory string when the workspace directory is null", () => {
+ const replay: FetchCall[] = []
+ return Effect.gen(function* () {
+ yield* HttpServer.serveEffect()(Effect.gen(function* () {
+ const req = yield* HttpServerRequest.HttpServerRequest
+ const bodyText = yield* req.text
+ replay.push({
+ url: new URL(req.url, "http://localhost"),
+ method: req.method,
+ headers: new Headers(req.headers),
+ bodyText,
+ json: bodyText ? JSON.parse(bodyText) : undefined,
+ })
+ return HttpServerResponse.fromWeb(Response.json({ ok: true }))
+ }))
+ const url = yield* serverUrl()
+ yield* provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const workspace = yield* WorkspaceOld.Service
+ const sessionSvc = yield* SessionNs.Service
+ const type = unique("restore-null-dir")
+ const info = workspaceInfo(Instance.project.id, type, { directory: null })
+ insertWorkspace(info)
+ registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/null-dir`, { directory: null }).adaptor)
+ const session = yield* sessionSvc.create({ title: "null dir" })
+ replaceSessionEvents(session.id, 0)
+
+ expect(yield* workspace.sessionRestore({ workspaceID: info.id, sessionID: session.id })).toEqual({ total: 1 })
+ expect((replay[0].json as { directory: string }).directory).toBe("")
+ expect((replay[0].json as { events: unknown[] }).events).toHaveLength(1)
+ yield* workspace.remove(info.id)
+ }),
+ { git: true },
+ )
+ })
+ })
+
+ it.live("remote restore failures include status and body and do not emit completed batch progress", () => {
+ const replay: FetchCall[] = []
+ return Effect.gen(function* () {
+ yield* HttpServer.serveEffect()(Effect.gen(function* () {
+ const req = yield* HttpServerRequest.HttpServerRequest
+ const bodyText = yield* req.text
+ replay.push({
+ url: new URL(req.url, "http://localhost"),
+ method: req.method,
+ headers: new Headers(req.headers),
+ bodyText,
+ json: bodyText ? JSON.parse(bodyText) : undefined,
+ })
+ return HttpServerResponse.text("replay failed", { status: 503 })
+ }))
+ const url = yield* serverUrl()
+ yield* provideTmpdirInstance((dir) =>
+ Effect.gen(function* () {
+ const workspace = yield* WorkspaceOld.Service
+ const sessionSvc = yield* SessionNs.Service
+ const captured = captureGlobalEvents()
+ try {
+ const type = unique("restore-remote-fail")
+ const info = workspaceInfo(Instance.project.id, type, { directory: dir })
+ insertWorkspace(info)
+ registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/fail`, { directory: dir }).adaptor)
+ const session = yield* sessionSvc.create({ title: "restore fail" })
+ replaceSessionEvents(session.id, 11)
+
+ const error = yield* Effect.flip(workspace.sessionRestore({ workspaceID: info.id, sessionID: session.id }))
+ expect((error as Error).message).toContain(
+ `Failed to replay session ${session.id} into workspace ${info.id}: HTTP 503 replay failed`,
+ )
+
+ expect(replay).toHaveLength(1)
+ expect(
+ captured.events
+ .filter((event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type)
+ .map((event) => event.payload.properties.step),
+ ).toEqual([0])
+ yield* workspace.remove(info.id)
+ } finally {
+ captured.dispose()
+ }
+ }),
+ { git: true },
+ )
+ })
+ })
+
+ test("local restore replays batches without fetch and emits progress", async () => {
+ await withInstance(async (dir) => {
+ const captured = captureGlobalEvents()
+ let fetchCallCount = 0
+ const replayAll = spyOn(SyncEvent, "replayAll")
+ try {
+ using server = Bun.serve({
+ port: 0,
+ fetch() {
+ fetchCallCount++
+ return Response.json({ ok: true })
+ },
+ })
+ const type = unique("restore-local")
+ const info = workspaceInfo(Instance.project.id, type, { directory: dir })
+ insertWorkspace(info)
+ registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor)
+ const session = await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create({ title: "restore local" })))
+ replaceSessionEvents(session.id, 20)
+
+ expect(await WorkspaceOld.sessionRestore({ workspaceID: info.id, sessionID: session.id })).toEqual({ total: 3 })
+
+ expect(fetchCallCount).toBe(0)
+ expect(replayAll).toHaveBeenCalledTimes(3)
+ expect(replayAll.mock.calls.map((call) => call[0].length)).toEqual([10, 10, 1])
+ expect((await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.get(session.id)))).workspaceID).toBe(info.id)
+ expect(eventRows(session.id).map((row) => row.seq)).toEqual(Array.from({ length: 21 }, (_, i) => i))
+ expect(
+ captured.events
+ .filter((event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type)
+ .map((event) => event.payload.properties.step),
+ ).toEqual([0, 1, 2, 3])
+ await WorkspaceOld.remove(info.id)
+ } finally {
+ captured.dispose()
+ }
+ })
+ })
+
+ it.live("session restore includes real message and part events in sequence order", () => {
+ const replay: FetchCall[] = []
+ return Effect.gen(function* () {
+ yield* HttpServer.serveEffect()(Effect.gen(function* () {
+ const req = yield* HttpServerRequest.HttpServerRequest
+ const bodyText = yield* req.text
+ replay.push({
+ url: new URL(req.url, "http://localhost"),
+ method: req.method,
+ headers: new Headers(req.headers),
+ bodyText,
+ json: bodyText ? JSON.parse(bodyText) : undefined,
+ })
+ return HttpServerResponse.fromWeb(Response.json({ ok: true }))
+ }))
+ const url = yield* serverUrl()
+ yield* provideTmpdirInstance((dir) =>
+ Effect.gen(function* () {
+ const workspace = yield* WorkspaceOld.Service
+ const sessionSvc = yield* SessionNs.Service
+ const type = unique("restore-real-events")
+ const info = workspaceInfo(Instance.project.id, type, { directory: dir })
+ insertWorkspace(info)
+ registerAdaptor(Instance.project.id, type, remoteAdaptor(`${url}/real`, { directory: dir }).adaptor)
+ const session = yield* sessionSvc.create({ title: "real events" })
+ for (let i = 0; i < 3; i++) {
+ const msg = yield* sessionSvc.updateMessage({
+ id: MessageID.ascending(),
+ role: "user",
+ sessionID: session.id,
+ agent: "build",
+ model: { providerID: ProviderID.make("test"), modelID: ModelID.make("test") },
+ time: { created: Date.now() },
+ })
+ yield* sessionSvc.updatePart({
+ id: PartID.ascending(),
+ sessionID: session.id,
+ messageID: msg.id,
+ type: "text",
+ text: `message ${i}`,
+ })
+ }
+ const before = eventRows(session.id)
+
+ expect(yield* workspace.sessionRestore({ workspaceID: info.id, sessionID: session.id })).toEqual({ total: 1 })
+
+ const posted = (replay[0].json as { events: Array<{ seq: number; type: string }> }).events
+ expect(posted.map((event) => event.seq)).toEqual([...before.map((row) => row.seq), before.at(-1)!.seq + 1])
+ expect(posted.map((event) => event.type).slice(0, -1)).toEqual(before.map((row) => row.type))
+ expect(posted.at(-1)?.type).toBe(sessionUpdatedType())
+ yield* workspace.remove(info.id)
+ }),
+ { git: true },
+ )
+ })
+ })
+})
diff --git a/packages/opencode/test/workspace/workspace-restore.test.ts b/packages/opencode/test/workspace/workspace-restore.test.ts
deleted file mode 100644
index 7f802150e..000000000
--- a/packages/opencode/test/workspace/workspace-restore.test.ts
+++ /dev/null
@@ -1,283 +0,0 @@
-import { afterEach, beforeEach, describe, expect, mock, spyOn, test } from "bun:test"
-import fs from "node:fs/promises"
-import path from "node:path"
-import { GlobalBus } from "../../src/bus/global"
-import { registerAdaptor } from "../../src/control-plane/adaptors"
-import type { WorkspaceAdaptor } from "../../src/control-plane/types"
-import { Workspace } from "../../src/control-plane/workspace"
-import { AppRuntime } from "../../src/effect/app-runtime"
-import { Flag } from "@opencode-ai/core/flag/flag"
-import { ModelID, ProviderID } from "../../src/provider/schema"
-import { Instance } from "../../src/project/instance"
-import { Session as SessionNs } from "@/session/session"
-import { MessageV2 } from "../../src/session/message-v2"
-import { MessageID, PartID, type SessionID } from "../../src/session/schema"
-import { Database } from "@/storage/db"
-import { asc } from "drizzle-orm"
-import { eq } from "drizzle-orm"
-import { SyncEvent } from "../../src/sync"
-import { EventTable } from "../../src/sync/event.sql"
-import * as Log from "@opencode-ai/core/util/log"
-import { resetDatabase } from "../fixture/db"
-import { tmpdir } from "../fixture/fixture"
-
-void Log.init({ print: false })
-
-const original = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
-
-beforeEach(() => {
- Database.close()
- Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
-})
-
-afterEach(async () => {
- mock.restore()
- await Instance.disposeAll()
- Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = original
- await resetDatabase()
-})
-
-function create(input?: SessionNs.CreateInput) {
- return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create(input)))
-}
-
-function get(id: SessionID) {
- return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.get(id)))
-}
-
-function updateMessage<T extends MessageV2.Info>(msg: T) {
- return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.updateMessage(msg)))
-}
-
-function updatePart<T extends MessageV2.Part>(part: T) {
- return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.updatePart(part)))
-}
-
-async function user(sessionID: SessionID, text: string) {
- const msg = await updateMessage({
- id: MessageID.ascending(),
- role: "user",
- sessionID,
- agent: "build",
- model: { providerID: ProviderID.make("test"), modelID: ModelID.make("test") },
- time: { created: Date.now() },
- })
- await updatePart({
- id: PartID.ascending(),
- sessionID,
- messageID: msg.id,
- type: "text",
- text,
- })
-}
-
-function remote(dir: string, url: string): WorkspaceAdaptor {
- return {
- name: "remote",
- description: "remote",
- configure(info) {
- return {
- ...info,
- directory: dir,
- }
- },
- async create() {
- await fs.mkdir(dir, { recursive: true })
- },
- async remove() {},
- target() {
- return {
- type: "remote" as const,
- url,
- }
- },
- }
-}
-
-function local(dir: string): WorkspaceAdaptor {
- return {
- name: "local",
- description: "local",
- configure(info) {
- return {
- ...info,
- directory: dir,
- }
- },
- async create() {
- await fs.mkdir(dir, { recursive: true })
- },
- async remove() {},
- target() {
- return {
- type: "local" as const,
- directory: dir,
- }
- },
- }
-}
-
-function eventStreamResponse() {
- return new Response(new ReadableStream({ start() {} }), {
- status: 200,
- headers: {
- "content-type": "text/event-stream",
- },
- })
-}
-
-describe("Workspace.sessionRestore", () => {
- test("replays session events in batches of 10 and emits progress", async () => {
- await using tmp = await tmpdir({ git: true })
- const dir = path.join(tmp.path, ".restore")
- const seen: any[] = []
- const posts: Array<{
- path: string
- body: { directory: string; events: Array<{ seq: number; aggregateID: string }> }
- }> = []
- const on = (evt: any) => seen.push(evt)
- GlobalBus.on("event", on)
-
- const raw = globalThis.fetch
- spyOn(globalThis, "fetch").mockImplementation(
- Object.assign(
- async (input: URL | RequestInfo, init?: BunFetchRequestInit | RequestInit) => {
- const url = new URL(typeof input === "string" || input instanceof URL ? input : input.url)
- if (url.pathname === "/base/global/event") {
- return eventStreamResponse()
- }
- if (url.pathname === "/base/sync/history") {
- return Response.json([])
- }
- const body = JSON.parse(String(init?.body))
- posts.push({
- path: url.pathname,
- body,
- })
- return Response.json({ sessionID: body.events[0].aggregateID })
- },
- {
- preconnect: raw.preconnect?.bind(raw),
- },
- ) as typeof globalThis.fetch,
- )
-
- try {
- const setup = await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- registerAdaptor(Instance.project.id, "worktree", remote(dir, "https://workspace.test/base"))
- const space = await Workspace.create({
- type: "worktree",
- branch: null,
- extra: null,
- projectID: Instance.project.id,
- })
- const session = await create({})
- for (let i = 0; i < 6; i++) {
- await user(session.id, `msg ${i}`)
- }
- const rows = Database.use((db) =>
- db
- .select({ seq: EventTable.seq })
- .from(EventTable)
- .where(eq(EventTable.aggregate_id, session.id))
- .orderBy(asc(EventTable.seq))
- .all(),
- )
- const result = await Workspace.sessionRestore({
- workspaceID: space.id,
- sessionID: session.id,
- })
- return { space, session, rows, result }
- },
- })
-
- expect(setup.rows).toHaveLength(13)
- expect(setup.result).toEqual({ total: 2 })
- expect(posts).toHaveLength(2)
- expect(posts[0]?.path).toBe("/base/sync/replay")
- expect(posts[1]?.path).toBe("/base/sync/replay")
- expect(posts[0]?.body.directory).toBe(dir)
- expect(posts[1]?.body.directory).toBe(dir)
- expect(posts[0]?.body.events).toHaveLength(10)
- expect(posts[1]?.body.events).toHaveLength(4)
- expect(posts.flatMap((item) => item.body.events.map((event) => event.seq))).toEqual([
- ...setup.rows.map((row) => row.seq),
- setup.rows.at(-1)!.seq + 1,
- ])
- expect(posts[1]?.body.events.at(-1)).toMatchObject({
- aggregateID: setup.session.id,
- seq: setup.rows.at(-1)!.seq + 1,
- type: SyncEvent.versionedType(SessionNs.Event.Updated.type, SessionNs.Event.Updated.version),
- data: {
- sessionID: setup.session.id,
- info: {
- workspaceID: setup.space.id,
- },
- },
- })
-
- const restore = seen.filter(
- (evt) => evt.workspace === setup.space.id && evt.payload.type === Workspace.Event.Restore.type,
- )
- expect(restore.map((evt) => evt.payload.properties.step)).toEqual([0, 1, 2])
- expect(restore.map((evt) => evt.payload.properties.total)).toEqual([2, 2, 2])
- expect(restore.map((evt) => evt.payload.properties.sessionID)).toEqual([
- setup.session.id,
- setup.session.id,
- setup.session.id,
- ])
- } finally {
- GlobalBus.off("event", on)
- }
- })
-
- test("replays locally without posting to a server", async () => {
- await using tmp = await tmpdir({ git: true })
- const dir = path.join(tmp.path, ".restore-local")
- const seen: any[] = []
- const on = (evt: any) => seen.push(evt)
- GlobalBus.on("event", on)
-
- const fetch = spyOn(globalThis, "fetch")
- const replayAll = spyOn(SyncEvent, "replayAll")
-
- try {
- const setup = await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- registerAdaptor(Instance.project.id, "local-restore", local(dir))
- const space = await Workspace.create({
- type: "local-restore",
- branch: null,
- extra: null,
- projectID: Instance.project.id,
- })
- const session = await create({})
- for (let i = 0; i < 6; i++) {
- await user(session.id, `msg ${i}`)
- }
- const result = await Workspace.sessionRestore({
- workspaceID: space.id,
- sessionID: session.id,
- })
- const updated = await get(session.id)
- return { space, session, result, updated }
- },
- })
-
- expect(setup.result).toEqual({ total: 2 })
- expect(fetch).not.toHaveBeenCalled()
- expect(replayAll).toHaveBeenCalledTimes(2)
- expect(setup.updated.workspaceID).toBe(setup.space.id)
-
- const restore = seen.filter(
- (evt) => evt.workspace === setup.space.id && evt.payload.type === Workspace.Event.Restore.type,
- )
- expect(restore.map((evt) => evt.payload.properties.step)).toEqual([0, 1, 2])
- } finally {
- GlobalBus.off("event", on)
- }
- })
-})