diff options
| author | Adam Malczewski <[email protected]> | 2026-06-21 02:08:44 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-21 02:08:44 +0900 |
| commit | ba47df37f0c89bff4f0c3dd7d0bc2ef6c8062b92 (patch) | |
| tree | 21d87eb847cd526a506cf274467fd1359f349705 /packages/kernel/src | |
| parent | 75032313a96856a932c109efbbe6b6a7eb782222 (diff) | |
| download | dispatch-ba47df37f0c89bff4f0c3dd7d0bc2ef6c8062b92.tar.gz dispatch-ba47df37f0c89bff4f0c3dd7d0bc2ef6c8062b92.zip | |
feat(message-queue): per-conversation queue + steering injection
A per-conversation message queue (new message-queue extension) holds user
messages enqueued while a turn generates; delivered mid-turn as steering at the
tool-result boundary (or carried to a new turn if no tool call fires).
- kernel: RunTurnInput.drainSteering callback (generic; kernel stays pure)
- wire 0.7.0->0.8.0: QueuedMessage, QueuePayload, TurnSteeringEvent (additive)
- transport-contract 0.11.0->0.12.0: POST /conversations/:id/queue + chat.queue WS op
- message-queue ext: queue state + per-conversation custom surface (rendererId message-queue)
- session-orchestrator: enqueue facade + drainSteering wiring + post-seal carry
- transport-http/ws: queue endpoint + chat.queue op (fixes WsClientMessage exhaustive switch)
- host-bin: register message-queue
1043 vitest + 199 transport bun pass; tsc/biome clean; boot smoke clean.
FE courier: frontend-message-queue-handoff.md.
Diffstat (limited to 'packages/kernel/src')
| -rw-r--r-- | packages/kernel/src/contracts/events.ts | 1 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/index.ts | 1 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/runtime.ts | 17 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.test.ts | 246 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 13 |
5 files changed, 277 insertions, 1 deletions
diff --git a/packages/kernel/src/contracts/events.ts b/packages/kernel/src/contracts/events.ts index b1385a2..6c9652d 100644 --- a/packages/kernel/src/contracts/events.ts +++ b/packages/kernel/src/contracts/events.ts @@ -14,6 +14,7 @@ export type { TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, + TurnSteeringEvent, TurnStepCompleteEvent, TurnTextDeltaEvent, TurnToolCallEvent, diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts index ffcbe76..4b1350b 100644 --- a/packages/kernel/src/contracts/index.ts +++ b/packages/kernel/src/contracts/index.ts @@ -38,6 +38,7 @@ export type { TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, + TurnSteeringEvent, TurnStepCompleteEvent, TurnTextDeltaEvent, TurnToolCallEvent, diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts index b7fe23c..c449a68 100644 --- a/packages/kernel/src/contracts/runtime.ts +++ b/packages/kernel/src/contracts/runtime.ts @@ -100,6 +100,23 @@ export interface RunTurnInput { * absent) — backward-compatible with callers that don't provide a clock. */ readonly now?: () => number; + + /** + * Optional. Called by the runtime at the tool-result boundary — after a + * step whose tool calls have all executed, before the next step begins — + * to drain messages to inject alongside the tool results. Whatever it + * returns is appended as user-role messages to the next step's input, so + * a caller can inject mid-turn guidance the model sees with the tool + * results. When omitted or returning an empty array, no injection happens + * (the runtime is unchanged). + * + * Injected (not ambient) so the kernel stays pure: it owns no queue and + * names no feature — it just calls the callback and appends what it gets. + * Only invoked when a step PRODUCED tool calls (the tool-result boundary); + * a step that ends without tool calls does not drain (the caller decides + * what to do with any pending messages after the turn ends). + */ + readonly drainSteering?: () => readonly ChatMessage[]; } /** diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts index dcaea7f..0d4c59d 100644 --- a/packages/kernel/src/runtime/run-turn.test.ts +++ b/packages/kernel/src/runtime/run-turn.test.ts @@ -5,7 +5,7 @@ import type { LogDeps, Logger, LogRecord, LogSink } from "../contracts/logging.j import type { ProviderContract, ProviderEvent } from "../contracts/provider.js"; import type { ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js"; import { createLogger } from "../logging/logger.js"; -import { runTurn } from "./run-turn.js"; +import { MAX_STEPS, runTurn } from "./run-turn.js"; function delay(ms: number): Promise<void> { return new Promise((resolve) => { @@ -29,6 +29,28 @@ function createFakeProvider(script: ProviderEvent[][]): ProviderContract { }; } +function createCapturingProvider(script: ProviderEvent[][]): { + provider: ProviderContract; + capturedMessages: ChatMessage[][]; +} { + const capturedMessages: ChatMessage[][] = []; + let callIndex = 0; + const provider: ProviderContract = { + id: "fake", + stream(messages, _tools) { + capturedMessages.push([...messages]); + const events = script[callIndex] ?? []; + callIndex++; + return (async function* () { + for (const event of events) { + yield event; + } + })(); + }, + }; + return { provider, capturedMessages }; +} + function createFakeTool( name: string, handler?: (input: unknown, ctx: ToolExecuteContext) => Promise<ToolResult>, @@ -2577,4 +2599,226 @@ describe("runTurn", () => { } }); }); + + describe("drainSteering", () => { + it("drainSteering called once at the tool-result boundary; returned messages appended to the next step's provider input (after tool results)", async () => { + let drainCallCount = 0; + const steeringMessage: ChatMessage = { + role: "user", + chunks: [{ type: "text", text: "steer!" }], + }; + + const { provider, capturedMessages } = createCapturingProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + drainSteering: () => { + drainCallCount++; + return [steeringMessage]; + }, + }); + + expect(drainCallCount).toBe(1); + // The provider was called twice (tool-call step, then text step). + expect(capturedMessages).toHaveLength(2); + const secondStepMessages = capturedMessages[1] ?? []; + // user, assistant(tool-call), tool-result, steering(user) — in order, + // steering appended AFTER the tool results, before the next call. + expect(secondStepMessages).toHaveLength(4); + expect(secondStepMessages[0]?.role).toBe("user"); + expect(secondStepMessages[1]?.role).toBe("assistant"); + expect(secondStepMessages[2]?.role).toBe("tool"); + expect(secondStepMessages[3]).toEqual(steeringMessage); + expect(secondStepMessages[3]?.role).toBe("user"); + // Steering is fed to the next provider call, NOT surfaced in the + // turn result — the caller owns the steering messages' lifecycle. + expect(result.messages).toHaveLength(3); + }); + + it("drainSteering omitted → no injection; turn byte-identical to before", async () => { + const { provider, capturedMessages } = createCapturingProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + // drainSteering omitted — must be a strict no-op. + }); + + expect(capturedMessages).toHaveLength(2); + const secondStepMessages = capturedMessages[1] ?? []; + // user, assistant(tool-call), tool-result — NO steering injected. + expect(secondStepMessages).toHaveLength(3); + expect(secondStepMessages[0]?.role).toBe("user"); + expect(secondStepMessages[1]?.role).toBe("assistant"); + expect(secondStepMessages[2]?.role).toBe("tool"); + expect(result.messages).toHaveLength(3); + }); + + it("drainSteering returns [] → no injection", async () => { + let drainCallCount = 0; + const { provider, capturedMessages } = createCapturingProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + drainSteering: () => { + drainCallCount++; + return []; + }, + }); + + // Called at the boundary, but returned nothing → no injection. + expect(drainCallCount).toBe(1); + expect(capturedMessages).toHaveLength(2); + const secondStepMessages = capturedMessages[1] ?? []; + expect(secondStepMessages).toHaveLength(3); + expect(secondStepMessages[2]?.role).toBe("tool"); + }); + + it("drainSteering NOT called when a step has no tool calls (text-only turn)", async () => { + let drainCallCount = 0; + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hello" }, + { type: "finish", reason: "stop" }, + ], + ]); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + drainSteering: () => { + drainCallCount++; + return []; + }, + }); + + expect(drainCallCount).toBe(0); + }); + + it("multiple tool-call steps → drainSteering called once per tool-call step", async () => { + let drainCallCount = 0; + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "tool-call", toolCallId: "tc2", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + drainSteering: () => { + drainCallCount++; + return []; + }, + }); + + // Steps 0 and 1 each produced tool calls → drained once each. + // Step 2 (text-only) → no boundary → no drain. Total = 2. + expect(drainCallCount).toBe(2); + }); + + it("drainSteering NOT called when max-steps ends the turn after a tool-call step (no next step → no drain)", async () => { + let drainCallCount = 0; + // Every step produces a tool call → the turn runs to MAX_STEPS. + const script: ProviderEvent[][] = Array.from({ length: MAX_STEPS }, () => [ + { type: "tool-call", toolCallId: "tc", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ]); + const provider = createFakeProvider(script); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + drainSteering: () => { + drainCallCount++; + return []; + }, + }); + + expect(result.finishReason).toBe("max-steps"); + // MAX_STEPS tool-call steps (indices 0..MAX_STEPS-1). Drained on every + // step that is followed by a next step (0..MAX_STEPS-2 = MAX_STEPS-1 + // calls); the final step is the max-steps boundary → no next step → + // no drain (queue left intact for the caller). + expect(drainCallCount).toBe(MAX_STEPS - 1); + }); + }); }); diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index bf57854..228ef8a 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -538,6 +538,19 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { if (step === MAX_STEPS - 1) { finishReason = "max-steps"; + // No next step → no tool-result boundary. Leave any pending + // steering messages for the caller (it owns the queue). + } else { + // Tool-result boundary: this step produced tool calls and we are + // about to call provider.stream again. Drain steering messages + // and append them after the tool results, before the next call. + // The kernel owns no queue and names no feature — it just calls + // the callback and appends. Emits nothing (caller emits the + // `steering` AgentEvent in its own wrapper). + const steering = input.drainSteering?.() ?? []; + for (const msg of steering) { + messages.push(msg); + } } } } finally { |
