summaryrefslogtreecommitdiffhomepage
path: root/packages
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-12 14:58:33 +0900
committerAdam Malczewski <[email protected]>2026-06-12 14:58:33 +0900
commitb3d270803f95db2467e20bb742aa42faf6867f91 (patch)
tree6abac5ec8ef76bcb95a4b9e93891cd3c87f451a6 /packages
parent86b5137c4f7f2bcc08f0580f1edaa05d14015e63 (diff)
downloaddispatch-b3d270803f95db2467e20bb742aa42faf6867f91.tar.gz
dispatch-b3d270803f95db2467e20bb742aa42faf6867f91.zip
fix(turns): emit user prompt on the turn event stream (CR-3)
A pure watcher (subscribed but not the sender) couldn't see the user prompt until the turn sealed: the user message was only persisted at seal and never entered the live/replayable stream. Add an additive TurnInputEvent {type:"user-message", conversationId, turnId, text} to the AgentEvent union and emit it via the broadcast/buffer path as the first event of every turn, so it is replayed to all subscribers (live + late-join) and on the HTTP path. Persistence and metrics unchanged; the union widening breaks no exhaustive switch. - @dispatch/wire 0.5.0->0.6.0; @dispatch/transport-contract 0.7.0->0.8.0 (re-export) - session-orchestrator: emit user-message at runTurnDetached start; +3 tests, 3 Wave-1 tests updated (user-message precedes turn-start) - FE courier: frontend-cr3-user-message-handoff.md Live-verified vs flash: watcher receives user-message (correct text) as its first chat.delta before turn-sealed. 894 vitest + transport bun green; tsc -b EXIT 0.
Diffstat (limited to 'packages')
-rw-r--r--packages/kernel/src/contracts/events.ts1
-rw-r--r--packages/kernel/src/contracts/index.ts1
-rw-r--r--packages/session-orchestrator/src/orchestrator.test.ts152
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts2
-rw-r--r--packages/transport-contract/package.json2
-rw-r--r--packages/wire/package.json2
-rw-r--r--packages/wire/src/index.ts20
7 files changed, 175 insertions, 5 deletions
diff --git a/packages/kernel/src/contracts/events.ts b/packages/kernel/src/contracts/events.ts
index be09066..b1385a2 100644
--- a/packages/kernel/src/contracts/events.ts
+++ b/packages/kernel/src/contracts/events.ts
@@ -10,6 +10,7 @@ export type {
StatusEvent,
TurnDoneEvent,
TurnErrorEvent,
+ TurnInputEvent,
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts
index b5802f3..10025e2 100644
--- a/packages/kernel/src/contracts/index.ts
+++ b/packages/kernel/src/contracts/index.ts
@@ -34,6 +34,7 @@ export type {
StatusEvent,
TurnDoneEvent,
TurnErrorEvent,
+ TurnInputEvent,
TurnReasoningDeltaEvent,
TurnSealedEvent,
TurnStartEvent,
diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts
index efa5d4e..c3fbbc8 100644
--- a/packages/session-orchestrator/src/orchestrator.test.ts
+++ b/packages/session-orchestrator/src/orchestrator.test.ts
@@ -1646,7 +1646,8 @@ describe("detached turn hub", () => {
expect(events.length).toBeGreaterThan(0);
const types = events.map((e) => e.type);
- expect(types[0]).toBe("turn-start");
+ expect(types[0]).toBe("user-message");
+ expect(types[1]).toBe("turn-start");
expect(types).toContain("text-delta");
expect(types[types.length - 1]).toBe("turn-sealed");
@@ -1690,7 +1691,8 @@ describe("detached turn hub", () => {
expect(eventsA).toEqual(eventsB);
const types = eventsA.map((e) => e.type);
- expect(types[0]).toBe("turn-start");
+ expect(types[0]).toBe("user-message");
+ expect(types[1]).toBe("turn-start");
expect(types[types.length - 1]).toBe("turn-sealed");
});
@@ -1946,7 +1948,8 @@ describe("detached turn hub", () => {
});
const types = events.map((e) => e.type);
- expect(types[0]).toBe("turn-start");
+ expect(types[0]).toBe("user-message");
+ expect(types[1]).toBe("turn-start");
expect(types).toContain("text-delta");
expect(types[types.length - 1]).toBe("turn-sealed");
@@ -2016,3 +2019,146 @@ describe("detached turn hub", () => {
expect(firstEvents.some((e) => e.type === "turn-sealed")).toBe(true);
});
});
+
+describe("user-message event", () => {
+ function waitForEvent(
+ orchestrator: ReturnType<typeof createSessionOrchestrator>["orchestrator"],
+ conversationId: string,
+ eventType: string,
+ ): Promise<AgentEvent> {
+ return new Promise((resolve) => {
+ const unsub = orchestrator.subscribe(conversationId, (event) => {
+ if (event.type === eventType) {
+ unsub();
+ resolve(event);
+ }
+ });
+ });
+ }
+
+ it("emits user-message first — pre-subscriber receives user-message before turn-start", async () => {
+ const store = createInMemoryStore();
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "Hello" },
+ { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn,
+ });
+
+ const events: AgentEvent[] = [];
+ const unsub = orchestrator.subscribe("conv-um-first", (e) => events.push(e));
+
+ orchestrator.startTurn({ conversationId: "conv-um-first", text: "What is 2+2?" });
+
+ const sealed = waitForEvent(orchestrator, "conv-um-first", "turn-sealed");
+ await sealed;
+ unsub();
+
+ expect(events.length).toBeGreaterThan(1);
+ expect(events[0]?.type).toBe("user-message");
+ const um = events[0] as AgentEvent & { type: "user-message" };
+ expect(um.text).toBe("What is 2+2?");
+ expect(um.conversationId).toBe("conv-um-first");
+ expect(um.turnId).toMatch(/^turn-/);
+ expect(events[1]?.type).toBe("turn-start");
+ });
+
+ it("late-join replays user-message — buffer starts with user-message", async () => {
+ const store = createInMemoryStore();
+ let emitBarrierResolve: (() => void) | undefined;
+ const emitBarrier = new Promise<void>((resolve) => {
+ emitBarrierResolve = resolve;
+ });
+
+ let callIndex = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ const idx = callIndex++;
+ return (async function* () {
+ if (idx === 0) {
+ yield { type: "text-delta", delta: "Hello" } as ProviderEvent;
+ await emitBarrier;
+ yield { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ }
+ })();
+ },
+ };
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn,
+ });
+
+ orchestrator.startTurn({ conversationId: "conv-um-late", text: "late prompt" });
+
+ await new Promise<void>((resolve) => setTimeout(resolve, 10));
+
+ const lateEvents: AgentEvent[] = [];
+ const unsubLate = orchestrator.subscribe("conv-um-late", (e) => lateEvents.push(e));
+
+ expect(lateEvents.length).toBeGreaterThanOrEqual(1);
+ expect(lateEvents[0]?.type).toBe("user-message");
+ const um = lateEvents[0] as AgentEvent & { type: "user-message" };
+ expect(um.text).toBe("late prompt");
+ expect(um.turnId).toMatch(/^turn-/);
+
+ emitBarrierResolve?.();
+ const sealed = waitForEvent(orchestrator, "conv-um-late", "turn-sealed");
+ await sealed;
+ unsubLate();
+ });
+
+ it("metrics unaffected — user-message does not alter TurnMetrics", async () => {
+ const store = createInMemoryStore();
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "ok" },
+ { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { orchestrator } = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ applyToolsFilter: identityApplyToolsFilter,
+ runTurn,
+ now: () => 1000,
+ });
+
+ await orchestrator.handleMessage({
+ conversationId: "conv-um-metrics",
+ text: "test",
+ onEvent: () => {},
+ });
+
+ const metrics = store.metricsData.get("conv-um-metrics");
+ expect(metrics).toBeDefined();
+ expect(metrics).toHaveLength(1);
+
+ const tm = metrics?.[0];
+ if (tm === undefined) throw new Error("expected metrics");
+
+ expect(tm.turnId).toMatch(/^turn-/);
+ expect(tm.usage.inputTokens).toBe(10);
+ expect(tm.usage.outputTokens).toBe(5);
+ expect(tm.steps).toHaveLength(1);
+ expect(tm.steps[0]?.usage.inputTokens).toBe(10);
+ expect(tm.steps[0]?.usage.outputTokens).toBe(5);
+ });
+});
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index 0421108..d55114b 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -162,6 +162,8 @@ export function createSessionOrchestrator(
activeTurns.set(conversationId, { buffer: [], turnId });
activeConversations.add(conversationId);
+ emitToHub(conversationId, { type: "user-message", conversationId, turnId, text });
+
const effectiveCwdPromise =
cwd !== undefined
? Promise.resolve(cwd)
diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json
index 6e00599..6a7a946 100644
--- a/packages/transport-contract/package.json
+++ b/packages/transport-contract/package.json
@@ -1,6 +1,6 @@
{
"name": "@dispatch/transport-contract",
- "version": "0.7.0",
+ "version": "0.8.0",
"type": "module",
"private": true,
"main": "dist/index.js",
diff --git a/packages/wire/package.json b/packages/wire/package.json
index 80671cf..f4ede19 100644
--- a/packages/wire/package.json
+++ b/packages/wire/package.json
@@ -1,6 +1,6 @@
{
"name": "@dispatch/wire",
- "version": "0.5.0",
+ "version": "0.6.0",
"type": "module",
"private": true,
"main": "dist/index.js",
diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts
index 52662ef..28aab87 100644
--- a/packages/wire/src/index.ts
+++ b/packages/wire/src/index.ts
@@ -213,6 +213,7 @@ export interface TurnMetrics {
export type AgentEvent =
| StatusEvent
| TurnStartEvent
+ | TurnInputEvent
| TurnTextDeltaEvent
| TurnReasoningDeltaEvent
| TurnToolCallEvent
@@ -238,6 +239,25 @@ export interface TurnStartEvent {
readonly turnId: string;
}
+/**
+ * The user prompt that opened this turn, surfaced INTO the turn's outward event
+ * stream. The user message is persisted only when the turn seals (atomically with
+ * the assistant reply), so without this event a client that is merely WATCHING a
+ * conversation (subscribed but not the sender) has no source for the prompt text
+ * mid-turn — it would see the streaming reply with no preceding user bubble until
+ * seal. Emitted once, as the FIRST event of the turn (before `turn-start`), so it
+ * is buffered and replayed to every subscriber — live and late-join — exactly like
+ * the rest of the turn. The sender already echoes its own prompt optimistically, so
+ * a consumer should de-dup against that (e.g. by text); a pure watcher renders it
+ * directly. Carries the raw prompt `text` (the same text passed to the provider).
+ */
+export interface TurnInputEvent {
+ readonly type: "user-message";
+ readonly conversationId: string;
+ readonly turnId: string;
+ readonly text: string;
+}
+
/** Incremental text content from the model during a turn. */
export interface TurnTextDeltaEvent {
readonly type: "text-delta";