summaryrefslogtreecommitdiffhomepage
path: root/packages
diff options
context:
space:
mode:
authorJames Long <[email protected]>2026-04-15 10:18:48 -0400
committerGitHub <[email protected]>2026-04-15 10:18:48 -0400
commitaf20191d1cd60a7f4a421ad81eca5053f7deace1 (patch)
tree4faa83e938c61311f67a189220d427b9f0ed3017 /packages
parent47af00b2452ef7374cdda8769910799938d1303c (diff)
downloadopencode-af20191d1cd60a7f4a421ad81eca5053f7deace1.tar.gz
opencode-af20191d1cd60a7f4a421ad81eca5053f7deace1.zip
feat(core): sync routes, refactor proxy, session restore, and more syncing (#22518)
Diffstat (limited to 'packages')
-rw-r--r--packages/opencode/src/control-plane/workspace.ts258
-rw-r--r--packages/opencode/src/server/instance/index.ts2
-rw-r--r--packages/opencode/src/server/instance/middleware.ts66
-rw-r--r--packages/opencode/src/server/instance/sync.ts118
-rw-r--r--packages/opencode/src/server/instance/workspace.ts71
-rw-r--r--packages/opencode/src/server/middleware.ts2
-rw-r--r--packages/opencode/src/server/proxy.ts55
-rw-r--r--packages/opencode/src/sync/index.ts19
-rw-r--r--packages/sdk/js/src/v2/gen/sdk.gen.ts157
-rw-r--r--packages/sdk/js/src/v2/gen/types.gen.ts127
-rw-r--r--packages/sdk/openapi.json320
11 files changed, 1132 insertions, 63 deletions
diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts
index f330e07b7..78f3d770e 100644
--- a/packages/opencode/src/control-plane/workspace.ts
+++ b/packages/opencode/src/control-plane/workspace.ts
@@ -1,11 +1,13 @@
import z from "zod"
import { setTimeout as sleep } from "node:timers/promises"
import { fn } from "@/util/fn"
-import { Database, eq } from "@/storage/db"
+import { Database, asc, eq } from "@/storage/db"
import { Project } from "@/project/project"
import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
import { SyncEvent } from "@/sync"
+import { EventTable } from "@/sync/event.sql"
+import { Flag } from "@/flag/flag"
import { Log } from "@/util/log"
import { Filesystem } from "@/util/filesystem"
import { ProjectID } from "@/project/schema"
@@ -15,6 +17,11 @@ import { getAdaptor } from "./adaptors"
import { WorkspaceInfo } from "./types"
import { WorkspaceID } from "./schema"
import { parseSSE } from "./sse"
+import { Session } from "@/session"
+import { SessionTable } from "@/session/session.sql"
+import { SessionID } from "@/session/schema"
+import { errorData } from "@/util/error"
+import { AppRuntime } from "@/effect/app-runtime"
export namespace Workspace {
export const Info = WorkspaceInfo.meta({
@@ -29,6 +36,13 @@ export namespace Workspace {
})
export type ConnectionStatus = z.infer<typeof ConnectionStatus>
+ const Restore = z.object({
+ workspaceID: WorkspaceID.zod,
+ sessionID: SessionID.zod,
+ total: z.number().int().min(0),
+ step: z.number().int().min(0),
+ })
+
export const Event = {
Ready: BusEvent.define(
"workspace.ready",
@@ -42,6 +56,7 @@ export namespace Workspace {
message: z.string(),
}),
),
+ Restore: BusEvent.define("workspace.restore", Restore),
Status: BusEvent.define("workspace.status", ConnectionStatus),
}
@@ -102,11 +117,170 @@ export namespace Workspace {
return info
})
+ const SessionRestoreInput = z.object({
+ workspaceID: WorkspaceID.zod,
+ sessionID: SessionID.zod,
+ })
+
+ export const sessionRestore = fn(SessionRestoreInput, async (input) => {
+ log.info("session restore requested", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ })
+ try {
+ const space = await get(input.workspaceID)
+ if (!space) throw new Error(`Workspace not found: ${input.workspaceID}`)
+
+ const adaptor = await getAdaptor(space.projectID, space.type)
+ const target = await adaptor.target(space)
+
+ // Need to switch the workspace of the session
+ SyncEvent.run(Session.Event.Updated, {
+ sessionID: input.sessionID,
+ info: {
+ workspaceID: input.workspaceID,
+ },
+ })
+
+ const rows = Database.use((db) =>
+ db
+ .select({
+ id: EventTable.id,
+ aggregateID: EventTable.aggregate_id,
+ seq: EventTable.seq,
+ type: EventTable.type,
+ data: EventTable.data,
+ })
+ .from(EventTable)
+ .where(eq(EventTable.aggregate_id, input.sessionID))
+ .orderBy(asc(EventTable.seq))
+ .all(),
+ )
+ if (rows.length === 0) throw new Error(`No events found for session: ${input.sessionID}`)
+
+ const all = rows
+
+ const size = 10
+ const sets = Array.from({ length: Math.ceil(all.length / size) }, (_, i) => all.slice(i * size, (i + 1) * size))
+ const total = sets.length
+ log.info("session restore prepared", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ workspaceType: space.type,
+ directory: space.directory,
+ target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
+ events: all.length,
+ batches: total,
+ first: all[0]?.seq,
+ last: all.at(-1)?.seq,
+ })
+ GlobalBus.emit("event", {
+ directory: "global",
+ workspace: input.workspaceID,
+ payload: {
+ type: Event.Restore.type,
+ properties: {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ total,
+ step: 0,
+ },
+ },
+ })
+ for (const [i, events] of sets.entries()) {
+ log.info("session restore batch starting", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ step: i + 1,
+ total,
+ events: events.length,
+ first: events[0]?.seq,
+ last: events.at(-1)?.seq,
+ target: target.type === "remote" ? String(route(target.url, "/sync/replay")) : target.directory,
+ })
+ if (target.type === "local") {
+ SyncEvent.replayAll(events)
+ log.info("session restore batch replayed locally", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ step: i + 1,
+ total,
+ events: events.length,
+ })
+ } else {
+ const url = route(target.url, "/sync/replay")
+ const headers = new Headers(target.headers)
+ headers.set("content-type", "application/json")
+ const res = await fetch(url, {
+ method: "POST",
+ headers,
+ body: JSON.stringify({
+ directory: space.directory ?? "",
+ events,
+ }),
+ })
+ if (!res.ok) {
+ const body = await res.text()
+ log.error("session restore batch failed", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ step: i + 1,
+ total,
+ status: res.status,
+ body,
+ })
+ throw new Error(
+ `Failed to replay session ${input.sessionID} into workspace ${input.workspaceID}: HTTP ${res.status} ${body}`,
+ )
+ }
+ log.info("session restore batch posted", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ step: i + 1,
+ total,
+ status: res.status,
+ })
+ }
+ GlobalBus.emit("event", {
+ directory: "global",
+ workspace: input.workspaceID,
+ payload: {
+ type: Event.Restore.type,
+ properties: {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ total,
+ step: i + 1,
+ },
+ },
+ })
+ }
+
+ log.info("session restore complete", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ batches: total,
+ })
+
+ return {
+ total,
+ }
+ } catch (err) {
+ log.error("session restore failed", {
+ workspaceID: input.workspaceID,
+ sessionID: input.sessionID,
+ error: errorData(err),
+ })
+ throw err
+ }
+ })
+
export function list(project: Project.Info) {
const rows = Database.use((db) =>
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
)
const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
+
for (const space of spaces) startSync(space)
return spaces
}
@@ -120,13 +294,25 @@ export namespace Workspace {
})
export const remove = fn(WorkspaceID.zod, async (id) => {
+ const sessions = Database.use((db) =>
+ db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.workspace_id, id)).all(),
+ )
+ for (const session of sessions) {
+ await AppRuntime.runPromise(Session.Service.use((svc) => svc.remove(session.id)))
+ }
+
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
+
if (row) {
stopSync(id)
const info = fromRow(row)
- const adaptor = await getAdaptor(info.projectID, row.type)
- adaptor.remove(info)
+ try {
+ const adaptor = await getAdaptor(info.projectID, row.type)
+ await adaptor.remove(info)
+ } catch (err) {
+ log.error("adaptor not available when removing workspace", { type: row.type })
+ }
Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run())
return info
}
@@ -156,51 +342,81 @@ export namespace Workspace {
const log = Log.create({ service: "workspace-sync" })
- async function workspaceEventLoop(space: Info, signal: AbortSignal) {
- log.info("starting sync: " + space.id)
+ function route(url: string | URL, path: string) {
+ const next = new URL(url)
+ next.pathname = `${next.pathname.replace(/\/$/, "")}${path}`
+ next.search = ""
+ next.hash = ""
+ return next
+ }
+ async function syncWorkspace(space: Info, signal: AbortSignal) {
while (!signal.aborted) {
- log.info("connecting to sync: " + space.id)
+ log.info("connecting to global sync", { workspace: space.name })
- setStatus(space.id, "connecting")
const adaptor = await getAdaptor(space.projectID, space.type)
const target = await adaptor.target(space)
if (target.type === "local") return
- const res = await fetch(target.url + "/sync/event", { method: "GET", signal }).catch((err: unknown) => {
- setStatus(space.id, "error", String(err))
+ const res = await fetch(route(target.url, "/global/event"), {
+ method: "GET",
+ headers: target.headers,
+ signal,
+ }).catch((err: unknown) => {
+ setStatus(space.id, "error")
+
+ log.info("failed to connect to global sync", {
+ workspace: space.name,
+ error: err,
+ })
return undefined
})
- if (!res || !res.ok || !res.body) {
- log.info("failed to connect to sync: " + res?.status)
- setStatus(space.id, "error", res ? `HTTP ${res.status}` : "no response")
+ if (!res || !res.ok || !res.body) {
+ log.info("failed to connect to global sync", { workspace: space.name })
+ setStatus(space.id, "error")
await sleep(1000)
continue
}
+
+ log.info("global sync connected", { workspace: space.name })
setStatus(space.id, "connected")
- await parseSSE(res.body, signal, (evt) => {
- const event = evt as SyncEvent.SerializedEvent
+ await parseSSE(res.body, signal, (evt: any) => {
try {
- if (!event.type.startsWith("server.")) {
- SyncEvent.replay(event)
+ if (!("payload" in evt)) return
+
+ if (evt.payload.type === "sync") {
+ // This name -> type is temporary
+ SyncEvent.replay({ ...evt.payload, type: evt.payload.name } as SyncEvent.SerializedEvent)
}
+
+ GlobalBus.emit("event", {
+ directory: evt.directory,
+ project: evt.project,
+ workspace: space.id,
+ payload: evt.payload,
+ })
} catch (err) {
- log.warn("failed to replay sync event", {
+ log.info("failed to replay global event", {
workspaceID: space.id,
error: err,
})
}
})
+
+ log.info("disconnected from global sync: " + space.id)
setStatus(space.id, "disconnected")
- log.info("disconnected to sync: " + space.id)
- await sleep(250)
+
+ // TODO: Implement exponential backoff
+ await sleep(1000)
}
}
function startSync(space: Info) {
+ if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return
+
if (space.type === "worktree") {
void Filesystem.exists(space.directory!).then((exists) => {
setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
@@ -213,9 +429,9 @@ export namespace Workspace {
aborts.set(space.id, abort)
setStatus(space.id, "disconnected")
- void workspaceEventLoop(space, abort.signal).catch((error) => {
+ void syncWorkspace(space, abort.signal).catch((error) => {
setStatus(space.id, "error", String(error))
- log.warn("workspace sync listener failed", {
+ log.warn("workspace listener failed", {
workspaceID: space.id,
error,
})
diff --git a/packages/opencode/src/server/instance/index.ts b/packages/opencode/src/server/instance/index.ts
index 86a18dc67..4a03b7b29 100644
--- a/packages/opencode/src/server/instance/index.ts
+++ b/packages/opencode/src/server/instance/index.ts
@@ -23,6 +23,7 @@ import { ConfigRoutes } from "./config"
import { ExperimentalRoutes } from "./experimental"
import { ProviderRoutes } from "./provider"
import { EventRoutes } from "./event"
+import { SyncRoutes } from "./sync"
import { WorkspaceRouterMiddleware } from "./middleware"
import { AppRuntime } from "@/effect/app-runtime"
@@ -37,6 +38,7 @@ export const InstanceRoutes = (upgrade: UpgradeWebSocket): Hono =>
.route("/permission", PermissionRoutes())
.route("/question", QuestionRoutes())
.route("/provider", ProviderRoutes())
+ .route("/sync", SyncRoutes())
.route("/", FileRoutes())
.route("/", EventRoutes())
.route("/mcp", McpRoutes())
diff --git a/packages/opencode/src/server/instance/middleware.ts b/packages/opencode/src/server/instance/middleware.ts
index 9155ad451..824c265ef 100644
--- a/packages/opencode/src/server/instance/middleware.ts
+++ b/packages/opencode/src/server/instance/middleware.ts
@@ -11,9 +11,12 @@ import { Session } from "@/session"
import { SessionID } from "@/session/schema"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import { AppRuntime } from "@/effect/app-runtime"
+import { Log } from "@/util/log"
type Rule = { method?: string; path: string; exact?: boolean; action: "local" | "forward" }
+const OPENCODE_WORKSPACE = process.env.OPENCODE_WORKSPACE
+
const RULES: Array<Rule> = [
{ path: "/session/status", action: "forward" },
{ method: "GET", path: "/session", action: "local" },
@@ -46,6 +49,8 @@ async function getSessionWorkspace(url: URL) {
}
export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): MiddlewareHandler {
+ const log = Log.create({ service: "workspace-router" })
+
return async (c, next) => {
const raw = c.req.query("directory") || c.req.header("x-opencode-directory") || process.cwd()
const directory = Filesystem.resolve(
@@ -63,8 +68,22 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
const sessionWorkspaceID = await getSessionWorkspace(url)
const workspaceID = sessionWorkspaceID || url.searchParams.get("workspace")
- // If no workspace is provided we use the project
- if (!workspaceID) {
+ if (!workspaceID || url.pathname.startsWith("/console") || OPENCODE_WORKSPACE) {
+ if (OPENCODE_WORKSPACE) {
+ return WorkspaceContext.provide({
+ workspaceID: WorkspaceID.make(OPENCODE_WORKSPACE),
+ async fn() {
+ return Instance.provide({
+ directory,
+ init: () => AppRuntime.runPromise(InstanceBootstrap),
+ async fn() {
+ return next()
+ },
+ })
+ },
+ })
+ }
+
return Instance.provide({
directory,
init: () => AppRuntime.runPromise(InstanceBootstrap),
@@ -77,16 +96,6 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
const workspace = await Workspace.get(WorkspaceID.make(workspaceID))
if (!workspace) {
- // Special-case deleting a session in case user's data in a
- // weird state. Allow them to forcefully delete a synced session
- // even if the remote workspace is not in their data.
- //
- // The lets the `DELETE /session/:id` endpoint through and we've
- // made sure that it will run without an instance
- if (url.pathname.match(/\/session\/[^/]+$/) && c.req.method === "DELETE") {
- return next()
- }
-
return new Response(`Workspace not found: ${workspaceID}`, {
status: 500,
headers: {
@@ -95,6 +104,12 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
})
}
+ if (local(c.req.method, url.pathname)) {
+ // No instance provided because we are serving cached data; there
+ // is no instance to work with
+ return next()
+ }
+
const adaptor = await getAdaptor(workspace.projectID, workspace.type)
const target = await adaptor.target(workspace)
@@ -112,24 +127,27 @@ export function WorkspaceRouterMiddleware(upgrade: UpgradeWebSocket): Middleware
})
}
- if (local(c.req.method, url.pathname)) {
- // No instance provided because we are serving cached data; there
- // is no instance to work with
- return next()
- }
+ const proxyURL = new URL(target.url)
+ proxyURL.pathname = `${proxyURL.pathname.replace(/\/$/, "")}${url.pathname}`
+ proxyURL.search = url.search
+ proxyURL.hash = url.hash
+ proxyURL.searchParams.delete("workspace")
+
+ log.info("workspace proxy forwarding", {
+ workspaceID,
+ request: url.toString(),
+ target: String(target.url),
+ proxy: proxyURL.toString(),
+ })
if (c.req.header("upgrade")?.toLowerCase() === "websocket") {
- return ServerProxy.websocket(upgrade, target, c.req.raw, c.env)
+ return ServerProxy.websocket(upgrade, proxyURL, target.headers, c.req.raw, c.env)
}
const headers = new Headers(c.req.raw.headers)
headers.delete("x-opencode-workspace")
- return ServerProxy.http(
- target,
- new Request(c.req.raw, {
- headers,
- }),
- )
+ const req = new Request(c.req.raw, { headers })
+ return ServerProxy.http(proxyURL, target.headers, req)
}
}
diff --git a/packages/opencode/src/server/instance/sync.ts b/packages/opencode/src/server/instance/sync.ts
new file mode 100644
index 000000000..c22969130
--- /dev/null
+++ b/packages/opencode/src/server/instance/sync.ts
@@ -0,0 +1,118 @@
+import z from "zod"
+import { Hono } from "hono"
+import { describeRoute, validator, resolver } from "hono-openapi"
+import { SyncEvent } from "@/sync"
+import { Database, asc, and, not, or, lte, eq } from "@/storage/db"
+import { EventTable } from "@/sync/event.sql"
+import { lazy } from "@/util/lazy"
+import { Log } from "@/util/log"
+import { errors } from "../error"
+
+const ReplayEvent = z.object({
+ id: z.string(),
+ aggregateID: z.string(),
+ seq: z.number().int().min(0),
+ type: z.string(),
+ data: z.record(z.string(), z.unknown()),
+})
+
+const log = Log.create({ service: "server.sync" })
+
+export const SyncRoutes = lazy(() =>
+ new Hono()
+ .post(
+ "/replay",
+ describeRoute({
+ summary: "Replay sync events",
+ description: "Validate and replay a complete sync event history.",
+ operationId: "sync.replay",
+ responses: {
+ 200: {
+ description: "Replayed sync events",
+ content: {
+ "application/json": {
+ schema: resolver(
+ z.object({
+ sessionID: z.string(),
+ }),
+ ),
+ },
+ },
+ },
+ ...errors(400),
+ },
+ }),
+ validator(
+ "json",
+ z.object({
+ directory: z.string(),
+ events: z.array(ReplayEvent).min(1),
+ }),
+ ),
+ async (c) => {
+ const body = c.req.valid("json")
+ const events = body.events
+ const source = events[0].aggregateID
+ log.info("sync replay requested", {
+ sessionID: source,
+ events: events.length,
+ first: events[0]?.seq,
+ last: events.at(-1)?.seq,
+ directory: body.directory,
+ })
+ SyncEvent.replayAll(events)
+
+ log.info("sync replay complete", {
+ sessionID: source,
+ events: events.length,
+ first: events[0]?.seq,
+ last: events.at(-1)?.seq,
+ })
+
+ return c.json({
+ sessionID: source,
+ })
+ },
+ )
+ .get(
+ "/history",
+ describeRoute({
+ summary: "List sync events",
+ description:
+ "List sync events for all aggregates. Keys are aggregate IDs the client already knows about, values are the last known sequence ID. Events with seq > value are returned for those aggregates. Aggregates not listed in the input get their full history.",
+ operationId: "sync.history.list",
+ responses: {
+ 200: {
+ description: "Sync events",
+ content: {
+ "application/json": {
+ schema: resolver(
+ z.array(
+ z.object({
+ id: z.string(),
+ aggregate_id: z.string(),
+ seq: z.number(),
+ type: z.string(),
+ data: z.record(z.string(), z.unknown()),
+ }),
+ ),
+ ),
+ },
+ },
+ },
+ ...errors(400),
+ },
+ }),
+ validator("json", z.record(z.string(), z.number().int().min(0))),
+ async (c) => {
+ const body = c.req.valid("json")
+ const exclude = Object.entries(body)
+ const where =
+ exclude.length > 0
+ ? not(or(...exclude.map(([id, seq]) => and(eq(EventTable.aggregate_id, id), lte(EventTable.seq, seq))))!)
+ : undefined
+ const rows = Database.use((db) => db.select().from(EventTable).where(where).orderBy(asc(EventTable.seq)).all())
+ return c.json(rows)
+ },
+ ),
+)
diff --git a/packages/opencode/src/server/instance/workspace.ts b/packages/opencode/src/server/instance/workspace.ts
index 7cee03197..a4ff4eda8 100644
--- a/packages/opencode/src/server/instance/workspace.ts
+++ b/packages/opencode/src/server/instance/workspace.ts
@@ -6,12 +6,10 @@ import { Workspace } from "../../control-plane/workspace"
import { Instance } from "../../project/instance"
import { errors } from "../error"
import { lazy } from "../../util/lazy"
+import { Log } from "@/util/log"
+import { errorData } from "@/util/error"
-const WorkspaceAdaptor = z.object({
- type: z.string(),
- name: z.string(),
- description: z.string(),
-})
+const log = Log.create({ service: "server.workspace" })
export const WorkspaceRoutes = lazy(() =>
new Hono()
@@ -26,7 +24,15 @@ export const WorkspaceRoutes = lazy(() =>
description: "Workspace adaptors",
content: {
"application/json": {
- schema: resolver(z.array(WorkspaceAdaptor)),
+ schema: resolver(
+ z.array(
+ z.object({
+ type: z.string(),
+ name: z.string(),
+ description: z.string(),
+ }),
+ ),
+ ),
},
},
},
@@ -140,5 +146,58 @@ export const WorkspaceRoutes = lazy(() =>
const { id } = c.req.valid("param")
return c.json(await Workspace.remove(id))
},
+ )
+ .post(
+ "/:id/session-restore",
+ describeRoute({
+ summary: "Restore session into workspace",
+ description: "Replay a session's sync events into the target workspace in batches.",
+ operationId: "experimental.workspace.sessionRestore",
+ responses: {
+ 200: {
+ description: "Session replay started",
+ content: {
+ "application/json": {
+ schema: resolver(
+ z.object({
+ total: z.number().int().min(0),
+ }),
+ ),
+ },
+ },
+ },
+ ...errors(400),
+ },
+ }),
+ validator("param", z.object({ id: Workspace.Info.shape.id })),
+ validator("json", Workspace.sessionRestore.schema.omit({ workspaceID: true })),
+ async (c) => {
+ const { id } = c.req.valid("param")
+ const body = c.req.valid("json")
+ log.info("session restore route requested", {
+ workspaceID: id,
+ sessionID: body.sessionID,
+ directory: Instance.directory,
+ })
+ try {
+ const result = await Workspace.sessionRestore({
+ workspaceID: id,
+ ...body,
+ })
+ log.info("session restore route complete", {
+ workspaceID: id,
+ sessionID: body.sessionID,
+ total: result.total,
+ })
+ return c.json(result)
+ } catch (err) {
+ log.error("session restore route failed", {
+ workspaceID: id,
+ sessionID: body.sessionID,
+ error: errorData(err),
+ })
+ throw err
+ }
+ },
),
)
diff --git a/packages/opencode/src/server/middleware.ts b/packages/opencode/src/server/middleware.ts
index a51ba602b..d0539eb24 100644
--- a/packages/opencode/src/server/middleware.ts
+++ b/packages/opencode/src/server/middleware.ts
@@ -86,7 +86,7 @@ const zipped = compress()
export const CompressionMiddleware: MiddlewareHandler = (c, next) => {
const path = c.req.path
const method = c.req.method
- if (path === "/event" || path === "/global/event" || path === "/global/sync-event") return next()
+ if (path === "/event" || path === "/global/event") return next()
if (method === "POST" && /\/session\/[^/]+\/(message|prompt_async)$/.test(path)) return next()
return zipped(c, next)
}
diff --git a/packages/opencode/src/server/proxy.ts b/packages/opencode/src/server/proxy.ts
index c90a657dc..0c0deba20 100644
--- a/packages/opencode/src/server/proxy.ts
+++ b/packages/opencode/src/server/proxy.ts
@@ -1,6 +1,6 @@
-import type { Target } from "@/control-plane/types"
import { Hono } from "hono"
import type { UpgradeWebSocket } from "hono/ws"
+import { Log } from "@/util/log"
const hop = new Set([
"connection",
@@ -20,6 +20,7 @@ type Msg = string | ArrayBuffer | Uint8Array
function headers(req: Request, extra?: HeadersInit) {
const out = new Headers(req.headers)
for (const key of hop) out.delete(key)
+ out.delete("accept-encoding")
out.delete("x-opencode-directory")
out.delete("x-opencode-workspace")
if (!extra) return out
@@ -98,31 +99,63 @@ const app = (upgrade: UpgradeWebSocket) =>
)
export namespace ServerProxy {
- export function http(target: Extract<Target, { type: "remote" }>, req: Request) {
+ const log = Log.Default.clone().tag("service", "server-proxy")
+
+ export function http(url: string | URL, extra: HeadersInit | undefined, req: Request) {
+ console.log("proxy http request", {
+ method: req.method,
+ request: req.url,
+ url: String(url),
+ })
return fetch(
- new Request(target.url, {
+ new Request(url, {
method: req.method,
- headers: headers(req, target.headers),
+ headers: headers(req, extra),
body: req.method === "GET" || req.method === "HEAD" ? undefined : req.body,
redirect: "manual",
signal: req.signal,
}),
- )
+ ).then((res) => {
+ const next = new Headers(res.headers)
+ next.delete("content-encoding")
+ next.delete("content-length")
+
+ console.log("proxy http response", {
+ method: req.method,
+ request: req.url,
+ url: String(url),
+ status: res.status,
+ statusText: res.statusText,
+ })
+ return new Response(res.body, {
+ status: res.status,
+ statusText: res.statusText,
+ headers: next,
+ })
+ })
}
export function websocket(
upgrade: UpgradeWebSocket,
- target: Extract<Target, { type: "remote" }>,
+ target: string | URL,
+ extra: HeadersInit | undefined,
req: Request,
env: unknown,
) {
- const url = new URL(req.url)
- url.pathname = "/__workspace_ws"
- url.search = ""
+ const proxy = new URL(req.url)
+ proxy.pathname = "/__workspace_ws"
+ proxy.search = ""
const next = new Headers(req.headers)
- next.set("x-opencode-proxy-url", socket(target.url))
+ next.set("x-opencode-proxy-url", socket(target))
+ for (const [key, value] of new Headers(extra).entries()) {
+ next.set(key, value)
+ }
+ log.info("proxy websocket", {
+ request: req.url,
+ target: String(target),
+ })
return app(upgrade).fetch(
- new Request(url, {
+ new Request(proxy, {
method: req.method,
headers: next,
signal: req.signal,
diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts
index d7cb7f774..ce598dae6 100644
--- a/packages/opencode/src/sync/index.ts
+++ b/packages/opencode/src/sync/index.ts
@@ -199,6 +199,25 @@ export namespace SyncEvent {
process(def, event, { publish: !!options?.publish })
}
+ export function replayAll(events: SerializedEvent[], options?: { publish: boolean }) {
+ const source = events[0]?.aggregateID
+ if (!source) return
+ if (events.some((item) => item.aggregateID !== source)) {
+ throw new Error("Replay events must belong to the same session")
+ }
+ const start = events[0].seq
+ for (const [i, item] of events.entries()) {
+ const seq = start + i
+ if (item.seq !== seq) {
+ throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`)
+ }
+ }
+ for (const item of events) {
+ replay(item, options)
+ }
+ return source
+ }
+
export function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
const agg = (data as Record<string, string>)[def.aggregate]
// This should never happen: we've enforced it via typescript in
diff --git a/packages/sdk/js/src/v2/gen/sdk.gen.ts b/packages/sdk/js/src/v2/gen/sdk.gen.ts
index b5fc976bb..d7bf43f50 100644
--- a/packages/sdk/js/src/v2/gen/sdk.gen.ts
+++ b/packages/sdk/js/src/v2/gen/sdk.gen.ts
@@ -35,6 +35,8 @@ import type {
ExperimentalWorkspaceListResponses,
ExperimentalWorkspaceRemoveErrors,
ExperimentalWorkspaceRemoveResponses,
+ ExperimentalWorkspaceSessionRestoreErrors,
+ ExperimentalWorkspaceSessionRestoreResponses,
ExperimentalWorkspaceStatusResponses,
FileListResponses,
FilePartInput,
@@ -157,6 +159,10 @@ import type {
SessionUpdateErrors,
SessionUpdateResponses,
SubtaskPartInput,
+ SyncHistoryListErrors,
+ SyncHistoryListResponses,
+ SyncReplayErrors,
+ SyncReplayResponses,
TextPartInput,
ToolIdsErrors,
ToolIdsResponses,
@@ -1243,6 +1249,49 @@ export class Workspace extends HeyApiClient {
})
}
+ /**
+ * Restore session into workspace
+ *
+ * Replay a session's sync events into the target workspace in batches.
+ */
+ public sessionRestore<ThrowOnError extends boolean = false>(
+ parameters: {
+ id: string
+ directory?: string
+ workspace?: string
+ sessionID?: string
+ },
+ options?: Options<never, ThrowOnError>,
+ ) {
+ const params = buildClientParams(
+ [parameters],
+ [
+ {
+ args: [
+ { in: "path", key: "id" },
+ { in: "query", key: "directory" },
+ { in: "query", key: "workspace" },
+ { in: "body", key: "sessionID" },
+ ],
+ },
+ ],
+ )
+ return (options?.client ?? this.client).post<
+ ExperimentalWorkspaceSessionRestoreResponses,
+ ExperimentalWorkspaceSessionRestoreErrors,
+ ThrowOnError
+ >({
+ url: "/experimental/workspace/{id}/session-restore",
+ ...options,
+ ...params,
+ headers: {
+ "Content-Type": "application/json",
+ ...options?.headers,
+ ...params.headers,
+ },
+ })
+ }
+
private _adaptor?: Adaptor
get adaptor(): Adaptor {
return (this._adaptor ??= new Adaptor({ client: this.client }))
@@ -2961,6 +3010,109 @@ export class Provider extends HeyApiClient {
}
}
+export class History extends HeyApiClient {
+ /**
+ * List sync events
+ *
+ * List sync events for all aggregates. Keys are aggregate IDs the client already knows about, values are the last known sequence ID. Events with seq > value are returned for those aggregates. Aggregates not listed in the input get their full history.
+ */
+ public list<ThrowOnError extends boolean = false>(
+ parameters?: {
+ directory?: string
+ workspace?: string
+ body?: {
+ [key: string]: number
+ }
+ },
+ options?: Options<never, ThrowOnError>,
+ ) {
+ const params = buildClientParams(
+ [parameters],
+ [
+ {
+ args: [
+ { in: "query", key: "directory" },
+ { in: "query", key: "workspace" },
+ { key: "body", map: "body" },
+ ],
+ },
+ ],
+ )
+ return (options?.client ?? this.client).get<SyncHistoryListResponses, SyncHistoryListErrors, ThrowOnError>({
+ url: "/sync/history",
+ ...options,
+ ...params,
+ headers: {
+ "Content-Type": "application/json",
+ ...options?.headers,
+ ...params.headers,
+ },
+ })
+ }
+}
+
+export class Sync extends HeyApiClient {
+ /**
+ * Replay sync events
+ *
+ * Validate and replay a complete sync event history.
+ */
+ public replay<ThrowOnError extends boolean = false>(
+ parameters?: {
+ query_directory?: string
+ workspace?: string
+ body_directory?: string
+ events?: Array<{
+ id: string
+ aggregateID: string
+ seq: number
+ type: string
+ data: {
+ [key: string]: unknown
+ }
+ }>
+ },
+ options?: Options<never, ThrowOnError>,
+ ) {
+ const params = buildClientParams(
+ [parameters],
+ [
+ {
+ args: [
+ {
+ in: "query",
+ key: "query_directory",
+ map: "directory",
+ },
+ { in: "query", key: "workspace" },
+ {
+ in: "body",
+ key: "body_directory",
+ map: "directory",
+ },
+ { in: "body", key: "events" },
+ ],
+ },
+ ],
+ )
+ return (options?.client ?? this.client).post<SyncReplayResponses, SyncReplayErrors, ThrowOnError>({
+ url: "/sync/replay",
+ ...options,
+ ...params,
+ headers: {
+ "Content-Type": "application/json",
+ ...options?.headers,
+ ...params.headers,
+ },
+ })
+ }
+
+ private _history?: History
+ get history(): History {
+ return (this._history ??= new History({ client: this.client }))
+ }
+}
+
export class Find extends HeyApiClient {
/**
* Find text
@@ -4217,6 +4369,11 @@ export class OpencodeClient extends HeyApiClient {
return (this._provider ??= new Provider({ client: this.client }))
}
+ private _sync?: Sync
+ get sync(): Sync {
+ return (this._sync ??= new Sync({ client: this.client }))
+ }
+
private _find?: Find
get find(): Find {
return (this._find ??= new Find({ client: this.client }))
diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts
index 8f4c16c5b..24c1d53bf 100644
--- a/packages/sdk/js/src/v2/gen/types.gen.ts
+++ b/packages/sdk/js/src/v2/gen/types.gen.ts
@@ -520,6 +520,16 @@ export type EventWorkspaceFailed = {
}
}
+export type EventWorkspaceRestore = {
+ type: "workspace.restore"
+ properties: {
+ workspaceID: string
+ sessionID: string
+ total: number
+ step: number
+ }
+}
+
export type EventWorkspaceStatus = {
type: "workspace.status"
properties: {
@@ -1137,6 +1147,7 @@ export type GlobalEvent = {
| EventPtyDeleted
| EventWorkspaceReady
| EventWorkspaceFailed
+ | EventWorkspaceRestore
| EventWorkspaceStatus
| EventMessageUpdated
| EventMessageRemoved
@@ -2049,6 +2060,7 @@ export type Event =
| EventPtyDeleted
| EventWorkspaceReady
| EventWorkspaceFailed
+ | EventWorkspaceRestore
| EventWorkspaceStatus
| EventMessageUpdated
| EventMessageRemoved
@@ -3006,6 +3018,42 @@ export type ExperimentalWorkspaceRemoveResponses = {
export type ExperimentalWorkspaceRemoveResponse =
ExperimentalWorkspaceRemoveResponses[keyof ExperimentalWorkspaceRemoveResponses]
+export type ExperimentalWorkspaceSessionRestoreData = {
+ body?: {
+ sessionID: string
+ }
+ path: {
+ id: string
+ }
+ query?: {
+ directory?: string
+ workspace?: string
+ }
+ url: "/experimental/workspace/{id}/session-restore"
+}
+
+export type ExperimentalWorkspaceSessionRestoreErrors = {
+ /**
+ * Bad request
+ */
+ 400: BadRequestError
+}
+
+export type ExperimentalWorkspaceSessionRestoreError =
+ ExperimentalWorkspaceSessionRestoreErrors[keyof ExperimentalWorkspaceSessionRestoreErrors]
+
+export type ExperimentalWorkspaceSessionRestoreResponses = {
+ /**
+ * Session replay started
+ */
+ 200: {
+ total: number
+ }
+}
+
+export type ExperimentalWorkspaceSessionRestoreResponse =
+ ExperimentalWorkspaceSessionRestoreResponses[keyof ExperimentalWorkspaceSessionRestoreResponses]
+
export type WorktreeRemoveData = {
body?: WorktreeRemoveInput
path?: never
@@ -4456,6 +4504,85 @@ export type ProviderOauthCallbackResponses = {
export type ProviderOauthCallbackResponse = ProviderOauthCallbackResponses[keyof ProviderOauthCallbackResponses]
+export type SyncReplayData = {
+ body?: {
+ directory: string
+ events: Array<{
+ id: string
+ aggregateID: string
+ seq: number
+ type: string
+ data: {
+ [key: string]: unknown
+ }
+ }>
+ }
+ path?: never
+ query?: {
+ directory?: string
+ workspace?: string
+ }
+ url: "/sync/replay"
+}
+
+export type SyncReplayErrors = {
+ /**
+ * Bad request
+ */
+ 400: BadRequestError
+}
+
+export type SyncReplayError = SyncReplayErrors[keyof SyncReplayErrors]
+
+export type SyncReplayResponses = {
+ /**
+ * Replayed sync events
+ */
+ 200: {
+ sessionID: string
+ }
+}
+
+export type SyncReplayResponse = SyncReplayResponses[keyof SyncReplayResponses]
+
+export type SyncHistoryListData = {
+ body?: {
+ [key: string]: number
+ }
+ path?: never
+ query?: {
+ directory?: string
+ workspace?: string
+ }
+ url: "/sync/history"
+}
+
+export type SyncHistoryListErrors = {
+ /**
+ * Bad request
+ */
+ 400: BadRequestError
+}
+
+export type SyncHistoryListError = SyncHistoryListErrors[keyof SyncHistoryListErrors]
+
+export type SyncHistoryListResponses = {
+ /**
+ * Sync events
+ */
+ 200: Array<{
+ id: string
+ aggregate_id: string
+ seq: number
+ type: string
+ data: {
+ [key: string]: unknown
+ }
+ }>
+}
+
+export type SyncHistoryListResponse = SyncHistoryListResponses[keyof SyncHistoryListResponses]
+
export type FindTextData = {
body?: never
path?: never
diff --git a/packages/sdk/openapi.json b/packages/sdk/openapi.json
index 6000e6604..ee3538d55 100644
--- a/packages/sdk/openapi.json
+++ b/packages/sdk/openapi.json
@@ -1805,6 +1805,90 @@
]
}
},
+ "/experimental/workspace/{id}/session-restore": {
+ "post": {
+ "operationId": "experimental.workspace.sessionRestore",
+ "parameters": [
+ {
+ "in": "query",
+ "name": "directory",
+ "schema": {
+ "type": "string"
+ }
+ },
+ {
+ "in": "query",
+ "name": "workspace",
+ "schema": {
+ "type": "string"
+ }
+ },
+ {
+ "in": "path",
+ "name": "id",
+ "schema": {
+ "type": "string",
+ "pattern": "^wrk.*"
+ },
+ "required": true
+ }
+ ],
+ "summary": "Restore session into workspace",
+ "description": "Replay a session's sync events into the target workspace in batches.",
+ "responses": {
+ "200": {
+ "description": "Session replay started",
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {
+ "total": {
+ "type": "integer",
+ "minimum": 0,
+ "maximum": 9007199254740991
+ }
+ },
+ "required": ["total"]
+ }
+ }
+ }
+ },
+ "400": {
+ "description": "Bad request",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/BadRequestError"
+ }
+ }
+ }
+ }
+ },
+ "requestBody": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {
+ "sessionID": {
+ "type": "string",
+ "pattern": "^ses.*"
+ }
+ },
+ "required": ["sessionID"]
+ }
+ }
+ }
+ },
+ "x-codeSamples": [
+ {
+ "lang": "js",
+ "source": "import { createOpencodeClient } from \"@opencode-ai/sdk\n\nconst client = createOpencodeClient()\nawait client.experimental.workspace.sessionRestore({\n ...\n})"
+ }
+ ]
+ }
+ },
"/experimental/worktree": {
"post": {
"operationId": "worktree.create",
@@ -5143,6 +5227,202 @@
]
}
},
+ "/sync/replay": {
+ "post": {
+ "operationId": "sync.replay",
+ "parameters": [
+ {
+ "in": "query",
+ "name": "directory",
+ "schema": {
+ "type": "string"
+ }
+ },
+ {
+ "in": "query",
+ "name": "workspace",
+ "schema": {
+ "type": "string"
+ }
+ }
+ ],
+ "summary": "Replay sync events",
+ "description": "Validate and replay a complete sync event history.",
+ "responses": {
+ "200": {
+ "description": "Replayed sync events",
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {
+ "sessionID": {
+ "type": "string"
+ }
+ },
+ "required": ["sessionID"]
+ }
+ }
+ }
+ },
+ "400": {
+ "description": "Bad request",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/BadRequestError"
+ }
+ }
+ }
+ }
+ },
+ "requestBody": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "properties": {
+ "directory": {
+ "type": "string"
+ },
+ "events": {
+ "minItems": 1,
+ "type": "array",
+ "items": {
+ "type": "object",
+ "properties": {
+ "id": {
+ "type": "string"
+ },
+ "aggregateID": {
+ "type": "string"
+ },
+ "seq": {
+ "type": "integer",
+ "minimum": 0,
+ "maximum": 9007199254740991
+ },
+ "type": {
+ "type": "string"
+ },
+ "data": {
+ "type": "object",
+ "propertyNames": {
+ "type": "string"
+ },
+ "additionalProperties": {}
+ }
+ },
+ "required": ["id", "aggregateID", "seq", "type", "data"]
+ }
+ }
+ },
+ "required": ["directory", "events"]
+ }
+ }
+ }
+ },
+ "x-codeSamples": [
+ {
+ "lang": "js",
+ "source": "import { createOpencodeClient } from \"@opencode-ai/sdk\n\nconst client = createOpencodeClient()\nawait client.sync.replay({\n ...\n})"
+ }
+ ]
+ }
+ },
+ "/sync/history": {
+ "get": {
+ "operationId": "sync.history.list",
+ "parameters": [
+ {
+ "in": "query",
+ "name": "directory",
+ "schema": {
+ "type": "string"
+ }
+ },
+ {
+ "in": "query",
+ "name": "workspace",
+ "schema": {
+ "type": "string"
+ }
+ }
+ ],
+ "summary": "List sync events",
+ "description": "List sync events for all aggregates. Keys are aggregate IDs the client already knows about, values are the last known sequence ID. Events with seq > value are returned for those aggregates. Aggregates not listed in the input get their full history.",
+ "responses": {
+ "200": {
+ "description": "Sync events",
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "array",
+ "items": {
+ "type": "object",
+ "properties": {
+ "id": {
+ "type": "string"
+ },
+ "aggregate_id": {
+ "type": "string"
+ },
+ "seq": {
+ "type": "number"
+ },
+ "type": {
+ "type": "string"
+ },
+ "data": {
+ "type": "object",
+ "propertyNames": {
+ "type": "string"
+ },
+ "additionalProperties": {}
+ }
+ },
+ "required": ["id", "aggregate_id", "seq", "type", "data"]
+ }
+ }
+ }
+ }
+ },
+ "400": {
+ "description": "Bad request",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/BadRequestError"
+ }
+ }
+ }
+ }
+ },
+ "requestBody": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "object",
+ "propertyNames": {
+ "type": "string"
+ },
+ "additionalProperties": {
+ "type": "integer",
+ "minimum": 0,
+ "maximum": 9007199254740991
+ }
+ }
+ }
+ }
+ },
+ "x-codeSamples": [
+ {
+ "lang": "js",
+ "source": "import { createOpencodeClient } from \"@opencode-ai/sdk\n\nconst client = createOpencodeClient()\nawait client.sync.history.list({\n ...\n})"
+ }
+ ]
+ }
+ },
"/find": {
"get": {
"operationId": "find.text",
@@ -8514,6 +8794,40 @@
},
"required": ["type", "properties"]
},
+ "Event.workspace.restore": {
+ "type": "object",
+ "properties": {
+ "type": {
+ "type": "string",
+ "const": "workspace.restore"
+ },
+ "properties": {
+ "type": "object",
+ "properties": {
+ "workspaceID": {
+ "type": "string",
+ "pattern": "^wrk.*"
+ },
+ "sessionID": {
+ "type": "string",
+ "pattern": "^ses.*"
+ },
+ "total": {
+ "type": "integer",
+ "minimum": 0,
+ "maximum": 9007199254740991
+ },
+ "step": {
+ "type": "integer",
+ "minimum": 0,
+ "maximum": 9007199254740991
+ }
+ },
+ "required": ["workspaceID", "sessionID", "total", "step"]
+ }
+ },
+ "required": ["type", "properties"]
+ },
"Event.workspace.status": {
"type": "object",
"properties": {
@@ -10524,6 +10838,9 @@
"$ref": "#/components/schemas/Event.workspace.failed"
},
{
+ "$ref": "#/components/schemas/Event.workspace.restore"
+ },
+ {
"$ref": "#/components/schemas/Event.workspace.status"
},
{
@@ -12781,6 +13098,9 @@
"$ref": "#/components/schemas/Event.workspace.failed"
},
{
+ "$ref": "#/components/schemas/Event.workspace.restore"
+ },
+ {
"$ref": "#/components/schemas/Event.workspace.status"
},
{