diff options
| author | Adam Malczewski <[email protected]> | 2026-06-22 13:52:42 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-22 13:52:42 +0900 |
| commit | 519b79999d49156bbfaecc91a2d882fba2475fef (patch) | |
| tree | 35fb9a7e0a7b1b1f4377b83ca89282bdb78f3398 | |
| parent | 20db60b0705ab65b6ade67ff614d347e13dc9803 (diff) | |
| download | dispatch-519b79999d49156bbfaecc91a2d882fba2475fef.tar.gz dispatch-519b79999d49156bbfaecc91a2d882fba2475fef.zip | |
feat: incremental seq assignment during generation (CR-6)
The backend now persists chunks at step boundaries during generation,
not only at turn-seal. This enables the FE to syncTail mid-turn and
pick up committed, seq'd chunks (eliminating the provisional state).
Changes:
- RunTurnInput: add onStepComplete callback (kernel contract)
- runTurn: call onStepComplete after each step's messages are finalized
- Orchestrator: persist userMsg at turn start + each step's messages
via onStepComplete. Falls back to batch persist if callback isn't
called (backward compatible with test fakes).
The user message gets seq numbers before the first step generates.
Each step's assistant + tool messages get seq numbers as they complete.
The FE's existing syncTail (?sinceSeq=N) picks them up during generation.
Also adds backend-to-fe-handoff.md with CR-6 response + full endpoint list.
| -rw-r--r-- | backend-to-fe-handoff.md | 141 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/runtime.ts | 12 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 17 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.test.ts | 2 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 25 |
5 files changed, 193 insertions, 4 deletions
diff --git a/backend-to-fe-handoff.md b/backend-to-fe-handoff.md new file mode 100644 index 0000000..1e9ffb2 --- /dev/null +++ b/backend-to-fe-handoff.md @@ -0,0 +1,141 @@ +# Backend → FE handoff — CR-6 resolved + full endpoint list + +> Response to `backend-handoff.md` §2 CR-6. Courier back to `../dispatch-web`. + +## CR-6: Assign seq during generation — RESOLVED + +**What changed:** The backend now persists chunks **incrementally at step +boundaries** during generation, not only at turn-seal. The user message is +persisted at turn start (before the first step), and each step's messages +(assistant + tool-results) are persisted as soon as that step completes. + +**How it works:** +1. Turn starts → user message is `append`ed immediately (gets seq numbers). +2. Each step completes → step's messages are `append`ed immediately (get seq numbers). +3. Turn seals → `turn-sealed` emitted (no batch `append` needed — already persisted). + +**What this means for the FE:** +- `GET /conversations/:id?sinceSeq=N` returns committed, seq'd chunks **during + generation**. The FE's existing `syncTail` already polls this — it will now + find new chunks as each step completes. +- The FE can adopt option (c) from the CR: fold events for the **current + in-progress step** only (streaming text, thinking dots), and `syncTail` for + **sealed steps**. The provisional state shrinks to just one step's worth of + chunks — never a trim concern. +- `turn-sealed` becomes a "refresh" signal — all chunks are already committed. + The `done` event still carries final usage + contextSize (unchanged). + +**No wire/transport-contract change needed.** `StoredChunk` already has `seq`. +`AgentEvent` types are unchanged. The FE just needs `syncTail` to find seq'd +chunks during generation (which it already does). + +**Implementation detail:** The kernel calls a new `onStepComplete` callback +(`RunTurnInput.onStepComplete`) after each step's messages are finalized. +The orchestrator persists them via `conversationStore.append`. If the callback +isn't called (e.g., test fakes), the orchestrator falls back to batch persist +after `runTurn` returns — backward compatible. + +--- + +## Full endpoint list (current as of [email protected] / [email protected]) + +### HTTP (port 24203) + +| Method | Path | Purpose | +|---|---|---| +| `POST` | `/chat` | Stream a turn (NDJSON response, `X-Conversation-Id` header) | +| `POST` | `/chat/warm` | Cache-warm probe | +| `GET` | `/models` | List available models | +| `GET` | `/conversations` | List conversations (`?q=` prefix filter, `?status=active,idle` status filter) | +| `GET` | `/conversations/:id` | Conversation history (`?sinceSeq=`, `?beforeSeq=`, `?limit=` windowing) | +| `GET` | `/conversations/:id/metrics` | Per-turn metrics (tokens, timing) | +| `GET` | `/conversations/:id/last` | Blocking last assistant message | +| `GET` | `/conversations/:id/cwd` | Per-conversation working directory | +| `PUT` | `/conversations/:id/cwd` | Set working directory | +| `GET` | `/conversations/:id/reasoning-effort` | Per-conversation reasoning effort | +| `PUT` | `/conversations/:id/reasoning-effort` | Set reasoning effort | +| `GET` | `/conversations/:id/lsp` | LSP server status | +| `GET` | `/conversations/:id/compact-threshold` | Auto-compact threshold (0=manual, null=default 350k) | +| `PUT` | `/conversations/:id/compact-threshold` | Set auto-compact threshold | +| `GET` | `/conversations/:id/title` | Read conversation title | +| `PUT` | `/conversations/:id/title` | Set conversation title | +| `POST` | `/conversations/:id/close` | Close tab (abort turn + mark `closed`) | +| `POST` | `/conversations/:id/stop` | **NEW** — Stop generation (abort turn, keep conversation `idle`) | +| `POST` | `/conversations/:id/compact` | **NEW** — Manual compaction (fork history + replace with summary) | +| `POST` | `/conversations/:id/open` | **NEW** — Signal FE to open/focus tab (broadcasts `conversation.open`) | +| `POST` | `/conversations/:id/queue` | Enqueue steering message | +| `GET` | `/health` | Health check | +| `GET` | `/metrics/throughput` | Per-model throughput samples | +| `GET` | `/*` | Static frontend serving (SPA fallback, when `DISPATCH_WEB_DIR` is set) | + +### WebSocket (port 24205) + +**Client → Server:** +| Type | Purpose | +|---|---| +| `chat.send` | Start a turn (stream events back via `chat.delta`) | +| `chat.subscribe` | Watch a conversation's turns without sending | +| `chat.unsubscribe` | Stop watching | +| `chat.queue` | Enqueue steering (fire-and-forget) | +| Surface ops | `surface.subscribe`, `surface.invoke`, etc. | + +**Server → Client (broadcasts):** +| Type | Purpose | +|---|---| +| `chat.delta` | Per-conversation event (turn-start, text-delta, tool-call, usage, done, etc.) | +| `chat.error` | Turn error | +| `conversation.open` | **NEW** — CLI `--open` flag → open/focus a tab | +| `conversation.statusChanged` | **NEW** — Lifecycle status change (`active`/`idle`/`closed`) | +| `conversation.compacted` | **NEW** — History compacted (includes `newConversationId` = archive ID) | +| Surface ops | Catalog, surface data, etc. | + +### New types the FE should consume + +```ts +// ConversationMeta ([email protected]) — now has status + compactedFrom +interface ConversationMeta { + id: string; + createdAt: number; + lastActivityAt: number; + title: string; + status: "active" | "idle" | "closed"; + compactedFrom?: string; // archive ID (pre-compaction history) +} + +// WS messages ([email protected]) +interface ConversationCompactedMessage { + type: "conversation.compacted"; + conversationId: string; + newConversationId: string; // archive ID + messagesSummarized: number; + messagesKept: number; +} + +// HTTP response types +interface CompactResponse { + conversationId: string; + newConversationId: string; // archive ID + messagesSummarized: number; + messagesKept: number; +} + +interface CompactThresholdResponse { + conversationId: string; + threshold: number; // 0 = manual; null = default 350000 +} + +interface SetCompactThresholdRequest { + threshold: number; +} +``` + +### FE handoff docs (in the backend repo) + +| File | Feature | +|---|---| +| `frontend-conversation-lifecycle-handoff.md` | Tab persistence (active/idle/closed) | +| `frontend-compaction-handoff.md` | Compacting (non-destructive, chained archives) | +| `frontend-stop-generation-handoff.md` | Stop generation mid-turn | +| `frontend-conversation-list-handoff.md` | Conversation list + title + open tab | +| `frontend-conversation-open-handoff.md` | CLI `--open` → `conversation.open` WS message | +| `frontend-cache-rate-handoff.md` | Cache hit/miss calculation (updated for providers that don't report cache) | diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts index c449a68..02fc446 100644 --- a/packages/kernel/src/contracts/runtime.ts +++ b/packages/kernel/src/contracts/runtime.ts @@ -117,6 +117,18 @@ export interface RunTurnInput { * what to do with any pending messages after the turn ends). */ readonly drainSteering?: () => readonly ChatMessage[]; + + /** + * Optional. Called by the runtime after each step's messages are finalized + * (the assistant message + tool-result messages are built). The caller can + * use this to persist step messages incrementally — assigning seq numbers + * during generation so consumers can `GET /conversations/:id?sinceSeq=N` + * mid-turn. When omitted, the caller must persist all messages at turn end + * (via `RunTurnResult.messages`). The messages passed to this callback are + * the SAME objects in `RunTurnResult.messages` — the caller must NOT + * double-persist them. + */ + readonly onStepComplete?: (messages: readonly ChatMessage[]) => Promise<void> | void; } /** diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index 228ef8a..4a12e28 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -526,6 +526,23 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { resultMessages.push(msg); } + // Incremental persistence: notify the caller that this step's + // messages are finalized. The caller can persist them immediately + // (assigning seq numbers during generation). The messages are the + // SAME objects in resultMessages — the caller must NOT double-persist. + if (input.onStepComplete !== undefined) { + const stepMessages: ChatMessage[] = []; + if (stepResult.assistantMessage !== undefined) { + stepMessages.push(stepResult.assistantMessage); + } + for (const msg of stepResult.toolMessages) { + stepMessages.push(msg); + } + if (stepMessages.length > 0) { + await input.onStepComplete(stepMessages); + } + } + if (signal.aborted) { finishReason = "aborted"; break; diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts index d5585c2..012ac5c 100644 --- a/packages/session-orchestrator/src/orchestrator.test.ts +++ b/packages/session-orchestrator/src/orchestrator.test.ts @@ -574,7 +574,7 @@ describe("turn-sealed event", () => { }, }); - expect(ordering).toEqual(["append", "appendMetrics", "turn-sealed"]); + expect(ordering).toEqual(["append", "append", "appendMetrics", "turn-sealed"]); }); it("does not emit turn-sealed when append throws — emits error event instead", async () => { diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index bc9e78b..22a5c11 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -457,10 +457,29 @@ export function createSessionOrchestrator( ...(drainSteering !== undefined ? { drainSteering } : {}), }; - const result = await deps.runTurn(opts); + // Persist the user message at turn start so it has a seq + // number before the first step generates. This enables the + // FE to syncTail during generation (CR-6). + await deps.conversationStore.append(conversationId, [userMsg]); + + let stepsPersisted = false; + const result = await deps.runTurn({ + ...opts, + // Incremental persistence: persist each step's messages + // as they are finalized. Seq numbers are assigned during + // generation, so the FE can GET /conversations/:id?sinceSeq=N + // mid-turn and pick up committed chunks (CR-6). + onStepComplete: async (stepMessages) => { + await deps.conversationStore.append(conversationId, stepMessages); + stepsPersisted = true; + }, + }); - const toPersist: ChatMessage[] = [userMsg, ...result.messages]; - await deps.conversationStore.append(conversationId, toPersist); + // Fallback: if onStepComplete was never called (e.g., a fake + // runTurn in tests), persist all result messages as a batch. + if (!stepsPersisted && result.messages.length > 0) { + await deps.conversationStore.append(conversationId, result.messages); + } const turnMetrics = metrics.build(turnId); await deps.conversationStore.appendMetrics(conversationId, turnMetrics); |
