diff options
| author | Adam Malczewski <[email protected]> | 2026-06-22 13:52:42 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-22 13:52:42 +0900 |
| commit | 519b79999d49156bbfaecc91a2d882fba2475fef (patch) | |
| tree | 35fb9a7e0a7b1b1f4377b83ca89282bdb78f3398 /packages | |
| parent | 20db60b0705ab65b6ade67ff614d347e13dc9803 (diff) | |
| download | dispatch-519b79999d49156bbfaecc91a2d882fba2475fef.tar.gz dispatch-519b79999d49156bbfaecc91a2d882fba2475fef.zip | |
feat: incremental seq assignment during generation (CR-6)
The backend now persists chunks at step boundaries during generation,
not only at turn-seal. This enables the FE to syncTail mid-turn and
pick up committed, seq'd chunks (eliminating the provisional state).
Changes:
- RunTurnInput: add onStepComplete callback (kernel contract)
- runTurn: call onStepComplete after each step's messages are finalized
- Orchestrator: persist userMsg at turn start + each step's messages
via onStepComplete. Falls back to batch persist if callback isn't
called (backward compatible with test fakes).
The user message gets seq numbers before the first step generates.
Each step's assistant + tool messages get seq numbers as they complete.
The FE's existing syncTail (?sinceSeq=N) picks them up during generation.
Also adds backend-to-fe-handoff.md with CR-6 response + full endpoint list.
Diffstat (limited to 'packages')
| -rw-r--r-- | packages/kernel/src/contracts/runtime.ts | 12 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 17 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.test.ts | 2 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 25 |
4 files changed, 52 insertions, 4 deletions
diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts index c449a68..02fc446 100644 --- a/packages/kernel/src/contracts/runtime.ts +++ b/packages/kernel/src/contracts/runtime.ts @@ -117,6 +117,18 @@ export interface RunTurnInput { * what to do with any pending messages after the turn ends). */ readonly drainSteering?: () => readonly ChatMessage[]; + + /** + * Optional. Called by the runtime after each step's messages are finalized + * (the assistant message + tool-result messages are built). The caller can + * use this to persist step messages incrementally — assigning seq numbers + * during generation so consumers can `GET /conversations/:id?sinceSeq=N` + * mid-turn. When omitted, the caller must persist all messages at turn end + * (via `RunTurnResult.messages`). The messages passed to this callback are + * the SAME objects in `RunTurnResult.messages` — the caller must NOT + * double-persist them. + */ + readonly onStepComplete?: (messages: readonly ChatMessage[]) => Promise<void> | void; } /** diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index 228ef8a..4a12e28 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -526,6 +526,23 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { resultMessages.push(msg); } + // Incremental persistence: notify the caller that this step's + // messages are finalized. The caller can persist them immediately + // (assigning seq numbers during generation). The messages are the + // SAME objects in resultMessages — the caller must NOT double-persist. + if (input.onStepComplete !== undefined) { + const stepMessages: ChatMessage[] = []; + if (stepResult.assistantMessage !== undefined) { + stepMessages.push(stepResult.assistantMessage); + } + for (const msg of stepResult.toolMessages) { + stepMessages.push(msg); + } + if (stepMessages.length > 0) { + await input.onStepComplete(stepMessages); + } + } + if (signal.aborted) { finishReason = "aborted"; break; diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts index d5585c2..012ac5c 100644 --- a/packages/session-orchestrator/src/orchestrator.test.ts +++ b/packages/session-orchestrator/src/orchestrator.test.ts @@ -574,7 +574,7 @@ describe("turn-sealed event", () => { }, }); - expect(ordering).toEqual(["append", "appendMetrics", "turn-sealed"]); + expect(ordering).toEqual(["append", "append", "appendMetrics", "turn-sealed"]); }); it("does not emit turn-sealed when append throws — emits error event instead", async () => { diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index bc9e78b..22a5c11 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -457,10 +457,29 @@ export function createSessionOrchestrator( ...(drainSteering !== undefined ? { drainSteering } : {}), }; - const result = await deps.runTurn(opts); + // Persist the user message at turn start so it has a seq + // number before the first step generates. This enables the + // FE to syncTail during generation (CR-6). + await deps.conversationStore.append(conversationId, [userMsg]); + + let stepsPersisted = false; + const result = await deps.runTurn({ + ...opts, + // Incremental persistence: persist each step's messages + // as they are finalized. Seq numbers are assigned during + // generation, so the FE can GET /conversations/:id?sinceSeq=N + // mid-turn and pick up committed chunks (CR-6). + onStepComplete: async (stepMessages) => { + await deps.conversationStore.append(conversationId, stepMessages); + stepsPersisted = true; + }, + }); - const toPersist: ChatMessage[] = [userMsg, ...result.messages]; - await deps.conversationStore.append(conversationId, toPersist); + // Fallback: if onStepComplete was never called (e.g., a fake + // runTurn in tests), persist all result messages as a batch. + if (!stepsPersisted && result.messages.length > 0) { + await deps.conversationStore.append(conversationId, result.messages); + } const turnMetrics = metrics.build(turnId); await deps.conversationStore.appendMetrics(conversationId, turnMetrics); |
