summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-21 23:12:32 +0900
committerAdam Malczewski <[email protected]>2026-06-21 23:12:32 +0900
commit62ea07f56ff066bbf05041aadf8006cbc65e5c53 (patch)
tree4fd89929053fb853e4f6d904b7f89a399ffc90f5
parente2f7ce2782b46619442d422743ecb8959fd4fc59 (diff)
downloaddispatch-62ea07f56ff066bbf05041aadf8006cbc65e5c53.tar.gz
dispatch-62ea07f56ff066bbf05041aadf8006cbc65e5c53.zip
fix(transport-http): stream /chat response instead of buffering
The /chat endpoint was buffering the entire turn before returning the response, which meant X-Conversation-Id was not available until the turn finished. This prevented the CLI --open flag from firing until after the turn completed. Now the response is a ReadableStream that: - Returns X-Conversation-Id header immediately - Streams NDJSON events as they arrive from the orchestrator - Closes the stream when the turn completes (or errors) - Records throughput after stream close (non-blocking) This fixes: dispatch <model> --text '...' --open now opens the frontend tab immediately, not after the turn finishes.
-rw-r--r--packages/transport-http/src/app.ts44
1 files changed, 22 insertions, 22 deletions
diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts
index a9dcbe5..7db5cba 100644
--- a/packages/transport-http/src/app.ts
+++ b/packages/transport-http/src/app.ts
@@ -259,9 +259,12 @@ export function createApp(opts: CreateServerOptions): Hono {
});
const events: AgentEvent[] = [];
- let resolveStream: () => void;
- const streamReady = new Promise<void>((resolve) => {
- resolveStream = resolve;
+ let controllerRef: ReadableStreamDefaultController<Uint8Array> | undefined;
+
+ const stream = new ReadableStream<Uint8Array>({
+ start(controller) {
+ controllerRef = controller;
+ },
});
const orchestratorInput: Parameters<SessionOrchestrator["handleMessage"]>[0] = {
@@ -269,41 +272,38 @@ export function createApp(opts: CreateServerOptions): Hono {
text: message,
onEvent: (event) => {
events.push(event);
+ controllerRef?.enqueue(new TextEncoder().encode(serializeEventLine(event)));
},
...(model !== undefined ? { modelName: model } : {}),
...(cwd !== undefined ? { cwd } : {}),
...(reasoningEffort !== undefined ? { reasoningEffort } : {}),
};
- const orchestratorPromise = opts.orchestrator
+ opts.orchestrator
.handleMessage(orchestratorInput)
- .then(() => {
- resolveStream();
+ .then(async () => {
+ controllerRef?.close();
+ await recordThroughput(events, model);
})
.catch((err) => {
log.error("chat: turn failed", { err });
- events.push({
+ const errorEvent: AgentEvent = {
type: "error",
conversationId,
turnId: "",
message: err instanceof Error ? err.message : String(err),
- });
- resolveStream();
+ };
+ controllerRef?.enqueue(new TextEncoder().encode(serializeEventLine(errorEvent)));
+ controllerRef?.close();
});
- await streamReady;
- await orchestratorPromise.catch(() => {});
-
- // Record a per-model throughput sample for this turn. Generation time is
- // the PURE decode time — the sum of per-step genTotalMs (excludes tool
- // waits) — and tokens are the turn's aggregate output tokens.
- await recordThroughput(events, model);
-
- const ndjson = events.map(serializeEventLine).join("");
-
- return c.text(ndjson, 200, {
- "Content-Type": "application/x-ndjson",
- "X-Conversation-Id": conversationId,
+ return new Response(stream, {
+ status: 200,
+ headers: {
+ "Content-Type": "application/x-ndjson",
+ "X-Conversation-Id": conversationId,
+ "Transfer-Encoding": "chunked",
+ },
});
});