diff options
| -rw-r--r-- | backend-to-fe-handoff.md | 141 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/runtime.ts | 12 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 17 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.test.ts | 2 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 25 |
5 files changed, 193 insertions, 4 deletions
diff --git a/backend-to-fe-handoff.md b/backend-to-fe-handoff.md new file mode 100644 index 0000000..1e9ffb2 --- /dev/null +++ b/backend-to-fe-handoff.md @@ -0,0 +1,141 @@ +# Backend → FE handoff — CR-6 resolved + full endpoint list + +> Response to `backend-handoff.md` §2 CR-6. Courier back to `../dispatch-web`. + +## CR-6: Assign seq during generation — RESOLVED + +**What changed:** The backend now persists chunks **incrementally at step +boundaries** during generation, not only at turn-seal. The user message is +persisted at turn start (before the first step), and each step's messages +(assistant + tool-results) are persisted as soon as that step completes. + +**How it works:** +1. Turn starts → user message is `append`ed immediately (gets seq numbers). +2. Each step completes → step's messages are `append`ed immediately (get seq numbers). +3. Turn seals → `turn-sealed` emitted (no batch `append` needed — already persisted). + +**What this means for the FE:** +- `GET /conversations/:id?sinceSeq=N` returns committed, seq'd chunks **during + generation**. The FE's existing `syncTail` already polls this — it will now + find new chunks as each step completes. +- The FE can adopt option (c) from the CR: fold events for the **current + in-progress step** only (streaming text, thinking dots), and `syncTail` for + **sealed steps**. The provisional state shrinks to just one step's worth of + chunks — never a trim concern. +- `turn-sealed` becomes a "refresh" signal — all chunks are already committed. + The `done` event still carries final usage + contextSize (unchanged). + +**No wire/transport-contract change needed.** `StoredChunk` already has `seq`. +`AgentEvent` types are unchanged. The FE just needs `syncTail` to find seq'd +chunks during generation (which it already does). + +**Implementation detail:** The kernel calls a new `onStepComplete` callback +(`RunTurnInput.onStepComplete`) after each step's messages are finalized. +The orchestrator persists them via `conversationStore.append`. If the callback +isn't called (e.g., test fakes), the orchestrator falls back to batch persist +after `runTurn` returns — backward compatible. + +--- + +## Full endpoint list (current as of [email protected] / [email protected]) + +### HTTP (port 24203) + +| Method | Path | Purpose | +|---|---|---| +| `POST` | `/chat` | Stream a turn (NDJSON response, `X-Conversation-Id` header) | +| `POST` | `/chat/warm` | Cache-warm probe | +| `GET` | `/models` | List available models | +| `GET` | `/conversations` | List conversations (`?q=` prefix filter, `?status=active,idle` status filter) | +| `GET` | `/conversations/:id` | Conversation history (`?sinceSeq=`, `?beforeSeq=`, `?limit=` windowing) | +| `GET` | `/conversations/:id/metrics` | Per-turn metrics (tokens, timing) | +| `GET` | `/conversations/:id/last` | Blocking last assistant message | +| `GET` | `/conversations/:id/cwd` | Per-conversation working directory | +| `PUT` | `/conversations/:id/cwd` | Set working directory | +| `GET` | `/conversations/:id/reasoning-effort` | Per-conversation reasoning effort | +| `PUT` | `/conversations/:id/reasoning-effort` | Set reasoning effort | +| `GET` | `/conversations/:id/lsp` | LSP server status | +| `GET` | `/conversations/:id/compact-threshold` | Auto-compact threshold (0=manual, null=default 350k) | +| `PUT` | `/conversations/:id/compact-threshold` | Set auto-compact threshold | +| `GET` | `/conversations/:id/title` | Read conversation title | +| `PUT` | `/conversations/:id/title` | Set conversation title | +| `POST` | `/conversations/:id/close` | Close tab (abort turn + mark `closed`) | +| `POST` | `/conversations/:id/stop` | **NEW** — Stop generation (abort turn, keep conversation `idle`) | +| `POST` | `/conversations/:id/compact` | **NEW** — Manual compaction (fork history + replace with summary) | +| `POST` | `/conversations/:id/open` | **NEW** — Signal FE to open/focus tab (broadcasts `conversation.open`) | +| `POST` | `/conversations/:id/queue` | Enqueue steering message | +| `GET` | `/health` | Health check | +| `GET` | `/metrics/throughput` | Per-model throughput samples | +| `GET` | `/*` | Static frontend serving (SPA fallback, when `DISPATCH_WEB_DIR` is set) | + +### WebSocket (port 24205) + +**Client → Server:** +| Type | Purpose | +|---|---| +| `chat.send` | Start a turn (stream events back via `chat.delta`) | +| `chat.subscribe` | Watch a conversation's turns without sending | +| `chat.unsubscribe` | Stop watching | +| `chat.queue` | Enqueue steering (fire-and-forget) | +| Surface ops | `surface.subscribe`, `surface.invoke`, etc. | + +**Server → Client (broadcasts):** +| Type | Purpose | +|---|---| +| `chat.delta` | Per-conversation event (turn-start, text-delta, tool-call, usage, done, etc.) | +| `chat.error` | Turn error | +| `conversation.open` | **NEW** — CLI `--open` flag → open/focus a tab | +| `conversation.statusChanged` | **NEW** — Lifecycle status change (`active`/`idle`/`closed`) | +| `conversation.compacted` | **NEW** — History compacted (includes `newConversationId` = archive ID) | +| Surface ops | Catalog, surface data, etc. | + +### New types the FE should consume + +```ts +// ConversationMeta ([email protected]) — now has status + compactedFrom +interface ConversationMeta { + id: string; + createdAt: number; + lastActivityAt: number; + title: string; + status: "active" | "idle" | "closed"; + compactedFrom?: string; // archive ID (pre-compaction history) +} + +// WS messages ([email protected]) +interface ConversationCompactedMessage { + type: "conversation.compacted"; + conversationId: string; + newConversationId: string; // archive ID + messagesSummarized: number; + messagesKept: number; +} + +// HTTP response types +interface CompactResponse { + conversationId: string; + newConversationId: string; // archive ID + messagesSummarized: number; + messagesKept: number; +} + +interface CompactThresholdResponse { + conversationId: string; + threshold: number; // 0 = manual; null = default 350000 +} + +interface SetCompactThresholdRequest { + threshold: number; +} +``` + +### FE handoff docs (in the backend repo) + +| File | Feature | +|---|---| +| `frontend-conversation-lifecycle-handoff.md` | Tab persistence (active/idle/closed) | +| `frontend-compaction-handoff.md` | Compacting (non-destructive, chained archives) | +| `frontend-stop-generation-handoff.md` | Stop generation mid-turn | +| `frontend-conversation-list-handoff.md` | Conversation list + title + open tab | +| `frontend-conversation-open-handoff.md` | CLI `--open` → `conversation.open` WS message | +| `frontend-cache-rate-handoff.md` | Cache hit/miss calculation (updated for providers that don't report cache) | diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts index c449a68..02fc446 100644 --- a/packages/kernel/src/contracts/runtime.ts +++ b/packages/kernel/src/contracts/runtime.ts @@ -117,6 +117,18 @@ export interface RunTurnInput { * what to do with any pending messages after the turn ends). */ readonly drainSteering?: () => readonly ChatMessage[]; + + /** + * Optional. Called by the runtime after each step's messages are finalized + * (the assistant message + tool-result messages are built). The caller can + * use this to persist step messages incrementally — assigning seq numbers + * during generation so consumers can `GET /conversations/:id?sinceSeq=N` + * mid-turn. When omitted, the caller must persist all messages at turn end + * (via `RunTurnResult.messages`). The messages passed to this callback are + * the SAME objects in `RunTurnResult.messages` — the caller must NOT + * double-persist them. + */ + readonly onStepComplete?: (messages: readonly ChatMessage[]) => Promise<void> | void; } /** diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index 228ef8a..4a12e28 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -526,6 +526,23 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { resultMessages.push(msg); } + // Incremental persistence: notify the caller that this step's + // messages are finalized. The caller can persist them immediately + // (assigning seq numbers during generation). The messages are the + // SAME objects in resultMessages — the caller must NOT double-persist. + if (input.onStepComplete !== undefined) { + const stepMessages: ChatMessage[] = []; + if (stepResult.assistantMessage !== undefined) { + stepMessages.push(stepResult.assistantMessage); + } + for (const msg of stepResult.toolMessages) { + stepMessages.push(msg); + } + if (stepMessages.length > 0) { + await input.onStepComplete(stepMessages); + } + } + if (signal.aborted) { finishReason = "aborted"; break; diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts index d5585c2..012ac5c 100644 --- a/packages/session-orchestrator/src/orchestrator.test.ts +++ b/packages/session-orchestrator/src/orchestrator.test.ts @@ -574,7 +574,7 @@ describe("turn-sealed event", () => { }, }); - expect(ordering).toEqual(["append", "appendMetrics", "turn-sealed"]); + expect(ordering).toEqual(["append", "append", "appendMetrics", "turn-sealed"]); }); it("does not emit turn-sealed when append throws — emits error event instead", async () => { diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index bc9e78b..22a5c11 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -457,10 +457,29 @@ export function createSessionOrchestrator( ...(drainSteering !== undefined ? { drainSteering } : {}), }; - const result = await deps.runTurn(opts); + // Persist the user message at turn start so it has a seq + // number before the first step generates. This enables the + // FE to syncTail during generation (CR-6). + await deps.conversationStore.append(conversationId, [userMsg]); + + let stepsPersisted = false; + const result = await deps.runTurn({ + ...opts, + // Incremental persistence: persist each step's messages + // as they are finalized. Seq numbers are assigned during + // generation, so the FE can GET /conversations/:id?sinceSeq=N + // mid-turn and pick up committed chunks (CR-6). + onStepComplete: async (stepMessages) => { + await deps.conversationStore.append(conversationId, stepMessages); + stepsPersisted = true; + }, + }); - const toPersist: ChatMessage[] = [userMsg, ...result.messages]; - await deps.conversationStore.append(conversationId, toPersist); + // Fallback: if onStepComplete was never called (e.g., a fake + // runTurn in tests), persist all result messages as a batch. + if (!stepsPersisted && result.messages.length > 0) { + await deps.conversationStore.append(conversationId, result.messages); + } const turnMetrics = metrics.build(turnId); await deps.conversationStore.appendMetrics(conversationId, turnMetrics); |
