diff options
Diffstat (limited to 'packages/opencode/src/control-plane/workspace.ts')
| -rw-r--r-- | packages/opencode/src/control-plane/workspace.ts | 91 |
1 files changed, 54 insertions, 37 deletions
diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts index fe8046ba9..7f9d078bb 100644 --- a/packages/opencode/src/control-plane/workspace.ts +++ b/packages/opencode/src/control-plane/workspace.ts @@ -169,6 +169,7 @@ export const layer = Layer.effect( const auth = yield* Auth.Service const session = yield* Session.Service const http = yield* HttpClient.HttpClient + const sync = yield* SyncEvent.Service const connections = new Map<WorkspaceID, ConnectionStatus>() const syncFibers = yield* FiberMap.make<WorkspaceID, void, SyncLoopError>() @@ -307,25 +308,30 @@ export const layer = Layer.effect( events: events.length, }) - yield* Effect.sync(() => - WorkspaceContext.provide({ + yield* Effect.promise(async () => { + await WorkspaceContext.provide({ workspaceID: space.id, - fn: () => { - for (const event of events) { - SyncEvent.replay( - { - id: event.id, - aggregateID: event.aggregate_id, - seq: event.seq, - type: event.type, - data: event.data, - }, - { publish: true }, - ) - } + async fn() { + await Effect.runPromise( + Effect.forEach( + events, + (event) => + sync.replay( + { + id: event.id, + aggregateID: event.aggregate_id, + seq: event.seq, + type: event.type, + data: event.data, + }, + { publish: true }, + ), + { discard: true }, + ), + ) }, - }), - ) + }) + }) }) const syncWorkspaceLoop = Effect.fn("Workspace.syncWorkspaceLoop")(function* (space: Info) { @@ -361,16 +367,28 @@ export const layer = Layer.effect( setStatus(space.id, "connected") yield* parseSSE(stream, (evt) => - Effect.sync(() => { - try { - if (!evt || typeof evt !== "object" || !("payload" in evt)) return - const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent } - if (payload.type === "server.heartbeat") return - - if (payload.type === "sync" && payload.syncEvent) { - SyncEvent.replay(payload.syncEvent) - } + Effect.gen(function* () { + if (!evt || typeof evt !== "object" || !("payload" in evt)) return + const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent } + if (payload.type === "server.heartbeat") return + + if (payload.type === "sync" && payload.syncEvent) { + const failed = yield* sync.replay(payload.syncEvent).pipe( + Effect.as(false), + Effect.catchCause((error) => + Effect.sync(() => { + log.info("failed to replay global event", { + workspaceID: space.id, + error, + }) + return true + }), + ), + ) + if (failed) return + } + try { const event = evt as { directory?: string; project?: string; payload: unknown } GlobalBus.emit("event", { directory: event.directory, @@ -378,10 +396,10 @@ export const layer = Layer.effect( workspace: space.id, payload: event.payload, }) - } catch (err) { + } catch (error) { log.info("failed to replay global event", { workspaceID: space.id, - error: err, + error, }) } }), @@ -516,14 +534,12 @@ export const layer = Layer.effect( const adaptor = getAdaptor(space.projectID, space.type) const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space))) - yield* Effect.sync(() => - SyncEvent.run(Session.Event.Updated, { - sessionID: input.sessionID, - info: { - workspaceID: input.workspaceID, - }, - }), - ) + yield* sync.run(Session.Event.Updated, { + sessionID: input.sessionID, + info: { + workspaceID: input.workspaceID, + }, + }) const rows = yield* db((db) => db @@ -593,7 +609,7 @@ export const layer = Layer.effect( }) if (target.type === "local") { - SyncEvent.replayAll(events) + yield* sync.replayAll(events) log.info("session restore batch replayed locally", { workspaceID: input.workspaceID, sessionID: input.sessionID, @@ -812,6 +828,7 @@ export const layer = Layer.effect( export const defaultLayer = layer.pipe( Layer.provide(Auth.defaultLayer), Layer.provide(Session.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), Layer.provide(FetchHttpClient.layer), ) |
