summaryrefslogtreecommitdiffhomepage
path: root/packages
diff options
context:
space:
mode:
Diffstat (limited to 'packages')
-rw-r--r--packages/kernel/src/contracts/runtime.ts12
-rw-r--r--packages/kernel/src/runtime/run-turn.ts17
-rw-r--r--packages/session-orchestrator/src/orchestrator.test.ts2
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts25
4 files changed, 52 insertions, 4 deletions
diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts
index c449a68..02fc446 100644
--- a/packages/kernel/src/contracts/runtime.ts
+++ b/packages/kernel/src/contracts/runtime.ts
@@ -117,6 +117,18 @@ export interface RunTurnInput {
* what to do with any pending messages after the turn ends).
*/
readonly drainSteering?: () => readonly ChatMessage[];
+
+ /**
+ * Optional. Called by the runtime after each step's messages are finalized
+ * (the assistant message + tool-result messages are built). The caller can
+ * use this to persist step messages incrementally — assigning seq numbers
+ * during generation so consumers can `GET /conversations/:id?sinceSeq=N`
+ * mid-turn. When omitted, the caller must persist all messages at turn end
+ * (via `RunTurnResult.messages`). The messages passed to this callback are
+ * the SAME objects in `RunTurnResult.messages` — the caller must NOT
+ * double-persist them.
+ */
+ readonly onStepComplete?: (messages: readonly ChatMessage[]) => Promise<void> | void;
}
/**
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index 228ef8a..4a12e28 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -526,6 +526,23 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
resultMessages.push(msg);
}
+ // Incremental persistence: notify the caller that this step's
+ // messages are finalized. The caller can persist them immediately
+ // (assigning seq numbers during generation). The messages are the
+ // SAME objects in resultMessages — the caller must NOT double-persist.
+ if (input.onStepComplete !== undefined) {
+ const stepMessages: ChatMessage[] = [];
+ if (stepResult.assistantMessage !== undefined) {
+ stepMessages.push(stepResult.assistantMessage);
+ }
+ for (const msg of stepResult.toolMessages) {
+ stepMessages.push(msg);
+ }
+ if (stepMessages.length > 0) {
+ await input.onStepComplete(stepMessages);
+ }
+ }
+
if (signal.aborted) {
finishReason = "aborted";
break;
diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts
index d5585c2..012ac5c 100644
--- a/packages/session-orchestrator/src/orchestrator.test.ts
+++ b/packages/session-orchestrator/src/orchestrator.test.ts
@@ -574,7 +574,7 @@ describe("turn-sealed event", () => {
},
});
- expect(ordering).toEqual(["append", "appendMetrics", "turn-sealed"]);
+ expect(ordering).toEqual(["append", "append", "appendMetrics", "turn-sealed"]);
});
it("does not emit turn-sealed when append throws — emits error event instead", async () => {
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index bc9e78b..22a5c11 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -457,10 +457,29 @@ export function createSessionOrchestrator(
...(drainSteering !== undefined ? { drainSteering } : {}),
};
- const result = await deps.runTurn(opts);
+ // Persist the user message at turn start so it has a seq
+ // number before the first step generates. This enables the
+ // FE to syncTail during generation (CR-6).
+ await deps.conversationStore.append(conversationId, [userMsg]);
+
+ let stepsPersisted = false;
+ const result = await deps.runTurn({
+ ...opts,
+ // Incremental persistence: persist each step's messages
+ // as they are finalized. Seq numbers are assigned during
+ // generation, so the FE can GET /conversations/:id?sinceSeq=N
+ // mid-turn and pick up committed chunks (CR-6).
+ onStepComplete: async (stepMessages) => {
+ await deps.conversationStore.append(conversationId, stepMessages);
+ stepsPersisted = true;
+ },
+ });
- const toPersist: ChatMessage[] = [userMsg, ...result.messages];
- await deps.conversationStore.append(conversationId, toPersist);
+ // Fallback: if onStepComplete was never called (e.g., a fake
+ // runTurn in tests), persist all result messages as a batch.
+ if (!stepsPersisted && result.messages.length > 0) {
+ await deps.conversationStore.append(conversationId, result.messages);
+ }
const turnMetrics = metrics.build(turnId);
await deps.conversationStore.appendMetrics(conversationId, turnMetrics);