summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-12 14:58:33 +0900
committerAdam Malczewski <[email protected]>2026-06-12 14:58:33 +0900
commitb3d270803f95db2467e20bb742aa42faf6867f91 (patch)
tree6abac5ec8ef76bcb95a4b9e93891cd3c87f451a6
parent86b5137c4f7f2bcc08f0580f1edaa05d14015e63 (diff)
downloaddispatch-b3d270803f95db2467e20bb742aa42faf6867f91.tar.gz
dispatch-b3d270803f95db2467e20bb742aa42faf6867f91.zip
fix(turns): emit user prompt on the turn event stream (CR-3)
A pure watcher (subscribed but not the sender) couldn't see the user prompt until the turn sealed: the user message was only persisted at seal and never entered the live/replayable stream. Add an additive TurnInputEvent {type:"user-message", conversationId, turnId, text} to the AgentEvent union and emit it via the broadcast/buffer path as the first event of every turn, so it is replayed to all subscribers (live + late-join) and on the HTTP path. Persistence and metrics unchanged; the union widening breaks no exhaustive switch. - @dispatch/wire 0.5.0->0.6.0; @dispatch/transport-contract 0.7.0->0.8.0 (re-export) - session-orchestrator: emit user-message at runTurnDetached start; +3 tests, 3 Wave-1 tests updated (user-message precedes turn-start) - FE courier: frontend-cr3-user-message-handoff.md Live-verified vs flash: watcher receives user-message (correct text) as its first chat.delta before turn-sealed. 894 vitest + transport bun green; tsc -b EXIT 0.
-rw-r--r--frontend-cr3-user-message-handoff.md54
-rw-r--r--packages/kernel/src/contracts/events.ts1
-rw-r--r--packages/kernel/src/contracts/index.ts1
-rw-r--r--packages/session-orchestrator/src/orchestrator.test.ts152
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts2
-rw-r--r--packages/transport-contract/package.json2
-rw-r--r--packages/wire/package.json2
-rw-r--r--packages/wire/src/index.ts20
-rw-r--r--tasks.md19
9 files changed, 247 insertions, 6 deletions
diff --git a/frontend-cr3-user-message-handoff.md b/frontend-cr3-user-message-handoff.md
new file mode 100644
index 0000000..0a7859a
--- /dev/null
+++ b/frontend-cr3-user-message-handoff.md
@@ -0,0 +1,54 @@
+# FE handoff — CR-3 fixed: user prompt is now on the turn's event stream
+
+Courier to `../dispatch-web`. This resolves CR-3 from `backend-handoff.md` ("a watcher can't see
+the turn's USER prompt until seal"). **Option B implemented + live-verified.** Your staged-but-inert
+consumption can now be turned on.
+
+## What shipped (backend)
+
+A new **additive** `AgentEvent` variant carries the user prompt INTO the turn's outward stream:
+
+```ts
+// @dispatch/wire — added to the AgentEvent union
+interface TurnInputEvent {
+ type: "user-message";
+ conversationId: string;
+ turnId: string;
+ text: string; // the raw prompt, exactly as sent
+}
+```
+
+`session-orchestrator` emits it via the broadcast/buffer path as the **FIRST event of every turn**
+(before `turn-start`), so it is replayed to every subscriber — live AND late-join — and arrives on
+the HTTP/NDJSON path too. Persistence is unchanged (the user message is still appended atomically at
+seal); this only adds a buffered/broadcast event. Metrics are unaffected (it is not usage).
+
+## Version bumps (re-pin both)
+
+- `@dispatch/wire` **`0.5.0 → 0.6.0`** (additive union member).
+- `@dispatch/transport-contract` **`0.7.0 → 0.8.0`** (re-exports `AgentEvent`/`chat.delta`, which now
+ carries `user-message`; no other transport-contract change).
+
+Re-mirror `.dispatch/{wire,transport-contract}.reference.md` and add `user-message` to the FE
+exhaustiveness guard.
+
+## FE action
+
+Flip on the already-staged `core/chunks` branch that folds a `user-message` event into a provisional
+user chunk for watchers, with your text dedup against the sender's optimistic echo. After re-pin:
+- a **pure watcher** (second device / `chat.subscribe` only) now shows the user bubble the moment the
+ turn starts, not at seal;
+- the **sender** is unchanged (its optimistic echo dedups against the replayed `user-message`);
+- a **late-joiner** gets `user-message` first in the replay, then the rest of the in-flight turn.
+
+## Live-verified (backend, vs flash)
+
+Two WS clients on one conversation; client B subscribed but never sent. On A's `chat.send`, B received
+`chat.delta { event:{ type:"user-message", text:"…", turnId, conversationId } }` as its **first** delta
+(index 0), **before** `turn-sealed`, with `text` equal to A's prompt, then the streaming reply. `RESULT: OK`.
+
+## Note
+
+The ordering guarantee is: `user-message` is the first event of the turn, immediately followed by
+`turn-start`, then the usual deltas → `done` → `turn-sealed`. Treat `user-message` as turn-scoped
+(it carries `turnId`) so a multi-turn transcript attributes each prompt to its turn.
diff --git a/packages/kernel/src/contracts/events.ts b/packages/kernel/src/contracts/events.ts
index be09066..b1385a2 100644
--- a/packages/kernel/src/contracts/events.ts
+++ b/packages/kernel/src/contracts/events.ts
@@ -10,6 +10,7 @@ export type {
StatusEvent,
TurnDoneEvent,
TurnErrorEvent,
+ TurnInputEvent,
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts
index b5802f3..10025e2 100644
--- a/packages/kernel/src/contracts/index.ts
+++ b/packages/kernel/src/contracts/index.ts
@@ -34,6 +34,7 @@ export type {
StatusEvent,
TurnDoneEvent,
TurnErrorEvent,
+ TurnInputEvent,
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts
index efa5d4e..c3fbbc8 100644
--- a/packages/session-orchestrator/src/orchestrator.test.ts
+++ b/packages/session-orchestrator/src/orchestrator.test.ts
@@ -1646,7 +1646,8 @@ describe("detached turn hub", () => {
expect(events.length).toBeGreaterThan(0);
const types = events.map((e) => e.type);
- expect(types[0]).toBe("turn-start");
+ expect(types[0]).toBe("user-message");
+ expect(types[1]).toBe("turn-start");
expect(types).toContain("text-delta");
expect(types[types.length - 1]).toBe("turn-sealed");
@@ -1690,7 +1691,8 @@ describe("detached turn hub", () => {
expect(eventsA).toEqual(eventsB);
const types = eventsA.map((e) => e.type);
- expect(types[0]).toBe("turn-start");
+ expect(types[0]).toBe("user-message");
+ expect(types[1]).toBe("turn-start");
expect(types[types.length - 1]).toBe("turn-sealed");
});
@@ -1946,7 +1948,8 @@ describe("detached turn hub", () => {
});
const types = events.map((e) => e.type);
- expect(types[0]).toBe("turn-start");
+ expect(types[0]).toBe("user-message");
+ expect(types[1]).toBe("turn-start");
expect(types).toContain("text-delta");
expect(types[types.length - 1]).toBe("turn-sealed");
@@ -2016,3 +2019,146 @@ describe("detached turn hub", () => {
expect(firstEvents.some((e) => e.type === "turn-sealed")).toBe(true);
});
});
+
+describe("user-message event", () => {
+ function waitForEvent(
+ orchestrator: ReturnType<typeof createSessionOrchestrator>["orchestrator"],
+ conversationId: string,
+ eventType: string,
+ ): Promise<AgentEvent> {
+ return new Promise((resolve) => {
+ const unsub = orchestrator.subscribe(conversationId, (event) => {
+ if (event.type === eventType) {
+ unsub();
+ resolve(event);
+ }
+ });
+ });
+ }
+
+ it("emits user-message first — pre-subscriber receives user-message before turn-start", async () => {
+ const store = createInMemoryStore();
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "Hello" },
+ { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn,
+ });
+
+ const events: AgentEvent[] = [];
+ const unsub = orchestrator.subscribe("conv-um-first", (e) => events.push(e));
+
+ orchestrator.startTurn({ conversationId: "conv-um-first", text: "What is 2+2?" });
+
+ const sealed = waitForEvent(orchestrator, "conv-um-first", "turn-sealed");
+ await sealed;
+ unsub();
+
+ expect(events.length).toBeGreaterThan(1);
+ expect(events[0]?.type).toBe("user-message");
+ const um = events[0] as AgentEvent & { type: "user-message" };
+ expect(um.text).toBe("What is 2+2?");
+ expect(um.conversationId).toBe("conv-um-first");
+ expect(um.turnId).toMatch(/^turn-/);
+ expect(events[1]?.type).toBe("turn-start");
+ });
+
+ it("late-join replays user-message — buffer starts with user-message", async () => {
+ const store = createInMemoryStore();
+ let emitBarrierResolve: (() => void) | undefined;
+ const emitBarrier = new Promise<void>((resolve) => {
+ emitBarrierResolve = resolve;
+ });
+
+ let callIndex = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ const idx = callIndex++;
+ return (async function* () {
+ if (idx === 0) {
+ yield { type: "text-delta", delta: "Hello" } as ProviderEvent;
+ await emitBarrier;
+ yield { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ }
+ })();
+ },
+ };
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn,
+ });
+
+ orchestrator.startTurn({ conversationId: "conv-um-late", text: "late prompt" });
+
+ await new Promise<void>((resolve) => setTimeout(resolve, 10));
+
+ const lateEvents: AgentEvent[] = [];
+ const unsubLate = orchestrator.subscribe("conv-um-late", (e) => lateEvents.push(e));
+
+ expect(lateEvents.length).toBeGreaterThanOrEqual(1);
+ expect(lateEvents[0]?.type).toBe("user-message");
+ const um = lateEvents[0] as AgentEvent & { type: "user-message" };
+ expect(um.text).toBe("late prompt");
+ expect(um.turnId).toMatch(/^turn-/);
+
+ emitBarrierResolve?.();
+ const sealed = waitForEvent(orchestrator, "conv-um-late", "turn-sealed");
+ await sealed;
+ unsubLate();
+ });
+
+ it("metrics unaffected — user-message does not alter TurnMetrics", async () => {
+ const store = createInMemoryStore();
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "ok" },
+ { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn,
+ now: () => 1000,
+ });
+
+ await orchestrator.handleMessage({
+ conversationId: "conv-um-metrics",
+ text: "test",
+ onEvent: () => {},
+ });
+
+ const metrics = store.metricsData.get("conv-um-metrics");
+ expect(metrics).toBeDefined();
+ expect(metrics).toHaveLength(1);
+
+ const tm = metrics?.[0];
+ if (tm === undefined) throw new Error("expected metrics");
+
+ expect(tm.turnId).toMatch(/^turn-/);
+ expect(tm.usage.inputTokens).toBe(10);
+ expect(tm.usage.outputTokens).toBe(5);
+ expect(tm.steps).toHaveLength(1);
+ expect(tm.steps[0]?.usage.inputTokens).toBe(10);
+ expect(tm.steps[0]?.usage.outputTokens).toBe(5);
+ });
+});
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index 0421108..d55114b 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -162,6 +162,8 @@ export function createSessionOrchestrator(
activeTurns.set(conversationId, { buffer: [], turnId });
activeConversations.add(conversationId);
+ emitToHub(conversationId, { type: "user-message", conversationId, turnId, text });
+
const effectiveCwdPromise =
cwd !== undefined
? Promise.resolve(cwd)
diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json
index 6e00599..6a7a946 100644
--- a/packages/transport-contract/package.json
+++ b/packages/transport-contract/package.json
@@ -1,6 +1,6 @@
{
"name": "@dispatch/transport-contract",
- "version": "0.7.0",
+ "version": "0.8.0",
"type": "module",
"private": true,
"main": "dist/index.js",
diff --git a/packages/wire/package.json b/packages/wire/package.json
index 80671cf..f4ede19 100644
--- a/packages/wire/package.json
+++ b/packages/wire/package.json
@@ -1,6 +1,6 @@
{
"name": "@dispatch/wire",
- "version": "0.5.0",
+ "version": "0.6.0",
"type": "module",
"private": true,
"main": "dist/index.js",
diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts
index 52662ef..28aab87 100644
--- a/packages/wire/src/index.ts
+++ b/packages/wire/src/index.ts
@@ -213,6 +213,7 @@ export interface TurnMetrics {
export type AgentEvent =
| StatusEvent
| TurnStartEvent
+ | TurnInputEvent
| TurnTextDeltaEvent
| TurnReasoningDeltaEvent
| TurnToolCallEvent
@@ -238,6 +239,25 @@ export interface TurnStartEvent {
readonly turnId: string;
}
+/**
+ * The user prompt that opened this turn, surfaced INTO the turn's outward event
+ * stream. The user message is persisted only when the turn seals (atomically with
+ * the assistant reply), so without this event a client that is merely WATCHING a
+ * conversation (subscribed but not the sender) has no source for the prompt text
+ * mid-turn — it would see the streaming reply with no preceding user bubble until
+ * seal. Emitted once, as the FIRST event of the turn (before `turn-start`), so it
+ * is buffered and replayed to every subscriber — live and late-join — exactly like
+ * the rest of the turn. The sender already echoes its own prompt optimistically, so
+ * a consumer should de-dup against that (e.g. by text); a pure watcher renders it
+ * directly. Carries the raw prompt `text` (the same text passed to the provider).
+ */
+export interface TurnInputEvent {
+ readonly type: "user-message";
+ readonly conversationId: string;
+ readonly turnId: string;
+ readonly text: string;
+}
+
/** Incremental text content from the model during a turn. */
export interface TurnTextDeltaEvent {
readonly type: "text-delta";
diff --git a/tasks.md b/tasks.md
index b745810..f7e9330 100644
--- a/tasks.md
+++ b/tasks.md
@@ -5,7 +5,7 @@
> Keep this lean and current; do not let it re-accrete a step-by-step changelog.
## Status (current)
-`tsc -b` EXIT 0 · biome clean · **891 vitest + transport bun green**.
+`tsc -b` EXIT 0 · biome clean · **894 vitest + transport bun green**.
Built and verified live (full-fidelity: every feature is a manifest-loaded
extension through the host):
@@ -283,6 +283,23 @@ Principle enforced: **the FE is only a control interface; the AI runs independen
zero deltas though the turn ran+persisted. Caught by live-verify (unit test had subscribed
AFTER start, masking it). Fixed via the persistent-subscribers / per-turn-buffer split.
+## Turn continuity — CR-3: user prompt on the event stream (DONE)
+FE bug (multi-client): a pure watcher (subscribed, not the sender) couldn't see the USER prompt until
+seal — the user message was passed to the provider + persisted only at seal, never on the turn's
+outward stream/buffer. FE courier: `frontend-cr3-user-message-handoff.md`.
+- **Contract:** `@dispatch/wire` `0.5.0→0.6.0` — additive `TurnInputEvent`
+ `{ type:"user-message"; conversationId; turnId; text }` on the `AgentEvent` union (kernel barrels
+ re-export it). `@dispatch/transport-contract` `0.7.0→0.8.0` (re-export only). Widening broke NO
+ exhaustive switch (typecheck clean) — zero consumer fan-out.
+- **session-orchestrator:** `emitToHub({type:"user-message",…})` as the FIRST event of `runTurnDetached`
+ (before `runTurn`) → buffered + broadcast to all subscribers (live + late-join); HTTP path covered via
+ `handleMessage`'s buffer replay. Persistence + metrics unchanged. +3 tests; 3 Wave-1 tests updated
+ (user-message now precedes turn-start).
+- **LIVE-VERIFIED vs flash:** a watcher that never sent receives `user-message` (correct text) as its
+ FIRST `chat.delta`, before `turn-sealed`, then the streaming reply. `RESULT: OK`.
+- **Process note:** implemented directly by the orchestrator (user directive: "do implementations
+ yourself going forward") rather than via a summoned owner-agent.
+
## Open items
- **Context window LIMIT (next, sibling of context size):** expose the selected model's max
context-window token limit so the FE can render `contextSize / limit` (e.g. `1286 / 200000`).