summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorKit Langton <[email protected]>2026-04-16 20:00:15 -0400
committerGitHub <[email protected]>2026-04-17 00:00:15 +0000
commitbae80af1b4620961664076bd257c22b88b57eeaf (patch)
treebc147f4d6122f6aff360947a665403e919d72c87
parentf9aa3d77cd543ad3a46f86e36a2908f0cc2e652f (diff)
downloadopencode-bae80af1b4620961664076bd257c22b88b57eeaf.tar.gz
opencode-bae80af1b4620961664076bd257c22b88b57eeaf.zip
refactor: unwrap Workspace namespace + self-reexport (#22934)
-rw-r--r--packages/opencode/src/control-plane/workspace.ts820
1 files changed, 410 insertions, 410 deletions
diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts
index 9c1c4c896..3af11707e 100644
--- a/packages/opencode/src/control-plane/workspace.ts
+++ b/packages/opencode/src/control-plane/workspace.ts
@@ -26,505 +26,505 @@ import { AppRuntime } from "@/effect/app-runtime"
import { EventSequenceTable } from "@/sync/event.sql"
import { waitEvent } from "./util"
-export namespace Workspace {
- export const Info = WorkspaceInfo.meta({
- ref: "Workspace",
- })
- export type Info = z.infer<typeof Info>
-
- export const ConnectionStatus = z.object({
- workspaceID: WorkspaceID.zod,
- status: z.enum(["connected", "connecting", "disconnected", "error"]),
- error: z.string().optional(),
- })
- export type ConnectionStatus = z.infer<typeof ConnectionStatus>
-
- const Restore = z.object({
- workspaceID: WorkspaceID.zod,
- sessionID: SessionID.zod,
- total: z.number().int().min(0),
- step: z.number().int().min(0),
- })
+export const Info = WorkspaceInfo.meta({
+ ref: "Workspace",
+})
+export type Info = z.infer<typeof Info>
+
+export const ConnectionStatus = z.object({
+ workspaceID: WorkspaceID.zod,
+ status: z.enum(["connected", "connecting", "disconnected", "error"]),
+ error: z.string().optional(),
+})
+export type ConnectionStatus = z.infer<typeof ConnectionStatus>
+
+const Restore = z.object({
+ workspaceID: WorkspaceID.zod,
+ sessionID: SessionID.zod,
+ total: z.number().int().min(0),
+ step: z.number().int().min(0),
+})
+
+export const Event = {
+ Ready: BusEvent.define(
+ "workspace.ready",
+ z.object({
+ name: z.string(),
+ }),
+ ),
+ Failed: BusEvent.define(
+ "workspace.failed",
+ z.object({
+ message: z.string(),
+ }),
+ ),
+ Restore: BusEvent.define("workspace.restore", Restore),
+ Status: BusEvent.define("workspace.status", ConnectionStatus),
+}
- export const Event = {
- Ready: BusEvent.define(
- "workspace.ready",
- z.object({
- name: z.string(),
- }),
- ),
- Failed: BusEvent.define(
- "workspace.failed",
- z.object({
- message: z.string(),
- }),
- ),
- Restore: BusEvent.define("workspace.restore", Restore),
- Status: BusEvent.define("workspace.status", ConnectionStatus),
+function fromRow(row: typeof WorkspaceTable.$inferSelect): Info {
+ return {
+ id: row.id,
+ type: row.type,
+ branch: row.branch,
+ name: row.name,
+ directory: row.directory,
+ extra: row.extra,
+ projectID: row.project_id,
}
+}
- function fromRow(row: typeof WorkspaceTable.$inferSelect): Info {
- return {
- id: row.id,
- type: row.type,
- branch: row.branch,
- name: row.name,
- directory: row.directory,
- extra: row.extra,
- projectID: row.project_id,
- }
+const CreateInput = z.object({
+ id: WorkspaceID.zod.optional(),
+ type: Info.shape.type,
+ branch: Info.shape.branch,
+ projectID: ProjectID.zod,
+ extra: Info.shape.extra,
+})
+
+export const create = fn(CreateInput, async (input) => {
+ const id = WorkspaceID.ascending(input.id)
+ const adaptor = await getAdaptor(input.projectID, input.type)
+
+ const config = await adaptor.configure({ ...input, id, name: Slug.create(), directory: null })
+
+ const info: Info = {
+ id,
+ type: config.type,
+ branch: config.branch ?? null,
+ name: config.name ?? null,
+ directory: config.directory ?? null,
+ extra: config.extra ?? null,
+ projectID: input.projectID,
}
- const CreateInput = z.object({
- id: WorkspaceID.zod.optional(),
- type: Info.shape.type,
- branch: Info.shape.branch,
- projectID: ProjectID.zod,
- extra: Info.shape.extra,
+ Database.use((db) => {
+ db.insert(WorkspaceTable)
+ .values({
+ id: info.id,
+ type: info.type,
+ branch: info.branch,
+ name: info.name,
+ directory: info.directory,
+ extra: info.extra,
+ project_id: info.projectID,
+ })
+ .run()
})
- export const create = fn(CreateInput, async (input) => {
- const id = WorkspaceID.ascending(input.id)
- const adaptor = await getAdaptor(input.projectID, input.type)
+ const env = {
+ OPENCODE_AUTH_CONTENT: JSON.stringify(await AppRuntime.runPromise(Auth.Service.use((auth) => auth.all()))),
+ OPENCODE_WORKSPACE_ID: config.id,
+ OPENCODE_EXPERIMENTAL_WORKSPACES: "true",
+ }
+ await adaptor.create(config, env)
- const config = await adaptor.configure({ ...input, id, name: Slug.create(), directory: null })
+ startSync(info)
- const info: Info = {
- id,
- type: config.type,
- branch: config.branch ?? null,
- name: config.name ?? null,
- directory: config.directory ?? null,
- extra: config.extra ?? null,
- projectID: input.projectID,
- }
+ await waitEvent({
+ timeout: TIMEOUT,
+ fn(event) {
+ if (event.workspace === info.id && event.payload.type === Event.Status.type) {
+ const { status } = event.payload.properties
+ return status === "error" || status === "connected"
+ }
+ return false
+ },
+ })
- Database.use((db) => {
- db.insert(WorkspaceTable)
- .values({
- id: info.id,
- type: info.type,
- branch: info.branch,
- name: info.name,
- directory: info.directory,
- extra: info.extra,
- project_id: info.projectID,
- })
- .run()
- })
+ return info
+})
- const env = {
- OPENCODE_AUTH_CONTENT: JSON.stringify(await AppRuntime.runPromise(Auth.Service.use((auth) => auth.all()))),
- OPENCODE_WORKSPACE_ID: config.id,
- OPENCODE_EXPERIMENTAL_WORKSPACES: "true",
- }
- await adaptor.create(config, env)
+const SessionRestoreInput = z.object({
+ workspaceID: WorkspaceID.zod,
+ sessionID: SessionID.zod,
+})
- startSync(info)
+export const sessionRestore = fn(SessionRestoreInput, async (input) => {
+ log.info("session restore requested", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ })
+ try {
+ const space = await get(input.workspaceID)
+ if (!space) throw new Error(`Workspace not found: ${input.workspaceID}`)
- await waitEvent({
- timeout: TIMEOUT,
- fn(event) {
- if (event.workspace === info.id && event.payload.type === Event.Status.type) {
- const { status } = event.payload.properties
- return status === "error" || status === "connected"
- }
- return false
+ const adaptor = await getAdaptor(space.projectID, space.type)
+ const target = await adaptor.target(space)
+
+ // Need to switch the workspace of the session
+ SyncEvent.run(Session.Event.Updated, {
+ sessionID: input.sessionID,
+ info: {
+ workspaceID: input.workspaceID,
},
})
- return info
- })
+ const rows = Database.use((db) =>
+ db
+ .select({
+ id: EventTable.id,
+ aggregateID: EventTable.aggregate_id,
+ seq: EventTable.seq,
+ type: EventTable.type,
+ data: EventTable.data,
+ })
+ .from(EventTable)
+ .where(eq(EventTable.aggregate_id, input.sessionID))
+ .orderBy(asc(EventTable.seq))
+ .all(),
+ )
+ if (rows.length === 0) throw new Error(`No events found for session: ${input.sessionID}`)
- const SessionRestoreInput = z.object({
- workspaceID: WorkspaceID.zod,
- sessionID: SessionID.zod,
- })
+ const all = rows
- export const sessionRestore = fn(SessionRestoreInput, async (input) => {
- log.info("session restore requested", {
+ const size = 10
+ const sets = Array.from({ length: Math.ceil(all.length / size) }, (_, i) => all.slice(i * size, (i + 1) * size))
+ const total = sets.length
+ log.info("session restore prepared", {
workspaceID: input.workspaceID,
sessionID: input.sessionID,
+ workspaceType: space.type,
+ directory: space.directory,
+ target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
+ events: all.length,
+ batches: total,
+ first: all[0]?.seq,
+ last: all.at(-1)?.seq,
})
- try {
- const space = await get(input.workspaceID)
- if (!space) throw new Error(`Workspace not found: ${input.workspaceID}`)
-
- const adaptor = await getAdaptor(space.projectID, space.type)
- const target = await adaptor.target(space)
-
- // Need to switch the workspace of the session
- SyncEvent.run(Session.Event.Updated, {
- sessionID: input.sessionID,
- info: {
+ GlobalBus.emit("event", {
+ directory: "global",
+ workspace: input.workspaceID,
+ payload: {
+ type: Event.Restore.type,
+ properties: {
workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ total,
+ step: 0,
},
- })
-
- const rows = Database.use((db) =>
- db
- .select({
- id: EventTable.id,
- aggregateID: EventTable.aggregate_id,
- seq: EventTable.seq,
- type: EventTable.type,
- data: EventTable.data,
- })
- .from(EventTable)
- .where(eq(EventTable.aggregate_id, input.sessionID))
- .orderBy(asc(EventTable.seq))
- .all(),
- )
- if (rows.length === 0) throw new Error(`No events found for session: ${input.sessionID}`)
-
- const all = rows
-
- const size = 10
- const sets = Array.from({ length: Math.ceil(all.length / size) }, (_, i) => all.slice(i * size, (i + 1) * size))
- const total = sets.length
- log.info("session restore prepared", {
+ },
+ })
+ for (const [i, events] of sets.entries()) {
+ log.info("session restore batch starting", {
workspaceID: input.workspaceID,
sessionID: input.sessionID,
- workspaceType: space.type,
- directory: space.directory,
+ step: i + 1,
+ total,
+ events: events.length,
+ first: events[0]?.seq,
+ last: events.at(-1)?.seq,
target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
- events: all.length,
- batches: total,
- first: all[0]?.seq,
- last: all.at(-1)?.seq,
})
- GlobalBus.emit("event", {
- directory: "global",
- workspace: input.workspaceID,
- payload: {
- type: Event.Restore.type,
- properties: {
- workspaceID: input.workspaceID,
- sessionID: input.sessionID,
- total,
- step: 0,
- },
- },
- })
- for (const [i, events] of sets.entries()) {
- log.info("session restore batch starting", {
+ if (target.type === "local") {
+ SyncEvent.replayAll(events)
+ log.info("session restore batch replayed locally", {
workspaceID: input.workspaceID,
sessionID: input.sessionID,
step: i + 1,
total,
events: events.length,
- first: events[0]?.seq,
- last: events.at(-1)?.seq,
- target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
})
- if (target.type === "local") {
- SyncEvent.replayAll(events)
- log.info("session restore batch replayed locally", {
- workspaceID: input.workspaceID,
- sessionID: input.sessionID,
- step: i + 1,
- total,
- events: events.length,
- })
- } else {
- const url = route(target.url, "/sync/replay")
- const headers = new Headers(target.headers)
- headers.set("content-type", "application/json")
- const res = await fetch(url, {
- method: "POST",
- headers,
- body: JSON.stringify({
- directory: space.directory ?? "",
- events,
- }),
- })
- if (!res.ok) {
- const body = await res.text()
- log.error("session restore batch failed", {
- workspaceID: input.workspaceID,
- sessionID: input.sessionID,
- step: i + 1,
- total,
- status: res.status,
- body,
- })
- throw new Error(
- `Failed to replay session ${input.sessionID} into workspace ${input.workspaceID}: HTTP ${res.status} ${body}`,
- )
- }
- log.info("session restore batch posted", {
+ } else {
+ const url = route(target.url, "/sync/replay")
+ const headers = new Headers(target.headers)
+ headers.set("content-type", "application/json")
+ const res = await fetch(url, {
+ method: "POST",
+ headers,
+ body: JSON.stringify({
+ directory: space.directory ?? "",
+ events,
+ }),
+ })
+ if (!res.ok) {
+ const body = await res.text()
+ log.error("session restore batch failed", {
workspaceID: input.workspaceID,
sessionID: input.sessionID,
step: i + 1,
total,
status: res.status,
+ body,
})
+ throw new Error(
+ `Failed to replay session ${input.sessionID} into workspace ${input.workspaceID}: HTTP ${res.status} ${body}`,
+ )
}
- GlobalBus.emit("event", {
- directory: "global",
- workspace: input.workspaceID,
- payload: {
- type: Event.Restore.type,
- properties: {
- workspaceID: input.workspaceID,
- sessionID: input.sessionID,
- total,
- step: i + 1,
- },
- },
+ log.info("session restore batch posted", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ step: i + 1,
+ total,
+ status: res.status,
})
}
-
- log.info("session restore complete", {
- workspaceID: input.workspaceID,
- sessionID: input.sessionID,
- batches: total,
- })
-
- return {
- total,
- }
- } catch (err) {
- log.error("session restore failed", {
- workspaceID: input.workspaceID,
- sessionID: input.sessionID,
- error: errorData(err),
+ GlobalBus.emit("event", {
+ directory: "global",
+ workspace: input.workspaceID,
+ payload: {
+ type: Event.Restore.type,
+ properties: {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ total,
+ step: i + 1,
+ },
+ },
})
- throw err
}
- })
- export function list(project: Project.Info) {
- const rows = Database.use((db) =>
- db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
- )
- const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
+ log.info("session restore complete", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ batches: total,
+ })
- for (const space of spaces) startSync(space)
- return spaces
+ return {
+ total,
+ }
+ } catch (err) {
+ log.error("session restore failed", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ error: errorData(err),
+ })
+ throw err
}
+})
- function lookup(id: WorkspaceID) {
- const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
- if (!row) return
- return fromRow(row)
- }
+export function list(project: Project.Info) {
+ const rows = Database.use((db) =>
+ db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
+ )
+ const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
- export const get = fn(WorkspaceID.zod, async (id) => {
- const space = lookup(id)
- if (!space) return
- startSync(space)
- return space
- })
+ for (const space of spaces) startSync(space)
+ return spaces
+}
- export const remove = fn(WorkspaceID.zod, async (id) => {
- const sessions = Database.use((db) =>
- db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, id)).all(),
- )
- for (const session of sessions) {
- await AppRuntime.runPromise(Session.Service.use((svc) => svc.remove(session.id)))
- }
+function lookup(id: WorkspaceID) {
+ const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
+ if (!row) return
+ return fromRow(row)
+}
- const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
+export const get = fn(WorkspaceID.zod, async (id) => {
+ const space = lookup(id)
+ if (!space) return
+ startSync(space)
+ return space
+})
+
+export const remove = fn(WorkspaceID.zod, async (id) => {
+ const sessions = Database.use((db) =>
+ db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, id)).all(),
+ )
+ for (const session of sessions) {
+ await AppRuntime.runPromise(Session.Service.use((svc) => svc.remove(session.id)))
+ }
- if (row) {
- stopSync(id)
+ const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
- const info = fromRow(row)
- try {
- const adaptor = await getAdaptor(info.projectID, row.type)
- await adaptor.remove(info)
- } catch {
- log.error("adaptor not available when removing workspace", { type: row.type })
- }
- Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
- return info
- }
- })
+ if (row) {
+ stopSync(id)
- const connections = new Map<WorkspaceID, ConnectionStatus>()
- const aborts = new Map<WorkspaceID, AbortController>()
- const TIMEOUT = 5000
+ const info = fromRow(row)
+ try {
+ const adaptor = await getAdaptor(info.projectID, row.type)
+ await adaptor.remove(info)
+ } catch {
+ log.error("adaptor not available when removing workspace", { type: row.type })
+ }
+ Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
+ return info
+ }
+})
- function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
- const prev = connections.get(id)
- if (prev?.status === status && prev?.error === error) return
- const next = { workspaceID: id, status, error }
- connections.set(id, next)
+const connections = new Map<WorkspaceID, ConnectionStatus>()
+const aborts = new Map<WorkspaceID, AbortController>()
+const TIMEOUT = 5000
- if (status === "error") {
- aborts.delete(id)
- }
+function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
+ const prev = connections.get(id)
+ if (prev?.status === status && prev?.error === error) return
+ const next = { workspaceID: id, status, error }
+ connections.set(id, next)
- GlobalBus.emit("event", {
- directory: "global",
- workspace: id,
- payload: {
- type: Event.Status.type,
- properties: next,
- },
- })
+ if (status === "error") {
+ aborts.delete(id)
}
- export function status(): ConnectionStatus[] {
- return [...connections.values()]
- }
+ GlobalBus.emit("event", {
+ directory: "global",
+ workspace: id,
+ payload: {
+ type: Event.Status.type,
+ properties: next,
+ },
+ })
+}
- function synced(state: Record<string, number>) {
- const ids = Object.keys(state)
- if (ids.length === 0) return true
+export function status(): ConnectionStatus[] {
+ return [...connections.values()]
+}
- const done = Object.fromEntries(
- Database.use((db) =>
- db
- .select({
- id: EventSequenceTable.aggregate_id,
- seq: EventSequenceTable.seq,
- })
- .from(EventSequenceTable)
- .where(inArray(EventSequenceTable.aggregate_id, ids))
- .all(),
- ).map((row) => [row.id, row.seq]),
- ) as Record<string, number>
-
- return ids.every((id) => {
- return (done[id] ?? -1) >= state[id]
- })
- }
+function synced(state: Record<string, number>) {
+ const ids = Object.keys(state)
+ if (ids.length === 0) return true
- export async function isSyncing(workspaceID: WorkspaceID) {
- return aborts.has(workspaceID)
- }
+ const done = Object.fromEntries(
+ Database.use((db) =>
+ db
+ .select({
+ id: EventSequenceTable.aggregate_id,
+ seq: EventSequenceTable.seq,
+ })
+ .from(EventSequenceTable)
+ .where(inArray(EventSequenceTable.aggregate_id, ids))
+ .all(),
+ ).map((row) => [row.id, row.seq]),
+ ) as Record<string, number>
+
+ return ids.every((id) => {
+ return (done[id] ?? -1) >= state[id]
+ })
+}
- export async function waitForSync(workspaceID: WorkspaceID, state: Record<string, number>, signal?: AbortSignal) {
- if (synced(state)) return
+export async function isSyncing(workspaceID: WorkspaceID) {
+ return aborts.has(workspaceID)
+}
- try {
- await waitEvent({
- timeout: TIMEOUT,
- signal,
- fn(event) {
- if (event.workspace !== workspaceID && event.payload.type !== "sync") {
- return false
- }
- return synced(state)
- },
- })
- } catch {
- if (signal?.aborted) throw signal.reason ?? new Error("Request aborted")
- throw new Error(`Timed out waiting for sync fence: ${JSON.stringify(state)}`)
- }
+export async function waitForSync(workspaceID: WorkspaceID, state: Record<string, number>, signal?: AbortSignal) {
+ if (synced(state)) return
+
+ try {
+ await waitEvent({
+ timeout: TIMEOUT,
+ signal,
+ fn(event) {
+ if (event.workspace !== workspaceID && event.payload.type !== "sync") {
+ return false
+ }
+ return synced(state)
+ },
+ })
+ } catch {
+ if (signal?.aborted) throw signal.reason ?? new Error("Request aborted")
+ throw new Error(`Timed out waiting for sync fence: ${JSON.stringify(state)}`)
}
+}
- const log = Log.create({ service: "workspace-sync" })
+const log = Log.create({ service: "workspace-sync" })
- function route(url: string | URL, path: string) {
- const next = new URL(url)
- next.pathname = `${next.pathname.replace(/\/$/, "")}${path}`
- next.search = ""
- next.hash = ""
- return next
- }
+function route(url: string | URL, path: string) {
+ const next = new URL(url)
+ next.pathname = `${next.pathname.replace(/\/$/, "")}${path}`
+ next.search = ""
+ next.hash = ""
+ return next
+}
- async function syncWorkspace(space: Info, signal: AbortSignal) {
- while (!signal.aborted) {
- log.info("connecting to global sync", { workspace: space.name })
- setStatus(space.id, "connecting")
+async function syncWorkspace(space: Info, signal: AbortSignal) {
+ while (!signal.aborted) {
+ log.info("connecting to global sync", { workspace: space.name })
+ setStatus(space.id, "connecting")
- const adaptor = await getAdaptor(space.projectID, space.type)
- const target = await adaptor.target(space)
+ const adaptor = await getAdaptor(space.projectID, space.type)
+ const target = await adaptor.target(space)
- if (target.type === "local") return
+ if (target.type === "local") return
- const res = await fetch(route(target.url, "/global/event"), {
- method: "GET",
- headers: target.headers,
- signal,
- }).catch((err: unknown) => {
- setStatus(space.id, "error", err instanceof Error ? err.message : String(err))
+ const res = await fetch(route(target.url, "/global/event"), {
+ method: "GET",
+ headers: target.headers,
+ signal,
+ }).catch((err: unknown) => {
+ setStatus(space.id, "error", err instanceof Error ? err.message : String(err))
- log.info("failed to connect to global sync", {
- workspace: space.name,
- error: err,
- })
- return undefined
+ log.info("failed to connect to global sync", {
+ workspace: space.name,
+ error: err,
})
+ return undefined
+ })
- if (!res || !res.ok || !res.body) {
- const error = !res ? "No response from global sync" : `Global sync HTTP ${res.status}`
- log.info("failed to connect to global sync", { workspace: space.name, error })
- setStatus(space.id, "error", error)
- await sleep(1000)
- continue
- }
-
- log.info("global sync connected", { workspace: space.name })
- setStatus(space.id, "connected")
+ if (!res || !res.ok || !res.body) {
+ const error = !res ? "No response from global sync" : `Global sync HTTP ${res.status}`
+ log.info("failed to connect to global sync", { workspace: space.name, error })
+ setStatus(space.id, "error", error)
+ await sleep(1000)
+ continue
+ }
- await parseSSE(res.body, signal, (evt: any) => {
- try {
- if (!("payload" in evt)) return
+ log.info("global sync connected", { workspace: space.name })
+ setStatus(space.id, "connected")
- if (evt.payload.type === "sync") {
- SyncEvent.replay(evt.payload.syncEvent as SyncEvent.SerializedEvent)
- }
+ await parseSSE(res.body, signal, (evt: any) => {
+ try {
+ if (!("payload" in evt)) return
- GlobalBus.emit("event", {
- directory: evt.directory,
- project: evt.project,
- workspace: space.id,
- payload: evt.payload,
- })
- } catch (err) {
- log.info("failed to replay global event", {
- workspaceID: space.id,
- error: err,
- })
+ if (evt.payload.type === "sync") {
+ SyncEvent.replay(evt.payload.syncEvent as SyncEvent.SerializedEvent)
}
- })
- log.info("disconnected from global sync: " + space.id)
- setStatus(space.id, "disconnected")
+ GlobalBus.emit("event", {
+ directory: evt.directory,
+ project: evt.project,
+ workspace: space.id,
+ payload: evt.payload,
+ })
+ } catch (err) {
+ log.info("failed to replay global event", {
+ workspaceID: space.id,
+ error: err,
+ })
+ }
+ })
+
+ log.info("disconnected from global sync: " + space.id)
+ setStatus(space.id, "disconnected")
- // TODO: Implement exponential backoff
- await sleep(1000)
- }
+ // TODO: Implement exponential backoff
+ await sleep(1000)
}
+}
- async function startSync(space: Info) {
- if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return
+async function startSync(space: Info) {
+ if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return
- const adaptor = await getAdaptor(space.projectID, space.type)
- const target = await adaptor.target(space)
+ const adaptor = await getAdaptor(space.projectID, space.type)
+ const target = await adaptor.target(space)
- if (target.type === "local") {
- void Filesystem.exists(target.directory).then((exists) => {
- setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
- })
- return
- }
+ if (target.type === "local") {
+ void Filesystem.exists(target.directory).then((exists) => {
+ setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
+ })
+ return
+ }
- if (aborts.has(space.id)) return true
+ if (aborts.has(space.id)) return true
- setStatus(space.id, "disconnected")
+ setStatus(space.id, "disconnected")
- const abort = new AbortController()
- aborts.set(space.id, abort)
+ const abort = new AbortController()
+ aborts.set(space.id, abort)
- void syncWorkspace(space, abort.signal).catch((error) => {
- aborts.delete(space.id)
+ void syncWorkspace(space, abort.signal).catch((error) => {
+ aborts.delete(space.id)
- setStatus(space.id, "error", String(error))
- log.warn("workspace listener failed", {
- workspaceID: space.id,
- error,
- })
+ setStatus(space.id, "error", String(error))
+ log.warn("workspace listener failed", {
+ workspaceID: space.id,
+ error,
})
- }
+ })
+}
- function stopSync(id: WorkspaceID) {
- aborts.get(id)?.abort()
- aborts.delete(id)
- connections.delete(id)
- }
+function stopSync(id: WorkspaceID) {
+ aborts.get(id)?.abort()
+ aborts.delete(id)
+ connections.delete(id)
}
+
+export * as Workspace from "./workspace"