summaryrefslogtreecommitdiffhomepage
path: root/packages/opencode/src/control-plane/workspace.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/opencode/src/control-plane/workspace.ts')
-rw-r--r--packages/opencode/src/control-plane/workspace.ts91
1 files changed, 54 insertions, 37 deletions
diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts
index fe8046ba9..7f9d078bb 100644
--- a/packages/opencode/src/control-plane/workspace.ts
+++ b/packages/opencode/src/control-plane/workspace.ts
@@ -169,6 +169,7 @@ export const layer = Layer.effect(
const auth = yield* Auth.Service
const session = yield* Session.Service
const http = yield* HttpClient.HttpClient
+ const sync = yield* SyncEvent.Service
const connections = new Map<WorkspaceID, ConnectionStatus>()
const syncFibers = yield* FiberMap.make<WorkspaceID, void, SyncLoopError>()
@@ -307,25 +308,30 @@ export const layer = Layer.effect(
events: events.length,
})
- yield* Effect.sync(() =>
- WorkspaceContext.provide({
+ yield* Effect.promise(async () => {
+ await WorkspaceContext.provide({
workspaceID: space.id,
- fn: () => {
- for (const event of events) {
- SyncEvent.replay(
- {
- id: event.id,
- aggregateID: event.aggregate_id,
- seq: event.seq,
- type: event.type,
- data: event.data,
- },
- { publish: true },
- )
- }
+ async fn() {
+ await Effect.runPromise(
+ Effect.forEach(
+ events,
+ (event) =>
+ sync.replay(
+ {
+ id: event.id,
+ aggregateID: event.aggregate_id,
+ seq: event.seq,
+ type: event.type,
+ data: event.data,
+ },
+ { publish: true },
+ ),
+ { discard: true },
+ ),
+ )
},
- }),
- )
+ })
+ })
})
const syncWorkspaceLoop = Effect.fn("Workspace.syncWorkspaceLoop")(function* (space: Info) {
@@ -361,16 +367,28 @@ export const layer = Layer.effect(
setStatus(space.id, "connected")
yield* parseSSE(stream, (evt) =>
- Effect.sync(() => {
- try {
- if (!evt || typeof evt !== "object" || !("payload" in evt)) return
- const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent }
- if (payload.type === "server.heartbeat") return
-
- if (payload.type === "sync" && payload.syncEvent) {
- SyncEvent.replay(payload.syncEvent)
- }
+ Effect.gen(function* () {
+ if (!evt || typeof evt !== "object" || !("payload" in evt)) return
+ const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent }
+ if (payload.type === "server.heartbeat") return
+
+ if (payload.type === "sync" && payload.syncEvent) {
+ const failed = yield* sync.replay(payload.syncEvent).pipe(
+ Effect.as(false),
+ Effect.catchCause((error) =>
+ Effect.sync(() => {
+ log.info("failed to replay global event", {
+ workspaceID: space.id,
+ error,
+ })
+ return true
+ }),
+ ),
+ )
+ if (failed) return
+ }
+ try {
const event = evt as { directory?: string; project?: string; payload: unknown }
GlobalBus.emit("event", {
directory: event.directory,
@@ -378,10 +396,10 @@ export const layer = Layer.effect(
workspace: space.id,
payload: event.payload,
})
- } catch (err) {
+ } catch (error) {
log.info("failed to replay global event", {
workspaceID: space.id,
- error: err,
+ error,
})
}
}),
@@ -516,14 +534,12 @@ export const layer = Layer.effect(
const adaptor = getAdaptor(space.projectID, space.type)
const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space)))
- yield* Effect.sync(() =>
- SyncEvent.run(Session.Event.Updated, {
- sessionID: input.sessionID,
- info: {
- workspaceID: input.workspaceID,
- },
- }),
- )
+ yield* sync.run(Session.Event.Updated, {
+ sessionID: input.sessionID,
+ info: {
+ workspaceID: input.workspaceID,
+ },
+ })
const rows = yield* db((db) =>
db
@@ -593,7 +609,7 @@ export const layer = Layer.effect(
})
if (target.type === "local") {
- SyncEvent.replayAll(events)
+ yield* sync.replayAll(events)
log.info("session restore batch replayed locally", {
workspaceID: input.workspaceID,
sessionID: input.sessionID,
@@ -812,6 +828,7 @@ export const layer = Layer.effect(
export const defaultLayer = layer.pipe(
Layer.provide(Auth.defaultLayer),
Layer.provide(Session.defaultLayer),
+ Layer.provide(SyncEvent.defaultLayer),
Layer.provide(FetchHttpClient.layer),
)