diff options
| author | Adam Malczewski <[email protected]> | 2026-06-26 22:03:19 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-26 22:23:39 +0900 |
| commit | 727c98c9dae516a2070eb950410314380a20c974 (patch) | |
| tree | 52aa1022c54f11770be7e4e2a324f0a8b8b8deec /packages/kernel/src/runtime/dispatch.ts | |
| parent | e59dc11f63b1df51142259bb2c406af8c9c8c2bb (diff) | |
| download | dispatch-727c98c9dae516a2070eb950410314380a20c974.tar.gz dispatch-727c98c9dae516a2070eb950410314380a20c974.zip | |
style: switch from tabs to 2-space indentation
Diffstat (limited to 'packages/kernel/src/runtime/dispatch.ts')
| -rw-r--r-- | packages/kernel/src/runtime/dispatch.ts | 308 |
1 files changed, 154 insertions, 154 deletions
diff --git a/packages/kernel/src/runtime/dispatch.ts b/packages/kernel/src/runtime/dispatch.ts index 01f0043..d09db3b 100644 --- a/packages/kernel/src/runtime/dispatch.ts +++ b/packages/kernel/src/runtime/dispatch.ts @@ -5,182 +5,182 @@ import type { ToolCall, ToolContract, ToolExecuteContext, ToolResult } from "../ import { toolOutputEvent } from "./events.js"; export interface StepDispatcher { - submit(call: ToolCall): void; - drain(): Promise<Map<string, ToolResult>>; + submit(call: ToolCall): void; + drain(): Promise<Map<string, ToolResult>>; } export async function executeToolCall( - call: ToolCall, - tool: ToolContract | undefined, - signal: AbortSignal, - emit: EventEmitter, - conversationId: string, - turnId: string, - toolSpan?: Span, - cwd?: string, - computerId?: string, + call: ToolCall, + tool: ToolContract | undefined, + signal: AbortSignal, + emit: EventEmitter, + conversationId: string, + turnId: string, + toolSpan?: Span, + cwd?: string, + computerId?: string, ): Promise<ToolResult> { - if (tool === undefined) { - return { content: `Unknown tool: ${call.name}`, isError: true }; - } - if (signal.aborted) { - return { content: "Aborted", isError: true }; - } - const ctx: ToolExecuteContext = { - toolCallId: call.id, - signal, - onOutput: (data, stream) => { - emit(toolOutputEvent(conversationId, turnId, call.id, data, stream)); - }, - log: toolSpan?.log ?? createNoopLogger(), - conversationId, - ...(cwd !== undefined ? { cwd } : {}), - ...(computerId !== undefined ? { computerId } : {}), - }; - // Race the tool's execute promise against the abort signal so a tool - // that hangs (ignores ctx.signal, or blocks on something the signal - // can't interrupt) can't keep runTurn from returning. When the signal - // fires we RESOLVE (not reject) with an "Aborted" result so the step - // completes normally and the existing signal.aborted → finishReason = - // "aborted" path seals the turn cleanly (done event), letting the - // caller's finally clear active state and the FE clear its spinner. - try { - const toolPromise = tool.execute(call.input, ctx); - const abortPromise = new Promise<ToolResult>((resolve) => { - signal.addEventListener("abort", () => resolve({ content: "Aborted", isError: true }), { - once: true, - }); - }); - // Swallow late rejections from the orphaned tool promise: the tool - // may reject after the race already resolved with "Aborted". - void toolPromise.catch(() => {}); - return await Promise.race([toolPromise, abortPromise]); - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - return { content: `Tool execution error: ${message}`, isError: true }; - } + if (tool === undefined) { + return { content: `Unknown tool: ${call.name}`, isError: true }; + } + if (signal.aborted) { + return { content: "Aborted", isError: true }; + } + const ctx: ToolExecuteContext = { + toolCallId: call.id, + signal, + onOutput: (data, stream) => { + emit(toolOutputEvent(conversationId, turnId, call.id, data, stream)); + }, + log: toolSpan?.log ?? createNoopLogger(), + conversationId, + ...(cwd !== undefined ? { cwd } : {}), + ...(computerId !== undefined ? { computerId } : {}), + }; + // Race the tool's execute promise against the abort signal so a tool + // that hangs (ignores ctx.signal, or blocks on something the signal + // can't interrupt) can't keep runTurn from returning. When the signal + // fires we RESOLVE (not reject) with an "Aborted" result so the step + // completes normally and the existing signal.aborted → finishReason = + // "aborted" path seals the turn cleanly (done event), letting the + // caller's finally clear active state and the FE clear its spinner. + try { + const toolPromise = tool.execute(call.input, ctx); + const abortPromise = new Promise<ToolResult>((resolve) => { + signal.addEventListener("abort", () => resolve({ content: "Aborted", isError: true }), { + once: true, + }); + }); + // Swallow late rejections from the orphaned tool promise: the tool + // may reject after the race already resolved with "Aborted". + void toolPromise.catch(() => {}); + return await Promise.race([toolPromise, abortPromise]); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + return { content: `Tool execution error: ${message}`, isError: true }; + } } interface QueueEntry { - readonly call: ToolCall; - readonly tool: ToolContract | undefined; - readonly resolve: (result: ToolResult) => void; + readonly call: ToolCall; + readonly tool: ToolContract | undefined; + readonly resolve: (result: ToolResult) => void; } export function createStepDispatcher( - toolMap: Map<string, ToolContract>, - policy: ToolDispatchPolicy, - signal: AbortSignal, - emit: EventEmitter, - conversationId: string, - turnId: string, - toolSpans: Map<string, Span>, - cwd?: string, - computerId?: string, + toolMap: Map<string, ToolContract>, + policy: ToolDispatchPolicy, + signal: AbortSignal, + emit: EventEmitter, + conversationId: string, + turnId: string, + toolSpans: Map<string, Span>, + cwd?: string, + computerId?: string, ): StepDispatcher { - let activeCount = 0; - let unsafeRunning = false; - const queue: QueueEntry[] = []; - const allPromises: Array<{ id: string; promise: Promise<ToolResult> }> = []; - const dedupMap = new Map<string, Promise<ToolResult>>(); + let activeCount = 0; + let unsafeRunning = false; + const queue: QueueEntry[] = []; + const allPromises: Array<{ id: string; promise: Promise<ToolResult> }> = []; + const dedupMap = new Map<string, Promise<ToolResult>>(); - function canStart(isConcurrencySafe: boolean): boolean { - if (unsafeRunning) return false; - if (!isConcurrencySafe && activeCount > 0) return false; - if (policy.maxConcurrent === 0) return true; - return activeCount < policy.maxConcurrent; - } + function canStart(isConcurrencySafe: boolean): boolean { + if (unsafeRunning) return false; + if (!isConcurrencySafe && activeCount > 0) return false; + if (policy.maxConcurrent === 0) return true; + return activeCount < policy.maxConcurrent; + } - function tryStartNext(): void { - while (queue.length > 0) { - const next = queue[0]; - if (next === undefined) break; - const isSafe = next.tool?.concurrencySafe !== false; - if (!canStart(isSafe)) break; - queue.shift(); - activeCount++; - if (!isSafe) unsafeRunning = true; - void runAndResolve(next); - } - } + function tryStartNext(): void { + while (queue.length > 0) { + const next = queue[0]; + if (next === undefined) break; + const isSafe = next.tool?.concurrencySafe !== false; + if (!canStart(isSafe)) break; + queue.shift(); + activeCount++; + if (!isSafe) unsafeRunning = true; + void runAndResolve(next); + } + } - async function runAndResolve(entry: QueueEntry): Promise<void> { - const tcSpan = toolSpans.get(entry.call.id); - const result = await executeToolCall( - entry.call, - entry.tool, - signal, - emit, - conversationId, - turnId, - tcSpan, - cwd, - computerId, - ); - activeCount--; - if (entry.tool?.concurrencySafe === false) unsafeRunning = false; - entry.resolve(result); - tryStartNext(); - } + async function runAndResolve(entry: QueueEntry): Promise<void> { + const tcSpan = toolSpans.get(entry.call.id); + const result = await executeToolCall( + entry.call, + entry.tool, + signal, + emit, + conversationId, + turnId, + tcSpan, + cwd, + computerId, + ); + activeCount--; + if (entry.tool?.concurrencySafe === false) unsafeRunning = false; + entry.resolve(result); + tryStartNext(); + } - function submit(call: ToolCall): void { - const tool = toolMap.get(call.name); - const key = `${call.name}:${JSON.stringify(call.input)}`; + function submit(call: ToolCall): void { + const tool = toolMap.get(call.name); + const key = `${call.name}:${JSON.stringify(call.input)}`; - const existing = dedupMap.get(key); - if (existing !== undefined) { - allPromises.push({ id: call.id, promise: existing }); - return; - } + const existing = dedupMap.get(key); + if (existing !== undefined) { + allPromises.push({ id: call.id, promise: existing }); + return; + } - const promise = new Promise<ToolResult>((resolve) => { - queue.push({ call, tool, resolve }); - tryStartNext(); - }); + const promise = new Promise<ToolResult>((resolve) => { + queue.push({ call, tool, resolve }); + tryStartNext(); + }); - dedupMap.set(key, promise); - allPromises.push({ id: call.id, promise }); - } + dedupMap.set(key, promise); + allPromises.push({ id: call.id, promise }); + } - async function drain(): Promise<Map<string, ToolResult>> { - if (signal.aborted) { - for (const item of queue) { - item.resolve({ content: "Aborted", isError: true }); - } - queue.length = 0; - } + async function drain(): Promise<Map<string, ToolResult>> { + if (signal.aborted) { + for (const item of queue) { + item.resolve({ content: "Aborted", isError: true }); + } + queue.length = 0; + } - const results = new Map<string, ToolResult>(); - for (const entry of allPromises) { - const result = await entry.promise; - results.set(entry.id, result); - } - return results; - } + const results = new Map<string, ToolResult>(); + for (const entry of allPromises) { + const result = await entry.promise; + results.set(entry.id, result); + } + return results; + } - return { submit, drain }; + return { submit, drain }; } function createNoopLogger(): Logger { - return { - debug() {}, - info() {}, - warn() {}, - error() {}, - child() { - return createNoopLogger(); - }, - span() { - return { - id: "noop", - log: createNoopLogger(), - setAttributes() {}, - addLink() {}, - child() { - return this; - }, - end() {}, - }; - }, - }; + return { + debug() {}, + info() {}, + warn() {}, + error() {}, + child() { + return createNoopLogger(); + }, + span() { + return { + id: "noop", + log: createNoopLogger(), + setAttributes() {}, + addLink() {}, + child() { + return this; + }, + end() {}, + }; + }, + }; } |
