diff options
| author | Dax <[email protected]> | 2026-03-31 15:27:51 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-03-31 15:27:51 -0400 |
| commit | 567a91191aabe14c82eebd541ad8fffe20f8bc8a (patch) | |
| tree | f280730732c45f2dfde3770246a2d2af7ea10db7 | |
| parent | 434d82bbe2b855650b7e82fcc3539b6e64e44ddf (diff) | |
| download | opencode-567a91191aabe14c82eebd541ad8fffe20f8bc8a.tar.gz opencode-567a91191aabe14c82eebd541ad8fffe20f8bc8a.zip | |
refactor(session): simplify LLM stream by replacing queue with fromAsyncIterable (#20324)
| -rw-r--r-- | packages/opencode/src/session/llm.ts | 20 |
1 files changed, 5 insertions, 15 deletions
diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts index f5717da55..dc89db409 100644 --- a/packages/opencode/src/session/llm.ts +++ b/packages/opencode/src/session/llm.ts @@ -53,32 +53,22 @@ export namespace LLM { Effect.gen(function* () { return Service.of({ stream(input) { - const stream: Stream.Stream<Event, unknown> = Stream.scoped( + return Stream.scoped( Stream.unwrap( Effect.gen(function* () { const ctrl = yield* Effect.acquireRelease( Effect.sync(() => new AbortController()), (ctrl) => Effect.sync(() => ctrl.abort()), ) - const queue = yield* Queue.unbounded<Event, unknown | Cause.Done>() - yield* Effect.promise(async () => { - const result = await LLM.stream({ ...input, abort: ctrl.signal }) - for await (const event of result.fullStream) { - if (!Queue.offerUnsafe(queue, event)) break - } - Queue.endUnsafe(queue) - }).pipe( - Effect.catchCause((cause) => Effect.sync(() => void Queue.failCauseUnsafe(queue, cause))), - Effect.onInterrupt(() => Effect.sync(() => ctrl.abort())), - Effect.forkScoped, - ) + const result = yield* Effect.promise(() => LLM.stream({ ...input, abort: ctrl.signal })) - return Stream.fromQueue(queue) + return Stream.fromAsyncIterable(result.fullStream, (e) => + e instanceof Error ? e : new Error(String(e)), + ) }), ), ) - return stream }, }) }), |
