summaryrefslogtreecommitdiffhomepage
path: root/packages
diff options
context:
space:
mode:
Diffstat (limited to 'packages')
-rw-r--r--packages/transport-http/src/app.ts44
1 files changed, 22 insertions, 22 deletions
diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts
index a9dcbe5..7db5cba 100644
--- a/packages/transport-http/src/app.ts
+++ b/packages/transport-http/src/app.ts
@@ -259,9 +259,12 @@ export function createApp(opts: CreateServerOptions): Hono {
});
const events: AgentEvent[] = [];
- let resolveStream: () => void;
- const streamReady = new Promise<void>((resolve) => {
- resolveStream = resolve;
+ let controllerRef: ReadableStreamDefaultController<Uint8Array> | undefined;
+
+ const stream = new ReadableStream<Uint8Array>({
+ start(controller) {
+ controllerRef = controller;
+ },
});
const orchestratorInput: Parameters<SessionOrchestrator["handleMessage"]>[0] = {
@@ -269,41 +272,38 @@ export function createApp(opts: CreateServerOptions): Hono {
text: message,
onEvent: (event) => {
events.push(event);
+ controllerRef?.enqueue(new TextEncoder().encode(serializeEventLine(event)));
},
...(model !== undefined ? { modelName: model } : {}),
...(cwd !== undefined ? { cwd } : {}),
...(reasoningEffort !== undefined ? { reasoningEffort } : {}),
};
- const orchestratorPromise = opts.orchestrator
+ opts.orchestrator
.handleMessage(orchestratorInput)
- .then(() => {
- resolveStream();
+ .then(async () => {
+ controllerRef?.close();
+ await recordThroughput(events, model);
})
.catch((err) => {
log.error("chat: turn failed", { err });
- events.push({
+ const errorEvent: AgentEvent = {
type: "error",
conversationId,
turnId: "",
message: err instanceof Error ? err.message : String(err),
- });
- resolveStream();
+ };
+ controllerRef?.enqueue(new TextEncoder().encode(serializeEventLine(errorEvent)));
+ controllerRef?.close();
});
- await streamReady;
- await orchestratorPromise.catch(() => {});
-
- // Record a per-model throughput sample for this turn. Generation time is
- // the PURE decode time — the sum of per-step genTotalMs (excludes tool
- // waits) — and tokens are the turn's aggregate output tokens.
- await recordThroughput(events, model);
-
- const ndjson = events.map(serializeEventLine).join("");
-
- return c.text(ndjson, 200, {
- "Content-Type": "application/x-ndjson",
- "X-Conversation-Id": conversationId,
+ return new Response(stream, {
+ status: 200,
+ headers: {
+ "Content-Type": "application/x-ndjson",
+ "X-Conversation-Id": conversationId,
+ "Transfer-Encoding": "chunked",
+ },
});
});