summaryrefslogtreecommitdiffhomepage
path: root/packages
diff options
context:
space:
mode:
authorDax <[email protected]>2026-03-31 15:27:51 -0400
committerGitHub <[email protected]>2026-03-31 15:27:51 -0400
commit567a91191aabe14c82eebd541ad8fffe20f8bc8a (patch)
treef280730732c45f2dfde3770246a2d2af7ea10db7 /packages
parent434d82bbe2b855650b7e82fcc3539b6e64e44ddf (diff)
downloadopencode-567a91191aabe14c82eebd541ad8fffe20f8bc8a.tar.gz
opencode-567a91191aabe14c82eebd541ad8fffe20f8bc8a.zip
refactor(session): simplify LLM stream by replacing queue with fromAsyncIterable (#20324)
Diffstat (limited to 'packages')
-rw-r--r--packages/opencode/src/session/llm.ts20
1 files changed, 5 insertions, 15 deletions
diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts
index f5717da55..dc89db409 100644
--- a/packages/opencode/src/session/llm.ts
+++ b/packages/opencode/src/session/llm.ts
@@ -53,32 +53,22 @@ export namespace LLM {
Effect.gen(function* () {
return Service.of({
stream(input) {
- const stream: Stream.Stream<Event, unknown> = Stream.scoped(
+ return Stream.scoped(
Stream.unwrap(
Effect.gen(function* () {
const ctrl = yield* Effect.acquireRelease(
Effect.sync(() => new AbortController()),
(ctrl) => Effect.sync(() => ctrl.abort()),
)
- const queue = yield* Queue.unbounded<Event, unknown | Cause.Done>()
- yield* Effect.promise(async () => {
- const result = await LLM.stream({ ...input, abort: ctrl.signal })
- for await (const event of result.fullStream) {
- if (!Queue.offerUnsafe(queue, event)) break
- }
- Queue.endUnsafe(queue)
- }).pipe(
- Effect.catchCause((cause) => Effect.sync(() => void Queue.failCauseUnsafe(queue, cause))),
- Effect.onInterrupt(() => Effect.sync(() => ctrl.abort())),
- Effect.forkScoped,
- )
+ const result = yield* Effect.promise(() => LLM.stream({ ...input, abort: ctrl.signal }))
- return Stream.fromQueue(queue)
+ return Stream.fromAsyncIterable(result.fullStream, (e) =>
+ e instanceof Error ? e : new Error(String(e)),
+ )
}),
),
)
- return stream
},
})
}),