diff options
| author | Adam Malczewski <[email protected]> | 2026-06-22 01:09:26 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-22 01:09:26 +0900 |
| commit | 5af664777bd64cddd168679d6369cd188212201a (patch) | |
| tree | 8e72022183b8d72eccdb568de5776d74cd91f1aa | |
| parent | 28154825fb47248be21a0d64fc36492fb01c9a42 (diff) | |
| download | dispatch-5af664777bd64cddd168679d6369cd188212201a.tar.gz dispatch-5af664777bd64cddd168679d6369cd188212201a.zip | |
feat: non-destructive compaction — fork history to archive before replacing
Compaction now preserves the full pre-compaction history:
1. Forks the conversation to a new archive ID (complete copy: chunks,
metadata, cwd, reasoning-effort). Archive gets status=closed,
title='Archive: <original>', compactedFrom=<originalId>.
2. Replaces the original conversation's history with [system: summary]
+ recent N messages (same as before).
3. Sets compactedFrom=<archiveId> on the original conversation's metadata.
The original history is never destroyed. The archive is accessible via
GET /conversations/:id using the archive ID.
Wire/contract changes:
- ConversationMeta: add compactedFrom?: string
- CompactionResult: add archiveId: string
- ConversationCompactedMessage: add archiveId
- CompactResponse: add archiveId
Conversation store:
- forkHistory(sourceId, targetId): copies all chunks + metadata to a
new conversation ID
- setCompactedFrom(conversationId, archiveId): marks the conversation
| -rw-r--r-- | frontend-compaction-handoff.md | 26 | ||||
| -rw-r--r-- | packages/conversation-store/src/store.ts | 76 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.test.ts | 8 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 9 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/queue.test.ts | 2 | ||||
| -rw-r--r-- | packages/transport-contract/src/index.ts | 2 | ||||
| -rw-r--r-- | packages/transport-http/src/app.test.ts | 10 | ||||
| -rw-r--r-- | packages/transport-http/src/app.ts | 1 | ||||
| -rw-r--r-- | packages/transport-http/src/server.bun.test.ts | 2 | ||||
| -rw-r--r-- | packages/transport-ws/src/extension.ts | 20 | ||||
| -rw-r--r-- | packages/wire/src/index.ts | 9 |
11 files changed, 151 insertions, 14 deletions
diff --git a/frontend-compaction-handoff.md b/frontend-compaction-handoff.md index 4cce9f5..d82016a 100644 --- a/frontend-compaction-handoff.md +++ b/frontend-compaction-handoff.md @@ -21,6 +21,7 @@ retain the most recent N messages. Two modes: // @dispatch/wire export interface CompactionResult { readonly summary: string; + readonly archiveId: string; // ID of the archive conversation with full pre-compaction history readonly messagesSummarized: number; readonly messagesKept: number; } @@ -29,6 +30,7 @@ export interface CompactionResult { export interface ConversationCompactedMessage { readonly type: "conversation.compacted"; readonly conversationId: string; + readonly archiveId: string; // ID of the archive conversation readonly messagesSummarized: number; readonly messagesKept: number; } @@ -37,6 +39,7 @@ export interface ConversationCompactedMessage { // @dispatch/transport-contract — HTTP response types export interface CompactResponse { readonly conversationId: string; + readonly archiveId: string; // ID of the archive conversation readonly messagesSummarized: number; readonly messagesKept: number; } @@ -89,14 +92,27 @@ Broadcast to all connected WS clients when compaction completes. The FE should reload the conversation history via `GET /conversations/:id` to reflect the compacted state (the old messages are replaced by a system summary + recent N). -## How compaction works (backend) +## How compaction works (backend) — non-destructive 1. Load full conversation history. 2. Split: old messages (to summarize) + recent N (to keep, default 10). -3. Call the model with a summarization system prompt + old messages. -4. Replace the entire history with: `[system: "Summary of previous conversation: ..."] + recent N messages`. -5. Emit `conversationCompacted` hook → WS broadcast. -6. The summary preserves key decisions, file paths, and unresolved questions. +3. **Fork** the full pre-compaction history to a new archive conversation + (new UUID). The archive gets `status: "closed"`, title `"Archive: <original>"`, + and `compactedFrom: <originalId>`. +4. Call the model with a summarization system prompt + old messages. +5. **Replace** the original conversation's history with: + `[system: "Summary of previous conversation: ..."] + recent N messages`. +6. Set `compactedFrom: <archiveId>` on the original conversation's metadata. +7. Emit `conversationCompacted` hook → WS broadcast (includes `archiveId`). + +**The original history is never destroyed.** The archive conversation is a +complete copy of the pre-compaction state, accessible via `GET /conversations/:id` +using the archive ID. The FE can link to it from the compacted conversation. + +`ConversationMeta` now has an optional `compactedFrom?: string` field. On a +compacted conversation, it points to the archive ID. On the archive, it points +to the original conversation ID. The FE can use this to show a "View full +history" link. ## What the FE needs to do diff --git a/packages/conversation-store/src/store.ts b/packages/conversation-store/src/store.ts index 2f9f475..d713de3 100644 --- a/packages/conversation-store/src/store.ts +++ b/packages/conversation-store/src/store.ts @@ -97,10 +97,23 @@ export interface ConversationStore { conversationId: string, messages: readonly ChatMessage[], ) => Promise<void>; + /** + * Fork (copy) the full conversation history from `sourceId` to `targetId`. + * Copies all chunks, metadata, cwd, and reasoning-effort. The target's + * status is set to "closed" (it's an archive) and `compactedFrom` is set + * to `sourceId`. Used by compaction to preserve the pre-compaction history + * non-destructively before replacing it with a summary. + */ + readonly forkHistory: (sourceId: string, targetId: string) => Promise<void>; /** Get the compact threshold (token count, 0 = manual only), or null if unset. */ readonly getCompactThreshold: (conversationId: string) => Promise<number | null>; /** Set the compact threshold (token count, 0 = manual only). */ readonly setCompactThreshold: (conversationId: string, threshold: number) => Promise<void>; + /** + * Set the `compactedFrom` field on a conversation's metadata, pointing to + * the archive conversation that holds the pre-compaction history. + */ + readonly setCompactedFrom: (conversationId: string, archiveId: string) => Promise<void>; } export const conversationStoreHandle = defineService<ConversationStore>("conversation-store/store"); @@ -146,6 +159,7 @@ interface ConversationMetaRow { readonly lastActivityAt: number; readonly title: string; readonly status: ConversationStatus; + readonly compactedFrom?: string; } /** Maximum title length (in characters) before truncation with an ellipsis. */ @@ -195,7 +209,13 @@ function parseMetaRow(raw: string): ConversationMetaRow | null { const row = parsed as ConversationMetaRow; const status: ConversationStatus = row.status === "active" || row.status === "closed" ? row.status : "idle"; - return { createdAt: row.createdAt, lastActivityAt: row.lastActivityAt, title: row.title, status }; + return { + createdAt: row.createdAt, + lastActivityAt: row.lastActivityAt, + title: row.title, + status, + ...(row.compactedFrom !== undefined ? { compactedFrom: row.compactedFrom } : {}), + }; } function toMeta(id: string, row: ConversationMetaRow): ConversationMeta { @@ -205,6 +225,7 @@ function toMeta(id: string, row: ConversationMetaRow): ConversationMeta { lastActivityAt: row.lastActivityAt, title: row.title, status: row.status, + ...(row.compactedFrom !== undefined ? { compactedFrom: row.compactedFrom } : {}), }; } @@ -558,6 +579,43 @@ export function createConversationStore( await this.append(conversationId, messages); }, + async forkHistory(sourceId, targetId) { + // Copy all chunks from source to target, re-numbered from seq 1. + const keys = await storage.keys(chunkPrefix(sourceId)); + const sorted = [...keys].sort(); + let seq = 1; + for (const key of sorted) { + const value = await storage.get(key); + if (value === null) continue; + await storage.set(chunkKey(targetId, seq), value); + seq++; + } + await storage.set(seqKey(targetId), String(Math.max(seq - 1, 0))); + + // Copy metadata with archive title + closed status + compactedFrom. + const metaRaw = await storage.get(metaKey(sourceId)); + if (metaRaw !== null) { + const existing = parseMetaRow(metaRaw); + if (existing !== null) { + const row: ConversationMetaRow = { + createdAt: existing.createdAt, + lastActivityAt: existing.lastActivityAt, + title: `Archive: ${existing.title}`, + status: "closed", + compactedFrom: sourceId, + }; + await storage.set(metaKey(targetId), JSON.stringify(row)); + } + } + await ensureInIndex(targetId); + + // Copy cwd + reasoning-effort (so the archive is self-contained). + const cwd = await storage.get(cwdKey(sourceId)); + if (cwd !== null) await storage.set(cwdKey(targetId), cwd); + const effort = await storage.get(reasoningEffortKey(sourceId)); + if (effort !== null) await storage.set(reasoningEffortKey(targetId), effort); + }, + async getCompactThreshold(conversationId) { const raw = await storage.get(compactThresholdKey(conversationId)); if (raw === null) return null; @@ -571,5 +629,21 @@ export function createConversationStore( logger.debug("compact-threshold set", { conversationId, threshold }); } }, + + async setCompactedFrom(conversationId, archiveId) { + const raw = await storage.get(metaKey(conversationId)); + const existing = raw !== null ? parseMetaRow(raw) : null; + const ts = now(); + const row: ConversationMetaRow = existing ?? { + createdAt: ts, + lastActivityAt: ts, + title: "Untitled", + status: "idle", + }; + await storage.set( + metaKey(conversationId), + JSON.stringify({ ...row, compactedFrom: archiveId }), + ); + }, }; } diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts index e657fe2..d5585c2 100644 --- a/packages/session-orchestrator/src/orchestrator.test.ts +++ b/packages/session-orchestrator/src/orchestrator.test.ts @@ -95,6 +95,8 @@ function createInMemoryStore(): ConversationStore & { return null; }, async setCompactThreshold() {}, + async forkHistory() {}, + async setCompactedFrom() {}, }; } @@ -550,6 +552,8 @@ describe("turn-sealed event", () => { return null; }, async setCompactThreshold() {}, + async forkHistory() {}, + async setCompactedFrom() {}, }; const { orchestrator } = createSessionOrchestrator({ @@ -621,6 +625,8 @@ describe("turn-sealed event", () => { return null; }, async setCompactThreshold() {}, + async forkHistory() {}, + async setCompactedFrom() {}, }; const { orchestrator } = createSessionOrchestrator({ @@ -981,6 +987,8 @@ describe("turn metrics persistence", () => { return null; }, async setCompactThreshold() {}, + async forkHistory() {}, + async setCompactedFrom() {}, }; const { orchestrator } = createSessionOrchestrator({ diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index c3b94bf..21c068c 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -132,6 +132,7 @@ export const conversationStatusChanged: EventHookDescriptor<ConversationStatusCh /** Payload for the conversationCompacted bus event. */ export interface ConversationCompactedPayload { readonly conversationId: string; + readonly archiveId: string; readonly messagesSummarized: number; readonly messagesKept: number; } @@ -801,6 +802,11 @@ export function createCompactionService( return { error: "model produced empty summary" }; } + // Non-destructive: fork the full pre-compaction history to an + // archive conversation before replacing it. + const archiveId = crypto.randomUUID(); + await deps.conversationStore.forkHistory(conversationId, archiveId); + // Replace history: [system: summary] + recent messages const summaryMessage: ChatMessage = { role: "system", @@ -813,15 +819,18 @@ export function createCompactionService( }; await deps.conversationStore.replaceHistory(conversationId, [summaryMessage, ...toKeep]); + await deps.conversationStore.setCompactedFrom(conversationId, archiveId); const result: CompactionResult = { summary, + archiveId, messagesSummarized: toSummarize.length, messagesKept: toKeep.length, }; deps.emit(conversationCompacted, { conversationId, + archiveId, messagesSummarized: toSummarize.length, messagesKept: toKeep.length, }); diff --git a/packages/session-orchestrator/src/queue.test.ts b/packages/session-orchestrator/src/queue.test.ts index ae24ddf..346f9c4 100644 --- a/packages/session-orchestrator/src/queue.test.ts +++ b/packages/session-orchestrator/src/queue.test.ts @@ -91,6 +91,8 @@ function createInMemoryStore(): ConversationStore & { return null; }, async setCompactThreshold() {}, + async forkHistory() {}, + async setCompactedFrom() {}, }; } diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts index 86eb807..2316c96 100644 --- a/packages/transport-contract/src/index.ts +++ b/packages/transport-contract/src/index.ts @@ -521,6 +521,7 @@ export interface ConversationStatusChangedMessage { export interface ConversationCompactedMessage { readonly type: "conversation.compacted"; readonly conversationId: string; + readonly archiveId: string; readonly messagesSummarized: number; readonly messagesKept: number; } @@ -576,6 +577,7 @@ export interface TitleResponse { */ export interface CompactResponse { readonly conversationId: string; + readonly archiveId: string; readonly messagesSummarized: number; readonly messagesKept: number; } diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts index 51f791f..b265f5e 100644 --- a/packages/transport-http/src/app.test.ts +++ b/packages/transport-http/src/app.test.ts @@ -143,6 +143,8 @@ function createFakeConversationStore( return null; }, async setCompactThreshold() {}, + async forkHistory() {}, + async setCompactedFrom() {}, }; } @@ -869,6 +871,8 @@ describe("GET /conversations/:id", () => { return null; }, async setCompactThreshold() {}, + async forkHistory() {}, + async setCompactedFrom() {}, }; const app = createApp({ conversationStore: store, @@ -942,6 +946,8 @@ describe("GET /conversations/:id", () => { return null; }, async setCompactThreshold() {}, + async forkHistory() {}, + async setCompactedFrom() {}, }; const app = createApp({ conversationStore: store, @@ -1084,6 +1090,8 @@ describe("GET /conversations/:id/metrics", () => { return null; }, async setCompactThreshold() {}, + async forkHistory() {}, + async setCompactedFrom() {}, }; const app = createApp({ conversationStore: brokenStore, @@ -2059,6 +2067,8 @@ describe("PUT /conversations/:id/reasoning-effort", () => { return null; }, async setCompactThreshold() {}, + async forkHistory() {}, + async setCompactedFrom() {}, }; const app = createApp({ conversationStore: store, diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts index fd78f3e..233e30e 100644 --- a/packages/transport-http/src/app.ts +++ b/packages/transport-http/src/app.ts @@ -725,6 +725,7 @@ export function createApp(opts: CreateServerOptions): Hono { const response: CompactResponse = { conversationId, + archiveId: result.archiveId, messagesSummarized: result.messagesSummarized, messagesKept: result.messagesKept, }; diff --git a/packages/transport-http/src/server.bun.test.ts b/packages/transport-http/src/server.bun.test.ts index e770867..3b7700e 100644 --- a/packages/transport-http/src/server.bun.test.ts +++ b/packages/transport-http/src/server.bun.test.ts @@ -70,6 +70,8 @@ function fakeConversationStore(): ConversationStore { return null; }, async setCompactThreshold() {}, + async forkHistory() {}, + async setCompactedFrom() {}, }; } diff --git a/packages/transport-ws/src/extension.ts b/packages/transport-ws/src/extension.ts index c3b9771..abc69e7 100644 --- a/packages/transport-ws/src/extension.ts +++ b/packages/transport-ws/src/extension.ts @@ -151,14 +151,18 @@ export function createTransportWsExtension(): Extension { // Broadcast `conversation.compacted` to all connected clients so // the FE reloads the conversation history after compaction. disposers.push( - host.on(conversationCompacted, ({ conversationId, messagesSummarized, messagesKept }) => { - broadcast({ - type: "conversation.compacted", - conversationId, - messagesSummarized, - messagesKept, - }); - }), + host.on( + conversationCompacted, + ({ conversationId, archiveId, messagesSummarized, messagesKept }) => { + broadcast({ + type: "conversation.compacted", + conversationId, + archiveId, + messagesSummarized, + messagesKept, + }); + }, + ), ); server = Bun.serve<ConnectionState>({ diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts index 69f35f0..8c85f89 100644 --- a/packages/wire/src/index.ts +++ b/packages/wire/src/index.ts @@ -521,6 +521,12 @@ export interface ConversationMeta { readonly lastActivityAt: number; readonly title: string; readonly status: ConversationStatus; + /** + * Set on a compacted conversation: points to the archive conversation ID + * that holds the full pre-compaction history. Absent on conversations + * that have never been compacted. + */ + readonly compactedFrom?: string; } // ─── Compaction ────────────────────────────────────────────────────────────── @@ -529,9 +535,12 @@ export interface ConversationMeta { * Result of a compaction operation. `summary` is the text the model produced; * `messagesKept` is how many recent messages were retained after the summary; * `messagesSummarized` is how many old messages were replaced by the summary. + * `archiveId` is the ID of the new conversation that holds the full + * pre-compaction history (non-destructive — the original history is preserved). */ export interface CompactionResult { readonly summary: string; + readonly archiveId: string; readonly messagesSummarized: number; readonly messagesKept: number; } |
