diff options
| author | Adam Malczewski <[email protected]> | 2026-06-12 14:58:33 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-12 14:58:33 +0900 |
| commit | b3d270803f95db2467e20bb742aa42faf6867f91 (patch) | |
| tree | 6abac5ec8ef76bcb95a4b9e93891cd3c87f451a6 /packages | |
| parent | 86b5137c4f7f2bcc08f0580f1edaa05d14015e63 (diff) | |
| download | dispatch-b3d270803f95db2467e20bb742aa42faf6867f91.tar.gz dispatch-b3d270803f95db2467e20bb742aa42faf6867f91.zip | |
fix(turns): emit user prompt on the turn event stream (CR-3)
A pure watcher (subscribed but not the sender) couldn't see the user prompt
until the turn sealed: the user message was only persisted at seal and never
entered the live/replayable stream. Add an additive TurnInputEvent
{type:"user-message", conversationId, turnId, text} to the AgentEvent union and
emit it via the broadcast/buffer path as the first event of every turn, so it is
replayed to all subscribers (live + late-join) and on the HTTP path. Persistence
and metrics unchanged; the union widening breaks no exhaustive switch.
- @dispatch/wire 0.5.0->0.6.0; @dispatch/transport-contract 0.7.0->0.8.0 (re-export)
- session-orchestrator: emit user-message at runTurnDetached start; +3 tests,
3 Wave-1 tests updated (user-message precedes turn-start)
- FE courier: frontend-cr3-user-message-handoff.md
Live-verified vs flash: watcher receives user-message (correct text) as its first
chat.delta before turn-sealed. 894 vitest + transport bun green; tsc -b EXIT 0.
Diffstat (limited to 'packages')
| -rw-r--r-- | packages/kernel/src/contracts/events.ts | 1 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/index.ts | 1 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.test.ts | 152 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 2 | ||||
| -rw-r--r-- | packages/transport-contract/package.json | 2 | ||||
| -rw-r--r-- | packages/wire/package.json | 2 | ||||
| -rw-r--r-- | packages/wire/src/index.ts | 20 |
7 files changed, 175 insertions, 5 deletions
diff --git a/packages/kernel/src/contracts/events.ts b/packages/kernel/src/contracts/events.ts index be09066..b1385a2 100644 --- a/packages/kernel/src/contracts/events.ts +++ b/packages/kernel/src/contracts/events.ts @@ -10,6 +10,7 @@ export type { StatusEvent, TurnDoneEvent, TurnErrorEvent, + TurnInputEvent, TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts index b5802f3..10025e2 100644 --- a/packages/kernel/src/contracts/index.ts +++ b/packages/kernel/src/contracts/index.ts @@ -34,6 +34,7 @@ export type { StatusEvent, TurnDoneEvent, TurnErrorEvent, + TurnInputEvent, TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts index efa5d4e..c3fbbc8 100644 --- a/packages/session-orchestrator/src/orchestrator.test.ts +++ b/packages/session-orchestrator/src/orchestrator.test.ts @@ -1646,7 +1646,8 @@ describe("detached turn hub", () => { expect(events.length).toBeGreaterThan(0); const types = events.map((e) => e.type); - expect(types[0]).toBe("turn-start"); + expect(types[0]).toBe("user-message"); + expect(types[1]).toBe("turn-start"); expect(types).toContain("text-delta"); expect(types[types.length - 1]).toBe("turn-sealed"); @@ -1690,7 +1691,8 @@ describe("detached turn hub", () => { expect(eventsA).toEqual(eventsB); const types = eventsA.map((e) => e.type); - expect(types[0]).toBe("turn-start"); + expect(types[0]).toBe("user-message"); + expect(types[1]).toBe("turn-start"); expect(types[types.length - 1]).toBe("turn-sealed"); }); @@ -1946,7 +1948,8 @@ describe("detached turn hub", () => { }); const types = events.map((e) => e.type); - expect(types[0]).toBe("turn-start"); + expect(types[0]).toBe("user-message"); + expect(types[1]).toBe("turn-start"); expect(types).toContain("text-delta"); expect(types[types.length - 1]).toBe("turn-sealed"); @@ -2016,3 +2019,146 @@ describe("detached turn hub", () => { expect(firstEvents.some((e) => e.type === "turn-sealed")).toBe(true); }); }); + +describe("user-message event", () => { + function waitForEvent( + orchestrator: ReturnType<typeof createSessionOrchestrator>["orchestrator"], + conversationId: string, + eventType: string, + ): Promise<AgentEvent> { + return new Promise((resolve) => { + const unsub = orchestrator.subscribe(conversationId, (event) => { + if (event.type === eventType) { + unsub(); + resolve(event); + } + }); + }); + } + + it("emits user-message first — pre-subscriber receives user-message before turn-start", async () => { + const store = createInMemoryStore(); + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "Hello" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn, + }); + + const events: AgentEvent[] = []; + const unsub = orchestrator.subscribe("conv-um-first", (e) => events.push(e)); + + orchestrator.startTurn({ conversationId: "conv-um-first", text: "What is 2+2?" }); + + const sealed = waitForEvent(orchestrator, "conv-um-first", "turn-sealed"); + await sealed; + unsub(); + + expect(events.length).toBeGreaterThan(1); + expect(events[0]?.type).toBe("user-message"); + const um = events[0] as AgentEvent & { type: "user-message" }; + expect(um.text).toBe("What is 2+2?"); + expect(um.conversationId).toBe("conv-um-first"); + expect(um.turnId).toMatch(/^turn-/); + expect(events[1]?.type).toBe("turn-start"); + }); + + it("late-join replays user-message — buffer starts with user-message", async () => { + const store = createInMemoryStore(); + let emitBarrierResolve: (() => void) | undefined; + const emitBarrier = new Promise<void>((resolve) => { + emitBarrierResolve = resolve; + }); + + let callIndex = 0; + const provider: ProviderContract = { + id: "fake", + stream() { + const idx = callIndex++; + return (async function* () { + if (idx === 0) { + yield { type: "text-delta", delta: "Hello" } as ProviderEvent; + await emitBarrier; + yield { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + } + })(); + }, + }; + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn, + }); + + orchestrator.startTurn({ conversationId: "conv-um-late", text: "late prompt" }); + + await new Promise<void>((resolve) => setTimeout(resolve, 10)); + + const lateEvents: AgentEvent[] = []; + const unsubLate = orchestrator.subscribe("conv-um-late", (e) => lateEvents.push(e)); + + expect(lateEvents.length).toBeGreaterThanOrEqual(1); + expect(lateEvents[0]?.type).toBe("user-message"); + const um = lateEvents[0] as AgentEvent & { type: "user-message" }; + expect(um.text).toBe("late prompt"); + expect(um.turnId).toMatch(/^turn-/); + + emitBarrierResolve?.(); + const sealed = waitForEvent(orchestrator, "conv-um-late", "turn-sealed"); + await sealed; + unsubLate(); + }); + + it("metrics unaffected — user-message does not alter TurnMetrics", async () => { + const store = createInMemoryStore(); + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "ok" }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn, + now: () => 1000, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-um-metrics", + text: "test", + onEvent: () => {}, + }); + + const metrics = store.metricsData.get("conv-um-metrics"); + expect(metrics).toBeDefined(); + expect(metrics).toHaveLength(1); + + const tm = metrics?.[0]; + if (tm === undefined) throw new Error("expected metrics"); + + expect(tm.turnId).toMatch(/^turn-/); + expect(tm.usage.inputTokens).toBe(10); + expect(tm.usage.outputTokens).toBe(5); + expect(tm.steps).toHaveLength(1); + expect(tm.steps[0]?.usage.inputTokens).toBe(10); + expect(tm.steps[0]?.usage.outputTokens).toBe(5); + }); +}); diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index 0421108..d55114b 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -162,6 +162,8 @@ export function createSessionOrchestrator( activeTurns.set(conversationId, { buffer: [], turnId }); activeConversations.add(conversationId); + emitToHub(conversationId, { type: "user-message", conversationId, turnId, text }); + const effectiveCwdPromise = cwd !== undefined ? Promise.resolve(cwd) diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json index 6e00599..6a7a946 100644 --- a/packages/transport-contract/package.json +++ b/packages/transport-contract/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/transport-contract", - "version": "0.7.0", + "version": "0.8.0", "type": "module", "private": true, "main": "dist/index.js", diff --git a/packages/wire/package.json b/packages/wire/package.json index 80671cf..f4ede19 100644 --- a/packages/wire/package.json +++ b/packages/wire/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/wire", - "version": "0.5.0", + "version": "0.6.0", "type": "module", "private": true, "main": "dist/index.js", diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts index 52662ef..28aab87 100644 --- a/packages/wire/src/index.ts +++ b/packages/wire/src/index.ts @@ -213,6 +213,7 @@ export interface TurnMetrics { export type AgentEvent = | StatusEvent | TurnStartEvent + | TurnInputEvent | TurnTextDeltaEvent | TurnReasoningDeltaEvent | TurnToolCallEvent @@ -238,6 +239,25 @@ export interface TurnStartEvent { readonly turnId: string; } +/** + * The user prompt that opened this turn, surfaced INTO the turn's outward event + * stream. The user message is persisted only when the turn seals (atomically with + * the assistant reply), so without this event a client that is merely WATCHING a + * conversation (subscribed but not the sender) has no source for the prompt text + * mid-turn — it would see the streaming reply with no preceding user bubble until + * seal. Emitted once, as the FIRST event of the turn (before `turn-start`), so it + * is buffered and replayed to every subscriber — live and late-join — exactly like + * the rest of the turn. The sender already echoes its own prompt optimistically, so + * a consumer should de-dup against that (e.g. by text); a pure watcher renders it + * directly. Carries the raw prompt `text` (the same text passed to the provider). + */ +export interface TurnInputEvent { + readonly type: "user-message"; + readonly conversationId: string; + readonly turnId: string; + readonly text: string; +} + /** Incremental text content from the model during a turn. */ export interface TurnTextDeltaEvent { readonly type: "text-delta"; |
