summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJames Long <[email protected]>2026-04-17 13:30:09 -0400
committerGitHub <[email protected]>2026-04-17 13:30:09 -0400
commita8c78fc005a9a1cff3622a68f65b1df550cb2ccc (patch)
tree3abdb0692252d762fb5081c767a80aad4440f83d
parentfcb473ff64f0767461c27db8942ce41df3e115d3 (diff)
downloadopencode-a8c78fc005a9a1cff3622a68f65b1df550cb2ccc.tar.gz
opencode-a8c78fc005a9a1cff3622a68f65b1df550cb2ccc.zip
fix(core): add historical sync on workspace connect (#23121)
-rw-r--r--packages/opencode/src/cli/cmd/tui/context/sdk.tsx28
-rw-r--r--packages/opencode/src/control-plane/workspace.ts97
-rw-r--r--packages/opencode/src/server/proxy.ts7
-rw-r--r--packages/opencode/src/server/routes/instance/sync.ts26
-rw-r--r--packages/opencode/test/workspace/workspace-restore.test.ts5
-rw-r--r--packages/sdk/js/src/v2/gen/sdk.gen.ts33
-rw-r--r--packages/sdk/js/src/v2/gen/types.gen.ts19
-rw-r--r--packages/sdk/openapi.json43
8 files changed, 234 insertions, 24 deletions
diff --git a/packages/opencode/src/cli/cmd/tui/context/sdk.tsx b/packages/opencode/src/cli/cmd/tui/context/sdk.tsx
index 14d306288..6a240ceef 100644
--- a/packages/opencode/src/cli/cmd/tui/context/sdk.tsx
+++ b/packages/opencode/src/cli/cmd/tui/context/sdk.tsx
@@ -2,6 +2,7 @@ import { createOpencodeClient } from "@opencode-ai/sdk/v2"
import type { GlobalEvent } from "@opencode-ai/sdk/v2"
import { createSimpleContext } from "./helper"
import { createGlobalEmitter } from "@solid-primitives/event-bus"
+import { Flag } from "@/flag/flag"
import { batch, onCleanup, onMount } from "solid-js"
export type EventSource = {
@@ -39,6 +40,8 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
let queue: GlobalEvent[] = []
let timer: Timer | undefined
let last = 0
+ const retryDelay = 1000
+ const maxRetryDelay = 30000
const flush = () => {
if (queue.length === 0) return
@@ -73,9 +76,20 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
const ctrl = new AbortController()
sse = ctrl
;(async () => {
+ let attempt = 0
while (true) {
if (abort.signal.aborted || ctrl.signal.aborted) break
- const events = await sdk.global.event({ signal: ctrl.signal })
+
+ const events = await sdk.global.event({
+ signal: ctrl.signal,
+ sseMaxRetryAttempts: 0,
+ })
+
+ if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
+ // Start syncing workspaces, it's important to do this after
+ // we've started listening to events
+ await sdk.sync.start().catch(() => {})
+ }
for await (const event of events.stream) {
if (ctrl.signal.aborted) break
@@ -84,6 +98,12 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
if (timer) clearTimeout(timer)
if (queue.length > 0) flush()
+ attempt += 1
+ if (abort.signal.aborted || ctrl.signal.aborted) break
+
+ // Exponential backoff
+ const backoff = Math.min(retryDelay * 2 ** (attempt - 1), maxRetryDelay)
+ await new Promise((resolve) => setTimeout(resolve, backoff))
}
})().catch(() => {})
}
@@ -92,6 +112,12 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
if (props.events) {
const unsub = await props.events.subscribe(handleEvent)
onCleanup(unsub)
+
+ if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
+ // Start syncing workspaces, it's important to do this after
+ // we've started listening to events
+ await sdk.sync.start().catch(() => {})
+ }
} else {
startSSE()
}
diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts
index fd22d3af0..d678ad752 100644
--- a/packages/opencode/src/control-plane/workspace.ts
+++ b/packages/opencode/src/control-plane/workspace.ts
@@ -7,7 +7,7 @@ import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
import { Auth } from "@/auth"
import { SyncEvent } from "@/sync"
-import { EventTable } from "@/sync/event.sql"
+import { EventSequenceTable, EventTable } from "@/sync/event.sql"
import { Flag } from "@/flag/flag"
import { Log } from "@/util"
import { Filesystem } from "@/util"
@@ -23,8 +23,8 @@ import { SessionTable } from "@/session/session.sql"
import { SessionID } from "@/session/schema"
import { errorData } from "@/util/error"
import { AppRuntime } from "@/effect/app-runtime"
-import { EventSequenceTable } from "@/sync/event.sql"
import { waitEvent } from "./util"
+import { WorkspaceContext } from "./workspace-context"
export const Info = WorkspaceInfo.meta({
ref: "Workspace",
@@ -297,22 +297,13 @@ export function list(project: Project.Info) {
db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(),
)
const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
-
- for (const space of spaces) startSync(space)
return spaces
}
-function lookup(id: WorkspaceID) {
+export const get = fn(WorkspaceID.zod, async (id) => {
const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get())
if (!row) return
return fromRow(row)
-}
-
-export const get = fn(WorkspaceID.zod, async (id) => {
- const space = lookup(id)
- if (!space) return
- startSync(space)
- return space
})
export const remove = fn(WorkspaceID.zod, async (id) => {
@@ -437,6 +428,70 @@ async function connectSSE(url: URL | string, headers: HeadersInit | undefined, s
return res.body
}
+async function syncHistory(space: Info, url: URL | string, headers: HeadersInit | undefined, signal: AbortSignal) {
+ const sessionIDs = Database.use((db) =>
+ db
+ .select({ id: SessionTable.id })
+ .from(SessionTable)
+ .where(eq(SessionTable.workspace_id, space.id))
+ .all()
+ .map((row) => row.id),
+ )
+ const state = sessionIDs.length
+ ? Object.fromEntries(
+ Database.use((db) =>
+ db.select().from(EventSequenceTable).where(inArray(EventSequenceTable.aggregate_id, sessionIDs)).all(),
+ ).map((row) => [row.aggregate_id, row.seq]),
+ )
+ : {}
+
+ log.info("syncing workspace history", {
+ workspaceID: space.id,
+ sessions: sessionIDs.length,
+ known: Object.keys(state).length,
+ })
+
+ const requestHeaders = new Headers(headers)
+ requestHeaders.set("content-type", "application/json")
+
+ const res = await fetch(route(url, "/sync/history"), {
+ method: "POST",
+ headers: requestHeaders,
+ body: JSON.stringify(state),
+ signal,
+ })
+
+ if (!res.ok) {
+ const body = await res.text()
+ throw new Error(`Workspace history HTTP failure: ${res.status} ${body}`)
+ }
+
+ const events = await res.json()
+
+ return WorkspaceContext.provide({
+ workspaceID: space.id,
+ fn: () => {
+ for (const event of events) {
+ SyncEvent.replay(
+ {
+ id: event.id,
+ aggregateID: event.aggregate_id,
+ seq: event.seq,
+ type: event.type,
+ data: event.data,
+ },
+ { publish: true },
+ )
+ }
+ },
+ })
+
+ log.info("workspace history synced", {
+ workspaceID: space.id,
+ events: events.length,
+ })
+}
+
async function syncWorkspaceLoop(space: Info, signal: AbortSignal) {
const adaptor = await getAdaptor(space.projectID, space.type)
const target = await adaptor.target(space)
@@ -452,7 +507,9 @@ async function syncWorkspaceLoop(space: Info, signal: AbortSignal) {
let stream
try {
stream = await connectSSE(target.url, target.headers, signal)
+ await syncHistory(space, target.url, target.headers, signal)
} catch (err) {
+ stream = null
setStatus(space.id, "error")
log.info("failed to connect to global sync", {
workspace: space.name,
@@ -469,6 +526,7 @@ async function syncWorkspaceLoop(space: Info, signal: AbortSignal) {
await parseSSE(stream, signal, (evt: any) => {
try {
if (!("payload" in evt)) return
+ if (evt.payload.type === "server.heartbeat") return
if (evt.payload.type === "sync") {
SyncEvent.replay(evt.payload.syncEvent as SyncEvent.SerializedEvent)
@@ -536,4 +594,19 @@ function stopSync(id: WorkspaceID) {
connections.delete(id)
}
+export function startWorkspaceSyncing(projectID: ProjectID) {
+ const spaces = Database.use((db) =>
+ db
+ .select({ workspace: WorkspaceTable })
+ .from(WorkspaceTable)
+ .innerJoin(SessionTable, eq(SessionTable.workspace_id, WorkspaceTable.id))
+ .where(eq(WorkspaceTable.project_id, projectID))
+ .all(),
+ )
+
+ for (const row of new Map(spaces.map((row) => [row.workspace.id, row.workspace])).values()) {
+ void startSync(fromRow(row))
+ }
+}
+
export * as Workspace from "./workspace"
diff --git a/packages/opencode/src/server/proxy.ts b/packages/opencode/src/server/proxy.ts
index 9c1fd1f28..19a623cb0 100644
--- a/packages/opencode/src/server/proxy.ts
+++ b/packages/opencode/src/server/proxy.ts
@@ -130,13 +130,6 @@ export async function http(url: string | URL, extra: HeadersInit | undefined, re
const done = sync ? Fence.wait(workspaceID, sync, req.signal) : Promise.resolve()
return done.then(async () => {
- console.log("proxy http response", {
- method: req.method,
- request: req.url,
- url: String(url),
- status: res.status,
- statusText: res.statusText,
- })
return new Response(res.body, {
status: res.status,
statusText: res.statusText,
diff --git a/packages/opencode/src/server/routes/instance/sync.ts b/packages/opencode/src/server/routes/instance/sync.ts
index c6a067997..b124cd875 100644
--- a/packages/opencode/src/server/routes/instance/sync.ts
+++ b/packages/opencode/src/server/routes/instance/sync.ts
@@ -6,6 +6,8 @@ import { Database, asc, and, not, or, lte, eq } from "@/storage"
import { EventTable } from "@/sync/event.sql"
import { lazy } from "@/util/lazy"
import { Log } from "@/util"
+import { startWorkspaceSyncing } from "@/control-plane/workspace"
+import { Instance } from "@/project/instance"
import { errors } from "../../error"
const ReplayEvent = z.object({
@@ -21,6 +23,28 @@ const log = Log.create({ service: "server.sync" })
export const SyncRoutes = lazy(() =>
new Hono()
.post(
+ "/start",
+ describeRoute({
+ summary: "Start workspace sync",
+ description: "Start sync loops for workspaces in the current project that have active sessions.",
+ operationId: "sync.start",
+ responses: {
+ 200: {
+ description: "Workspace sync started",
+ content: {
+ "application/json": {
+ schema: resolver(z.boolean()),
+ },
+ },
+ },
+ },
+ }),
+ async (c) => {
+ startWorkspaceSyncing(Instance.project.id)
+ return c.json(true)
+ },
+ )
+ .post(
"/replay",
describeRoute({
summary: "Replay sync events",
@@ -75,7 +99,7 @@ export const SyncRoutes = lazy(() =>
})
},
)
- .get(
+ .post(
"/history",
describeRoute({
summary: "List sync events",
diff --git a/packages/opencode/test/workspace/workspace-restore.test.ts b/packages/opencode/test/workspace/workspace-restore.test.ts
index 429eeaf9d..ad6ac2c5f 100644
--- a/packages/opencode/test/workspace/workspace-restore.test.ts
+++ b/packages/opencode/test/workspace/workspace-restore.test.ts
@@ -141,9 +141,12 @@ describe("Workspace.sessionRestore", () => {
Object.assign(
async (input: URL | RequestInfo, init?: BunFetchRequestInit | RequestInit) => {
const url = new URL(typeof input === "string" || input instanceof URL ? input : input.url)
- if (url.pathname !== "/base/sync/replay") {
+ if (url.pathname === "/base/global/event") {
return eventStreamResponse()
}
+ if (url.pathname === "/base/sync/history") {
+ return Response.json([])
+ }
const body = JSON.parse(String(init?.body))
posts.push({
path: url.pathname,
diff --git a/packages/sdk/js/src/v2/gen/sdk.gen.ts b/packages/sdk/js/src/v2/gen/sdk.gen.ts
index f484147a4..6248eb8e4 100644
--- a/packages/sdk/js/src/v2/gen/sdk.gen.ts
+++ b/packages/sdk/js/src/v2/gen/sdk.gen.ts
@@ -163,6 +163,7 @@ import type {
SyncHistoryListResponses,
SyncReplayErrors,
SyncReplayResponses,
+ SyncStartResponses,
TextPartInput,
ToolIdsErrors,
ToolIdsResponses,
@@ -3038,7 +3039,7 @@ export class History extends HeyApiClient {
},
],
)
- return (options?.client ?? this.client).get<SyncHistoryListResponses, SyncHistoryListErrors, ThrowOnError>({
+ return (options?.client ?? this.client).post<SyncHistoryListResponses, SyncHistoryListErrors, ThrowOnError>({
url: "/sync/history",
...options,
...params,
@@ -3053,6 +3054,36 @@ export class History extends HeyApiClient {
export class Sync extends HeyApiClient {
/**
+ * Start workspace sync
+ *
+ * Start sync loops for workspaces in the current project that have active sessions.
+ */
+ public start<ThrowOnError extends boolean = false>(
+ parameters?: {
+ directory?: string
+ workspace?: string
+ },
+ options?: Options<never, ThrowOnError>,
+ ) {
+ const params = buildClientParams(
+ [parameters],
+ [
+ {
+ args: [
+ { in: "query", key: "directory" },
+ { in: "query", key: "workspace" },
+ ],
+ },
+ ],
+ )
+ return (options?.client ?? this.client).post<SyncStartResponses, unknown, ThrowOnError>({
+ url: "/sync/start",
+ ...options,
+ ...params,
+ })
+ }
+
+ /**
* Replay sync events
*
* Validate and replay a complete sync event history.
diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts
index 460c2bcdf..5698cba54 100644
--- a/packages/sdk/js/src/v2/gen/types.gen.ts
+++ b/packages/sdk/js/src/v2/gen/types.gen.ts
@@ -4502,6 +4502,25 @@ export type ProviderOauthCallbackResponses = {
export type ProviderOauthCallbackResponse = ProviderOauthCallbackResponses[keyof ProviderOauthCallbackResponses]
+export type SyncStartData = {
+ body?: never
+ path?: never
+ query?: {
+ directory?: string
+ workspace?: string
+ }
+ url: "/sync/start"
+}
+
+export type SyncStartResponses = {
+ /**
+ * Workspace sync started
+ */
+ 200: boolean
+}
+
+export type SyncStartResponse = SyncStartResponses[keyof SyncStartResponses]
+
export type SyncReplayData = {
body?: {
directory: string
diff --git a/packages/sdk/openapi.json b/packages/sdk/openapi.json
index 7bdf025bb..3b811f2fa 100644
--- a/packages/sdk/openapi.json
+++ b/packages/sdk/openapi.json
@@ -5224,6 +5224,47 @@
]
}
},
+ "/sync/start": {
+ "post": {
+ "operationId": "sync.start",
+ "parameters": [
+ {
+ "in": "query",
+ "name": "directory",
+ "schema": {
+ "type": "string"
+ }
+ },
+ {
+ "in": "query",
+ "name": "workspace",
+ "schema": {
+ "type": "string"
+ }
+ }
+ ],
+ "summary": "Start workspace sync",
+ "description": "Start sync loops for workspaces in the current project that have active sessions.",
+ "responses": {
+ "200": {
+ "description": "Workspace sync started",
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "boolean"
+ }
+ }
+ }
+ }
+ },
+ "x-codeSamples": [
+ {
+ "lang": "js",
+ "source": "import { createOpencodeClient } from \"@opencode-ai/sdk\n\nconst client = createOpencodeClient()\nawait client.sync.start({\n ...\n})"
+ }
+ ]
+ }
+ },
"/sync/replay": {
"post": {
"operationId": "sync.replay",
@@ -5328,7 +5369,7 @@
}
},
"/sync/history": {
- "get": {
+ "post": {
"operationId": "sync.history.list",
"parameters": [
{