diff options
| -rw-r--r-- | packages/transport-http/src/app.ts | 44 |
1 files changed, 22 insertions, 22 deletions
diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts index a9dcbe5..7db5cba 100644 --- a/packages/transport-http/src/app.ts +++ b/packages/transport-http/src/app.ts @@ -259,9 +259,12 @@ export function createApp(opts: CreateServerOptions): Hono { }); const events: AgentEvent[] = []; - let resolveStream: () => void; - const streamReady = new Promise<void>((resolve) => { - resolveStream = resolve; + let controllerRef: ReadableStreamDefaultController<Uint8Array> | undefined; + + const stream = new ReadableStream<Uint8Array>({ + start(controller) { + controllerRef = controller; + }, }); const orchestratorInput: Parameters<SessionOrchestrator["handleMessage"]>[0] = { @@ -269,41 +272,38 @@ export function createApp(opts: CreateServerOptions): Hono { text: message, onEvent: (event) => { events.push(event); + controllerRef?.enqueue(new TextEncoder().encode(serializeEventLine(event))); }, ...(model !== undefined ? { modelName: model } : {}), ...(cwd !== undefined ? { cwd } : {}), ...(reasoningEffort !== undefined ? { reasoningEffort } : {}), }; - const orchestratorPromise = opts.orchestrator + opts.orchestrator .handleMessage(orchestratorInput) - .then(() => { - resolveStream(); + .then(async () => { + controllerRef?.close(); + await recordThroughput(events, model); }) .catch((err) => { log.error("chat: turn failed", { err }); - events.push({ + const errorEvent: AgentEvent = { type: "error", conversationId, turnId: "", message: err instanceof Error ? err.message : String(err), - }); - resolveStream(); + }; + controllerRef?.enqueue(new TextEncoder().encode(serializeEventLine(errorEvent))); + controllerRef?.close(); }); - await streamReady; - await orchestratorPromise.catch(() => {}); - - // Record a per-model throughput sample for this turn. Generation time is - // the PURE decode time — the sum of per-step genTotalMs (excludes tool - // waits) — and tokens are the turn's aggregate output tokens. - await recordThroughput(events, model); - - const ndjson = events.map(serializeEventLine).join(""); - - return c.text(ndjson, 200, { - "Content-Type": "application/x-ndjson", - "X-Conversation-Id": conversationId, + return new Response(stream, { + status: 200, + headers: { + "Content-Type": "application/x-ndjson", + "X-Conversation-Id": conversationId, + "Transfer-Encoding": "chunked", + }, }); }); |
