diff options
| author | Adam Malczewski <[email protected]> | 2026-06-21 23:12:32 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-21 23:12:32 +0900 |
| commit | 62ea07f56ff066bbf05041aadf8006cbc65e5c53 (patch) | |
| tree | 4fd89929053fb853e4f6d904b7f89a399ffc90f5 /packages | |
| parent | e2f7ce2782b46619442d422743ecb8959fd4fc59 (diff) | |
| download | dispatch-62ea07f56ff066bbf05041aadf8006cbc65e5c53.tar.gz dispatch-62ea07f56ff066bbf05041aadf8006cbc65e5c53.zip | |
fix(transport-http): stream /chat response instead of buffering
The /chat endpoint was buffering the entire turn before returning
the response, which meant X-Conversation-Id was not available until
the turn finished. This prevented the CLI --open flag from firing
until after the turn completed.
Now the response is a ReadableStream that:
- Returns X-Conversation-Id header immediately
- Streams NDJSON events as they arrive from the orchestrator
- Closes the stream when the turn completes (or errors)
- Records throughput after the stream closes (non-blocking; successful turns only)
This fixes `dispatch <model> --text '...' --open`: the frontend tab now
opens immediately instead of after the turn finishes.
Diffstat (limited to 'packages')
| -rw-r--r-- | packages/transport-http/src/app.ts | 44 |
1 file changed, 22 insertions, 22 deletions
diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts index a9dcbe5..7db5cba 100644 --- a/packages/transport-http/src/app.ts +++ b/packages/transport-http/src/app.ts @@ -259,9 +259,12 @@ export function createApp(opts: CreateServerOptions): Hono { }); const events: AgentEvent[] = []; - let resolveStream: () => void; - const streamReady = new Promise<void>((resolve) => { - resolveStream = resolve; + let controllerRef: ReadableStreamDefaultController<Uint8Array> | undefined; + + const stream = new ReadableStream<Uint8Array>({ + start(controller) { + controllerRef = controller; + }, }); const orchestratorInput: Parameters<SessionOrchestrator["handleMessage"]>[0] = { @@ -269,41 +272,38 @@ export function createApp(opts: CreateServerOptions): Hono { text: message, onEvent: (event) => { events.push(event); + controllerRef?.enqueue(new TextEncoder().encode(serializeEventLine(event))); }, ...(model !== undefined ? { modelName: model } : {}), ...(cwd !== undefined ? { cwd } : {}), ...(reasoningEffort !== undefined ? { reasoningEffort } : {}), }; - const orchestratorPromise = opts.orchestrator + opts.orchestrator .handleMessage(orchestratorInput) - .then(() => { - resolveStream(); + .then(async () => { + controllerRef?.close(); + await recordThroughput(events, model); }) .catch((err) => { log.error("chat: turn failed", { err }); - events.push({ + const errorEvent: AgentEvent = { type: "error", conversationId, turnId: "", message: err instanceof Error ? err.message : String(err), - }); - resolveStream(); + }; + controllerRef?.enqueue(new TextEncoder().encode(serializeEventLine(errorEvent))); + controllerRef?.close(); }); - await streamReady; - await orchestratorPromise.catch(() => {}); - - // Record a per-model throughput sample for this turn. Generation time is - // the PURE decode time — the sum of per-step genTotalMs (excludes tool - // waits) — and tokens are the turn's aggregate output tokens. 
- await recordThroughput(events, model); - - const ndjson = events.map(serializeEventLine).join(""); - - return c.text(ndjson, 200, { - "Content-Type": "application/x-ndjson", - "X-Conversation-Id": conversationId, + return new Response(stream, { + status: 200, + headers: { + "Content-Type": "application/x-ndjson", + "X-Conversation-Id": conversationId, + "Transfer-Encoding": "chunked", + }, }); }); |
