summaryrefslogtreecommitdiffhomepage
path: root/packages/core/src/db/chunks.ts
blob: b434a47b2c59c56ff905767845f610c12a00d296 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
import { randomUUID } from "node:crypto";
import {
	explodeTurn,
	explodeUserText,
	groupRowsToMessages,
	type MessageRow,
} from "../chunks/transform.js";
import type {
	ChunkData,
	ChunkRow,
	ChunkRowDraft,
	TextData,
	UsageData,
	UsageStats,
} from "../types/index.js";
import { getDatabase } from "./index.js";

// Re-export the DB-free transforms so existing barrel consumers
// (`@dispatch/core`) keep importing them from here. The browser frontend deep-
// imports them directly from `chunks/transform.js` to avoid the DB dependency.
export { explodeTurn, explodeUserText, groupRowsToMessages, type MessageRow };

// ─── Persistence ─────────────────────────────────────────────────

function mapRow(row: Record<string, unknown>): ChunkRow {
	let data: ChunkData;
	try {
		data = JSON.parse(row.data_json as string) as ChunkData;
	} catch {
		data = { text: "" } as TextData;
	}
	return {
		id: row.id as string,
		tabId: row.tab_id as string,
		seq: row.seq as number,
		turnId: row.turn_id as string,
		step: row.step as number,
		role: row.role as ChunkRow["role"],
		type: row.type as ChunkRow["type"],
		data,
		createdAt: row.created_at as number,
	};
}

/**
 * Append one or more chunk-row drafts to a tab, assigning a monotonic per-tab
 * `seq` and a fresh id/timestamp to each. Returns the inserted rows in order.
 */
export function appendChunks(tabId: string, drafts: ChunkRowDraft[]): ChunkRow[] {
	if (drafts.length === 0) return [];
	const db = getDatabase();
	const maxSeq = db
		.query("SELECT COALESCE(MAX(seq), -1) as max_seq FROM chunks WHERE tab_id = $tabId")
		.get({ $tabId: tabId }) as { max_seq: number };
	let seq = (maxSeq?.max_seq ?? -1) + 1;
	const now = Date.now();
	const insert = db.query(
		`INSERT INTO chunks (id, tab_id, seq, turn_id, step, role, type, data_json, created_at)
		 VALUES ($id, $tabId, $seq, $turnId, $step, $role, $type, $dataJson, $now)`,
	);
	const out: ChunkRow[] = [];
	// Wrap the whole batch in one transaction: a turn's chunks are persisted in
	// a single `appendChunks` call, so this is one fsync per turn instead of one
	// per row — the chosen low-IO write strategy for constrained backends.
	const insertAll = db.transaction(() => {
		for (const draft of drafts) {
			const id = randomUUID();
			insert.run({
				$id: id,
				$tabId: tabId,
				$seq: seq,
				$turnId: draft.turnId,
				$step: draft.step,
				$role: draft.role,
				$type: draft.type,
				$dataJson: JSON.stringify(draft.data),
				$now: now,
			});
			out.push({
				id,
				tabId,
				seq,
				turnId: draft.turnId,
				step: draft.step,
				role: draft.role,
				type: draft.type,
				data: draft.data,
				createdAt: now,
			});
			seq++;
		}
	});
	insertAll();
	return out;
}

/**
 * Read chunk rows for a tab in `seq` order (ASC). Pagination mirrors the old
 * message pagination but at chunk granularity:
 *   - no options → all rows;
 *   - `before` → rows with `seq < before`, most-recent-first then reversed;
 *   - `limit` → most recent `limit` rows, reversed to ASC.
 */
export function getChunksForTab(
	tabId: string,
	options?: { limit?: number; before?: number },
): ChunkRow[] {
	const db = getDatabase();
	if (!options) {
		const rows = db
			.query("SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' ORDER BY seq ASC")
			.all({ $tabId: tabId }) as Array<Record<string, unknown>>;
		return rows.map(mapRow);
	}
	const { limit, before } = options;
	if (before !== undefined) {
		if (limit !== undefined) {
			const rows = db
				.query(
					"SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' AND seq < $before ORDER BY seq DESC LIMIT $limit",
				)
				.all({ $tabId: tabId, $before: before, $limit: limit }) as Array<Record<string, unknown>>;
			return rows.map(mapRow).reverse();
		}
		const rows = db
			.query(
				"SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' AND seq < $before ORDER BY seq DESC",
			)
			.all({ $tabId: tabId, $before: before }) as Array<Record<string, unknown>>;
		return rows.map(mapRow).reverse();
	}
	if (limit !== undefined) {
		const rows = db
			.query(
				"SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' ORDER BY seq DESC LIMIT $limit",
			)
			.all({ $tabId: tabId, $limit: limit }) as Array<Record<string, unknown>>;
		return rows.map(mapRow).reverse();
	}
	const rows = db
		.query("SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' ORDER BY seq ASC")
		.all({ $tabId: tabId }) as Array<Record<string, unknown>>;
	return rows.map(mapRow);
}

