summaryrefslogtreecommitdiffhomepage
path: root/packages/kernel/src/runtime
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-21 02:08:44 +0900
committerAdam Malczewski <[email protected]>2026-06-21 02:08:44 +0900
commitba47df37f0c89bff4f0c3dd7d0bc2ef6c8062b92 (patch)
tree21d87eb847cd526a506cf274467fd1359f349705 /packages/kernel/src/runtime
parent75032313a96856a932c109efbbe6b6a7eb782222 (diff)
downloaddispatch-ba47df37f0c89bff4f0c3dd7d0bc2ef6c8062b92.tar.gz
dispatch-ba47df37f0c89bff4f0c3dd7d0bc2ef6c8062b92.zip
feat(message-queue): per-conversation queue + steering injection
A per-conversation message queue (new message-queue extension) holds user messages enqueued while a turn generates; delivered mid-turn as steering at the tool-result boundary (or carried to a new turn if no tool call fires). - kernel: RunTurnInput.drainSteering callback (generic; kernel stays pure) - wire 0.7.0->0.8.0: QueuedMessage, QueuePayload, TurnSteeringEvent (additive) - transport-contract 0.11.0->0.12.0: POST /conversations/:id/queue + chat.queue WS op - message-queue ext: queue state + per-conversation custom surface (rendererId message-queue) - session-orchestrator: enqueue facade + drainSteering wiring + post-seal carry - transport-http/ws: queue endpoint + chat.queue op (fixes WsClientMessage exhaustive switch) - host-bin: register message-queue 1043 vitest + 199 transport bun pass; tsc/biome clean; boot smoke clean. FE courier: frontend-message-queue-handoff.md.
Diffstat (limited to 'packages/kernel/src/runtime')
-rw-r--r--packages/kernel/src/runtime/run-turn.test.ts246
-rw-r--r--packages/kernel/src/runtime/run-turn.ts13
2 files changed, 258 insertions, 1 deletions
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index dcaea7f..0d4c59d 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -5,7 +5,7 @@ import type { LogDeps, Logger, LogRecord, LogSink } from "../contracts/logging.j
import type { ProviderContract, ProviderEvent } from "../contracts/provider.js";
import type { ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js";
import { createLogger } from "../logging/logger.js";
-import { runTurn } from "./run-turn.js";
+import { MAX_STEPS, runTurn } from "./run-turn.js";
function delay(ms: number): Promise<void> {
return new Promise((resolve) => {
@@ -29,6 +29,28 @@ function createFakeProvider(script: ProviderEvent[][]): ProviderContract {
};
}
+function createCapturingProvider(script: ProviderEvent[][]): {
+ provider: ProviderContract;
+ capturedMessages: ChatMessage[][];
+} {
+ const capturedMessages: ChatMessage[][] = [];
+ let callIndex = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream(messages, _tools) {
+ capturedMessages.push([...messages]);
+ const events = script[callIndex] ?? [];
+ callIndex++;
+ return (async function* () {
+ for (const event of events) {
+ yield event;
+ }
+ })();
+ },
+ };
+ return { provider, capturedMessages };
+}
+
function createFakeTool(
name: string,
handler?: (input: unknown, ctx: ToolExecuteContext) => Promise<ToolResult>,
@@ -2577,4 +2599,226 @@ describe("runTurn", () => {
}
});
});
+
+ describe("drainSteering", () => {
+ it("drainSteering called once at the tool-result boundary; returned messages appended to the next step's provider input (after tool results)", async () => {
+ let drainCallCount = 0;
+ const steeringMessage: ChatMessage = {
+ role: "user",
+ chunks: [{ type: "text", text: "steer!" }],
+ };
+
+ const { provider, capturedMessages } = createCapturingProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ drainSteering: () => {
+ drainCallCount++;
+ return [steeringMessage];
+ },
+ });
+
+ expect(drainCallCount).toBe(1);
+ // The provider was called twice (tool-call step, then text step).
+ expect(capturedMessages).toHaveLength(2);
+ const secondStepMessages = capturedMessages[1] ?? [];
+ // user, assistant(tool-call), tool-result, steering(user) — in order,
+ // steering appended AFTER the tool results, before the next call.
+ expect(secondStepMessages).toHaveLength(4);
+ expect(secondStepMessages[0]?.role).toBe("user");
+ expect(secondStepMessages[1]?.role).toBe("assistant");
+ expect(secondStepMessages[2]?.role).toBe("tool");
+ expect(secondStepMessages[3]).toEqual(steeringMessage);
+ expect(secondStepMessages[3]?.role).toBe("user");
+ // Steering is fed to the next provider call, NOT surfaced in the
+ // turn result — the caller owns the steering messages' lifecycle.
+ expect(result.messages).toHaveLength(3);
+ });
+
+ it("drainSteering omitted → no injection; turn byte-identical to before", async () => {
+ const { provider, capturedMessages } = createCapturingProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ // drainSteering omitted — must be a strict no-op.
+ });
+
+ expect(capturedMessages).toHaveLength(2);
+ const secondStepMessages = capturedMessages[1] ?? [];
+ // user, assistant(tool-call), tool-result — NO steering injected.
+ expect(secondStepMessages).toHaveLength(3);
+ expect(secondStepMessages[0]?.role).toBe("user");
+ expect(secondStepMessages[1]?.role).toBe("assistant");
+ expect(secondStepMessages[2]?.role).toBe("tool");
+ expect(result.messages).toHaveLength(3);
+ });
+
+ it("drainSteering returns [] → no injection", async () => {
+ let drainCallCount = 0;
+ const { provider, capturedMessages } = createCapturingProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ drainSteering: () => {
+ drainCallCount++;
+ return [];
+ },
+ });
+
+ // Called at the boundary, but returned nothing → no injection.
+ expect(drainCallCount).toBe(1);
+ expect(capturedMessages).toHaveLength(2);
+ const secondStepMessages = capturedMessages[1] ?? [];
+ expect(secondStepMessages).toHaveLength(3);
+ expect(secondStepMessages[2]?.role).toBe("tool");
+ });
+
+ it("drainSteering NOT called when a step has no tool calls (text-only turn)", async () => {
+ let drainCallCount = 0;
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "hello" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ drainSteering: () => {
+ drainCallCount++;
+ return [];
+ },
+ });
+
+ expect(drainCallCount).toBe(0);
+ });
+
+ it("multiple tool-call steps → drainSteering called once per tool-call step", async () => {
+ let drainCallCount = 0;
+ const provider = createFakeProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "tool-call", toolCallId: "tc2", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ drainSteering: () => {
+ drainCallCount++;
+ return [];
+ },
+ });
+
+ // Steps 0 and 1 each produced tool calls → drained once each.
+ // Step 2 (text-only) → no boundary → no drain. Total = 2.
+ expect(drainCallCount).toBe(2);
+ });
+
+ it("drainSteering NOT called when max-steps ends the turn after a tool-call step (no next step → no drain)", async () => {
+ let drainCallCount = 0;
+ // Every step produces a tool call → the turn runs to MAX_STEPS.
+ const script: ProviderEvent[][] = Array.from({ length: MAX_STEPS }, () => [
+ { type: "tool-call", toolCallId: "tc", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ]);
+ const provider = createFakeProvider(script);
+
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ drainSteering: () => {
+ drainCallCount++;
+ return [];
+ },
+ });
+
+ expect(result.finishReason).toBe("max-steps");
+ // MAX_STEPS tool-call steps (indices 0..MAX_STEPS-1). Drained on every
+ // step that is followed by a next step (0..MAX_STEPS-2 = MAX_STEPS-1
+ // calls); the final step is the max-steps boundary → no next step →
+ // no drain (queue left intact for the caller).
+ expect(drainCallCount).toBe(MAX_STEPS - 1);
+ });
+ });
});
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index bf57854..228ef8a 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -538,6 +538,19 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
if (step === MAX_STEPS - 1) {
finishReason = "max-steps";
+ // No next step → no tool-result boundary. Leave any pending
+ // steering messages for the caller (it owns the queue).
+ } else {
+ // Tool-result boundary: this step produced tool calls and we are
+ // about to call provider.stream again. Drain steering messages
+ // and append them after the tool results, before the next call.
+ // The kernel owns no queue and names no feature — it just calls
+ // the callback and appends. Emits nothing (caller emits the
+ // `steering` AgentEvent in its own wrapper).
+ const steering = input.drainSteering?.() ?? [];
+ for (const msg of steering) {
+ messages.push(msg);
+ }
}
}
} finally {