diff options
Diffstat (limited to 'packages')
| -rw-r--r-- | packages/kernel/src/contracts/runtime.ts | 12 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 17 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.test.ts | 2 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 25 |
4 files changed, 52 insertions, 4 deletions
diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts index c449a68..02fc446 100644 --- a/packages/kernel/src/contracts/runtime.ts +++ b/packages/kernel/src/contracts/runtime.ts @@ -117,6 +117,18 @@ export interface RunTurnInput { * what to do with any pending messages after the turn ends). */ readonly drainSteering?: () => readonly ChatMessage[]; + + /** + * Optional. Called by the runtime after each step's messages are finalized + * (the assistant message + tool-result messages are built). The caller can + * use this to persist step messages incrementally — assigning seq numbers + * during generation so consumers can `GET /conversations/:id?sinceSeq=N` + * mid-turn. When omitted, the caller must persist all messages at turn end + * (via `RunTurnResult.messages`). The messages passed to this callback are + * the SAME objects in `RunTurnResult.messages` — the caller must NOT + * double-persist them. + */ + readonly onStepComplete?: (messages: readonly ChatMessage[]) => Promise<void> | void; } /** diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index 228ef8a..4a12e28 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -526,6 +526,23 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { resultMessages.push(msg); } + // Incremental persistence: notify the caller that this step's + // messages are finalized. The caller can persist them immediately + // (assigning seq numbers during generation). The messages are the + // SAME objects in resultMessages — the caller must NOT double-persist. + if (input.onStepComplete !== undefined) { + const stepMessages: ChatMessage[] = []; + if (stepResult.assistantMessage !== undefined) { + stepMessages.push(stepResult.assistantMessage); + } + for (const msg of stepResult.toolMessages) { + stepMessages.push(msg); + } + if (stepMessages.length > 0) { + await input.onStepComplete(stepMessages); + } + } + if (signal.aborted) { finishReason = "aborted"; break; diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts index d5585c2..012ac5c 100644 --- a/packages/session-orchestrator/src/orchestrator.test.ts +++ b/packages/session-orchestrator/src/orchestrator.test.ts @@ -574,7 +574,7 @@ describe("turn-sealed event", () => { }, }); - expect(ordering).toEqual(["append", "appendMetrics", "turn-sealed"]); + expect(ordering).toEqual(["append", "append", "appendMetrics", "turn-sealed"]); }); it("does not emit turn-sealed when append throws — emits error event instead", async () => { diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index bc9e78b..22a5c11 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -457,10 +457,29 @@ export function createSessionOrchestrator( ...(drainSteering !== undefined ? { drainSteering } : {}), }; - const result = await deps.runTurn(opts); + // Persist the user message at turn start so it has a seq + // number before the first step generates. This enables the + // FE to syncTail during generation (CR-6). + await deps.conversationStore.append(conversationId, [userMsg]); + + let stepsPersisted = false; + const result = await deps.runTurn({ + ...opts, + // Incremental persistence: persist each step's messages + // as they are finalized. Seq numbers are assigned during + // generation, so the FE can GET /conversations/:id?sinceSeq=N + // mid-turn and pick up committed chunks (CR-6). + onStepComplete: async (stepMessages) => { + await deps.conversationStore.append(conversationId, stepMessages); + stepsPersisted = true; + }, + }); - const toPersist: ChatMessage[] = [userMsg, ...result.messages]; - await deps.conversationStore.append(conversationId, toPersist); + // Fallback: if onStepComplete was never called (e.g., a fake + // runTurn in tests), persist all result messages as a batch. + if (!stepsPersisted && result.messages.length > 0) { + await deps.conversationStore.append(conversationId, result.messages); + } const turnMetrics = metrics.build(turnId); await deps.conversationStore.appendMetrics(conversationId, turnMetrics); |