/**
 * Derived, grouped view of a tab's full history as messages. Used to
 * pre-populate the agent's in-memory `ChatMessage[]` history when an Agent is
 * (re)constructed. Always reads the full log (grouping a partial window would
 * be lossy for the rebuild path).
 */
export function getMessagesForTab(tabId: string): MessageRow[] {
	return groupRowsToMessages(getChunksForTab(tabId));
}

export function getTotalChunkCount(tabId: string): number {
	const db = getDatabase();
	const row = db
		.query("SELECT COUNT(*) as count FROM chunks WHERE tab_id = $tabId AND type != 'usage'")
		.get({ $tabId: tabId }) as { count: number } | null;
	return row?.count ?? 0;
}

/**
 * Aggregate per-tab token/cache usage across ALL persisted `usage` chunk rows.
 *
 * Usage rows are written as an invisible side channel (one row per `usage`
 * AgentEvent) and are query-excluded from `getChunksForTab`/`getTotalChunkCount`,
 * so this aggregate is the read path. Because it sums server-side over every
 * row, it stays complete even after the frontend evicts/pages out old turns
 * (eviction is in-memory only). The return shape is structurally identical to
 * the frontend `CacheStats`, so reload can seed it directly.
 *
 *   - cumulative `inputTokens`/`outputTokens`/`cacheReadTokens`/`cacheWriteTokens`
 *     = SUM over all usage rows;
 *   - `requests` = COUNT of usage rows;
 *   - `last` = the highest-seq usage row's split (most recent request);
 *   - `null` when the tab has no usage rows.
 *
 * Sums in JS after selecting the rows (mirroring `mapRow`) to avoid relying on
 * `json_extract` over the freeform `data_json`.
 */
export function getUsageStatsForTab(tabId: string): UsageStats | null {
	const db = getDatabase();
	const rows = db
		.query("SELECT data_json FROM chunks WHERE tab_id = $tabId AND type = 'usage' ORDER BY seq ASC")
		.all({ $tabId: tabId }) as Array<{ data_json: string }>;
	if (rows.length === 0) return null;

	let inputTokens = 0;
	let outputTokens = 0;
	let cacheReadTokens = 0;
	let cacheWriteTokens = 0;
	let last: UsageData | null = null;
	for (const row of rows) {
		let u: UsageData;
		try {
			u = JSON.parse(row.data_json) as UsageData;
		} catch {
			continue;
		}
		inputTokens += u.inputTokens ?? 0;
		outputTokens += u.outputTokens ?? 0;
		cacheReadTokens += u.cacheReadTokens ?? 0;
		cacheWriteTokens += u.cacheWriteTokens ?? 0;
		last = {
			inputTokens: u.inputTokens ?? 0,
			outputTokens: u.outputTokens ?? 0,
			cacheReadTokens: u.cacheReadTokens ?? 0,
			cacheWriteTokens: u.cacheWriteTokens ?? 0,
		};
	}

	return {
		inputTokens,
		outputTokens,
		cacheReadTokens,
		cacheWriteTokens,
		requests: rows.length,
		last,
	};
}

export function clearChunksForTab(tabId: string): void {
	const db = getDatabase();
	db.query("DELETE FROM chunks WHERE tab_id = $tabId").run({ $tabId: tabId });
}

/**
 * Relocate every chunk row from one tab to another (compaction backup path).
 *
 * Used by conversation compaction to move the FULL pre-compaction history off
 * the canonical tab id (`fromTabId`) onto a freshly-created backup tab id
 * (`toTabId`), leaving the canonical id free to be re-seeded with the summary +
 * preserved tail. `seq` values are preserved (they remain per-tab monotonic for
 * the destination since it starts empty), as are turn ids, so the relocated
 * history groups identically under its new tab. Returns the number of rows
 * moved.
 */
export function rekeyChunks(fromTabId: string, toTabId: string): number {
	const db = getDatabase();
	const result = db
		.query("UPDATE chunks SET tab_id = $to WHERE tab_id = $from")
		.run({ $from: fromTabId, $to: toTabId });
	return Number(result.changes ?? 0);
}