diff options
| author | Adam Malczewski <[email protected]> | 2026-06-12 14:58:33 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-12 14:58:33 +0900 |
| commit | b3d270803f95db2467e20bb742aa42faf6867f91 (patch) | |
| tree | 6abac5ec8ef76bcb95a4b9e93891cd3c87f451a6 | |
| parent | 86b5137c4f7f2bcc08f0580f1edaa05d14015e63 (diff) | |
| download | dispatch-b3d270803f95db2467e20bb742aa42faf6867f91.tar.gz dispatch-b3d270803f95db2467e20bb742aa42faf6867f91.zip | |
fix(turns): emit user prompt on the turn event stream (CR-3)
A pure watcher (subscribed but not the sender) couldn't see the user prompt
until the turn sealed: the user message was only persisted at seal and never
entered the live/replayable stream. Add an additive TurnInputEvent
{type:"user-message", conversationId, turnId, text} to the AgentEvent union and
emit it via the broadcast/buffer path as the first event of every turn, so it is
replayed to all subscribers (live + late-join) and on the HTTP path. Persistence
and metrics unchanged; the union widening breaks no exhaustive switch.
- @dispatch/wire 0.5.0->0.6.0; @dispatch/transport-contract 0.7.0->0.8.0 (re-export)
- session-orchestrator: emit user-message at runTurnDetached start; +3 tests,
3 Wave-1 tests updated (user-message precedes turn-start)
- FE courier: frontend-cr3-user-message-handoff.md
Live-verified vs flash: watcher receives user-message (correct text) as its first
chat.delta before turn-sealed. 894 vitest + transport bun green; tsc -b EXIT 0.
| -rw-r--r-- | frontend-cr3-user-message-handoff.md | 54 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/events.ts | 1 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/index.ts | 1 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.test.ts | 152 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 2 | ||||
| -rw-r--r-- | packages/transport-contract/package.json | 2 | ||||
| -rw-r--r-- | packages/wire/package.json | 2 | ||||
| -rw-r--r-- | packages/wire/src/index.ts | 20 | ||||
| -rw-r--r-- | tasks.md | 19 |
9 files changed, 247 insertions, 6 deletions
diff --git a/frontend-cr3-user-message-handoff.md b/frontend-cr3-user-message-handoff.md new file mode 100644 index 0000000..0a7859a --- /dev/null +++ b/frontend-cr3-user-message-handoff.md @@ -0,0 +1,54 @@ +# FE handoff — CR-3 fixed: user prompt is now on the turn's event stream + +Courier to `../dispatch-web`. This resolves CR-3 from `backend-handoff.md` ("a watcher can't see +the turn's USER prompt until seal"). **Option B implemented + live-verified.** Your staged-but-inert +consumption can now be turned on. + +## What shipped (backend) + +A new **additive** `AgentEvent` variant carries the user prompt INTO the turn's outward stream: + +```ts +// @dispatch/wire — added to the AgentEvent union +interface TurnInputEvent { + type: "user-message"; + conversationId: string; + turnId: string; + text: string; // the raw prompt, exactly as sent +} +``` + +`session-orchestrator` emits it via the broadcast/buffer path as the **FIRST event of every turn** +(before `turn-start`), so it is replayed to every subscriber — live AND late-join — and arrives on +the HTTP/NDJSON path too. Persistence is unchanged (the user message is still appended atomically at +seal); this only adds a buffered/broadcast event. Metrics are unaffected (it is not usage). + +## Version bumps (re-pin both) + +- `@dispatch/wire` **`0.5.0 → 0.6.0`** (additive union member). +- `@dispatch/transport-contract` **`0.7.0 → 0.8.0`** (re-exports `AgentEvent`/`chat.delta`, which now + carries `user-message`; no other transport-contract change). + +Re-mirror `.dispatch/{wire,transport-contract}.reference.md` and add `user-message` to the FE +exhaustiveness guard. + +## FE action + +Flip on the already-staged `core/chunks` branch that folds a `user-message` event into a provisional +user chunk for watchers, with your text dedup against the sender's optimistic echo. After re-pin: +- a **pure watcher** (second device / `chat.subscribe` only) now shows the user bubble the moment the + turn starts, not at seal; +- the **sender** is unchanged (its optimistic echo dedups against the replayed `user-message`); +- a **late-joiner** gets `user-message` first in the replay, then the rest of the in-flight turn. + +## Live-verified (backend, vs flash) + +Two WS clients on one conversation; client B subscribed but never sent. On A's `chat.send`, B received +`chat.delta { event:{ type:"user-message", text:"…", turnId, conversationId } }` as its **first** delta +(index 0), **before** `turn-sealed`, with `text` equal to A's prompt, then the streaming reply. `RESULT: OK`. + +## Note + +The ordering guarantee is: `user-message` is the first event of the turn, immediately followed by +`turn-start`, then the usual deltas → `done` → `turn-sealed`. Treat `user-message` as turn-scoped +(it carries `turnId`) so a multi-turn transcript attributes each prompt to its turn. diff --git a/packages/kernel/src/contracts/events.ts b/packages/kernel/src/contracts/events.ts index be09066..b1385a2 100644 --- a/packages/kernel/src/contracts/events.ts +++ b/packages/kernel/src/contracts/events.ts @@ -10,6 +10,7 @@ export type { StatusEvent, TurnDoneEvent, TurnErrorEvent, + TurnInputEvent, TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts index b5802f3..10025e2 100644 --- a/packages/kernel/src/contracts/index.ts +++ b/packages/kernel/src/contracts/index.ts @@ -34,6 +34,7 @@ export type { StatusEvent, TurnDoneEvent, TurnErrorEvent, + TurnInputEvent, TurnReasoningDeltaEvent, TurnSealedEvent, TurnStartEvent, diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts index efa5d4e..c3fbbc8 100644 --- a/packages/session-orchestrator/src/orchestrator.test.ts +++ b/packages/session-orchestrator/src/orchestrator.test.ts @@ -1646,7 +1646,8 @@ describe("detached turn hub", () => { expect(events.length).toBeGreaterThan(0); const types = events.map((e) => e.type); - expect(types[0]).toBe("turn-start"); + expect(types[0]).toBe("user-message"); + expect(types[1]).toBe("turn-start"); expect(types).toContain("text-delta"); expect(types[types.length - 1]).toBe("turn-sealed"); @@ -1690,7 +1691,8 @@ describe("detached turn hub", () => { expect(eventsA).toEqual(eventsB); const types = eventsA.map((e) => e.type); - expect(types[0]).toBe("turn-start"); + expect(types[0]).toBe("user-message"); + expect(types[1]).toBe("turn-start"); expect(types[types.length - 1]).toBe("turn-sealed"); }); @@ -1946,7 +1948,8 @@ describe("detached turn hub", () => { }); const types = events.map((e) => e.type); - expect(types[0]).toBe("turn-start"); + expect(types[0]).toBe("user-message"); + expect(types[1]).toBe("turn-start"); expect(types).toContain("text-delta"); expect(types[types.length - 1]).toBe("turn-sealed"); @@ -2016,3 +2019,146 @@ describe("detached turn hub", () => { expect(firstEvents.some((e) => e.type === "turn-sealed")).toBe(true); }); }); + +describe("user-message event", () => { + function waitForEvent( + orchestrator: ReturnType<typeof createSessionOrchestrator>["orchestrator"], + conversationId: string, + eventType: string, + ): Promise<AgentEvent> { + return new Promise((resolve) => { + const unsub = orchestrator.subscribe(conversationId, (event) => { + if (event.type === eventType) { + unsub(); + resolve(event); + } + }); + }); + } + + it("emits user-message first — pre-subscriber receives user-message before turn-start", async () => { + const store = createInMemoryStore(); + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "Hello" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn, + }); + + const events: AgentEvent[] = []; + const unsub = orchestrator.subscribe("conv-um-first", (e) => events.push(e)); + + orchestrator.startTurn({ conversationId: "conv-um-first", text: "What is 2+2?" }); + + const sealed = waitForEvent(orchestrator, "conv-um-first", "turn-sealed"); + await sealed; + unsub(); + + expect(events.length).toBeGreaterThan(1); + expect(events[0]?.type).toBe("user-message"); + const um = events[0] as AgentEvent & { type: "user-message" }; + expect(um.text).toBe("What is 2+2?"); + expect(um.conversationId).toBe("conv-um-first"); + expect(um.turnId).toMatch(/^turn-/); + expect(events[1]?.type).toBe("turn-start"); + }); + + it("late-join replays user-message — buffer starts with user-message", async () => { + const store = createInMemoryStore(); + let emitBarrierResolve: (() => void) | undefined; + const emitBarrier = new Promise<void>((resolve) => { + emitBarrierResolve = resolve; + }); + + let callIndex = 0; + const provider: ProviderContract = { + id: "fake", + stream() { + const idx = callIndex++; + return (async function* () { + if (idx === 0) { + yield { type: "text-delta", delta: "Hello" } as ProviderEvent; + await emitBarrier; + yield { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + } + })(); + }, + }; + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn, + }); + + orchestrator.startTurn({ conversationId: "conv-um-late", text: "late prompt" }); + + await new Promise<void>((resolve) => setTimeout(resolve, 10)); + + const lateEvents: AgentEvent[] = []; + const unsubLate = orchestrator.subscribe("conv-um-late", (e) => lateEvents.push(e)); + + expect(lateEvents.length).toBeGreaterThanOrEqual(1); + expect(lateEvents[0]?.type).toBe("user-message"); + const um = lateEvents[0] as AgentEvent & { type: "user-message" }; + expect(um.text).toBe("late prompt"); + expect(um.turnId).toMatch(/^turn-/); + + emitBarrierResolve?.(); + const sealed = waitForEvent(orchestrator, "conv-um-late", "turn-sealed"); + await sealed; + unsubLate(); + }); + + it("metrics unaffected — user-message does not alter TurnMetrics", async () => { + const store = createInMemoryStore(); + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "ok" }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { orchestrator } = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + applyToolsFilter: identityApplyToolsFilter, + runTurn, + now: () => 1000, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-um-metrics", + text: "test", + onEvent: () => {}, + }); + + const metrics = store.metricsData.get("conv-um-metrics"); + expect(metrics).toBeDefined(); + expect(metrics).toHaveLength(1); + + const tm = metrics?.[0]; + if (tm === undefined) throw new Error("expected metrics"); + + expect(tm.turnId).toMatch(/^turn-/); + expect(tm.usage.inputTokens).toBe(10); + expect(tm.usage.outputTokens).toBe(5); + expect(tm.steps).toHaveLength(1); + expect(tm.steps[0]?.usage.inputTokens).toBe(10); + expect(tm.steps[0]?.usage.outputTokens).toBe(5); + }); +}); diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index 0421108..d55114b 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -162,6 +162,8 @@ export function createSessionOrchestrator( activeTurns.set(conversationId, { buffer: [], turnId }); activeConversations.add(conversationId); + emitToHub(conversationId, { type: "user-message", conversationId, turnId, text }); + const effectiveCwdPromise = cwd !== undefined ? Promise.resolve(cwd) diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json index 6e00599..6a7a946 100644 --- a/packages/transport-contract/package.json +++ b/packages/transport-contract/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/transport-contract", - "version": "0.7.0", + "version": "0.8.0", "type": "module", "private": true, "main": "dist/index.js", diff --git a/packages/wire/package.json b/packages/wire/package.json index 80671cf..f4ede19 100644 --- a/packages/wire/package.json +++ b/packages/wire/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/wire", - "version": "0.5.0", + "version": "0.6.0", "type": "module", "private": true, "main": "dist/index.js", diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts index 52662ef..28aab87 100644 --- a/packages/wire/src/index.ts +++ b/packages/wire/src/index.ts @@ -213,6 +213,7 @@ export interface TurnMetrics { export type AgentEvent = | StatusEvent | TurnStartEvent + | TurnInputEvent | TurnTextDeltaEvent | TurnReasoningDeltaEvent | TurnToolCallEvent @@ -238,6 +239,25 @@ export interface TurnStartEvent { readonly turnId: string; } +/** + * The user prompt that opened this turn, surfaced INTO the turn's outward event + * stream. The user message is persisted only when the turn seals (atomically with + * the assistant reply), so without this event a client that is merely WATCHING a + * conversation (subscribed but not the sender) has no source for the prompt text + * mid-turn — it would see the streaming reply with no preceding user bubble until + * seal. Emitted once, as the FIRST event of the turn (before `turn-start`), so it + * is buffered and replayed to every subscriber — live and late-join — exactly like + * the rest of the turn. The sender already echoes its own prompt optimistically, so + * a consumer should de-dup against that (e.g. by text); a pure watcher renders it + * directly. Carries the raw prompt `text` (the same text passed to the provider). + */ +export interface TurnInputEvent { + readonly type: "user-message"; + readonly conversationId: string; + readonly turnId: string; + readonly text: string; +} + /** Incremental text content from the model during a turn. */ export interface TurnTextDeltaEvent { readonly type: "text-delta"; @@ -5,7 +5,7 @@ > Keep this lean and current; do not let it re-accrete a step-by-step changelog. ## Status (current) -`tsc -b` EXIT 0 · biome clean · **891 vitest + transport bun green**. +`tsc -b` EXIT 0 · biome clean · **894 vitest + transport bun green**. Built and verified live (full-fidelity: every feature is a manifest-loaded extension through the host): @@ -283,6 +283,23 @@ Principle enforced: **the FE is only a control interface; the AI runs independen zero deltas though the turn ran+persisted. Caught by live-verify (unit test had subscribed AFTER start, masking it). Fixed via the persistent-subscribers / per-turn-buffer split. +## Turn continuity — CR-3: user prompt on the event stream (DONE) +FE bug (multi-client): a pure watcher (subscribed, not the sender) couldn't see the USER prompt until +seal — the user message was passed to the provider + persisted only at seal, never on the turn's +outward stream/buffer. FE courier: `frontend-cr3-user-message-handoff.md`. +- **Contract:** `@dispatch/wire` `0.5.0→0.6.0` — additive `TurnInputEvent` + `{ type:"user-message"; conversationId; turnId; text }` on the `AgentEvent` union (kernel barrels + re-export it). `@dispatch/transport-contract` `0.7.0→0.8.0` (re-export only). Widening broke NO + exhaustive switch (typecheck clean) — zero consumer fan-out. +- **session-orchestrator:** `emitToHub({type:"user-message",…})` as the FIRST event of `runTurnDetached` + (before `runTurn`) → buffered + broadcast to all subscribers (live + late-join); HTTP path covered via + `handleMessage`'s buffer replay. Persistence + metrics unchanged. +3 tests; 3 Wave-1 tests updated + (user-message now precedes turn-start). +- **LIVE-VERIFIED vs flash:** a watcher that never sent receives `user-message` (correct text) as its + FIRST `chat.delta`, before `turn-sealed`, then the streaming reply. `RESULT: OK`. +- **Process note:** implemented directly by the orchestrator (user directive: "do implementations + yourself going forward") rather than via a summoned owner-agent. + ## Open items - **Context window LIMIT (next, sibling of context size):** expose the selected model's max context-window token limit so the FE can render `contextSize / limit` (e.g. `1286 / 200000`). |
