summaryrefslogtreecommitdiffhomepage
path: root/packages/storage-sqlite/src/storage.ts
blob: 2499664339f184ffe59b76310f19350eb1dfeeee (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import { Database } from "bun:sqlite";
import type { StorageNamespace } from "@dispatch/kernel";
import { computePending, type Migration } from "./migrate.js";

export interface SqliteStorageBackend {
	readonly storage: (namespace: string) => StorageNamespace;
	readonly migrate: (namespace: string, migrations: readonly Migration[]) => Promise<void>;
	readonly close: () => void;
}

export type StorageFactory = (opts: { path: string }) => SqliteStorageBackend;

export function createSqliteStorage(opts: { path: string }): SqliteStorageBackend {
	const db = new Database(opts.path);
	db.exec("PRAGMA journal_mode = WAL;");
	db.exec(`
		CREATE TABLE IF NOT EXISTS kv (
			namespace TEXT NOT NULL,
			key       TEXT NOT NULL,
			value     TEXT NOT NULL,
			PRIMARY KEY (namespace, key)
		);
	`);
	db.exec(`
		CREATE TABLE IF NOT EXISTS _migrations (
			namespace  TEXT NOT NULL,
			version    INTEGER NOT NULL,
			name       TEXT NOT NULL,
			applied_at TEXT NOT NULL DEFAULT (datetime('now')),
			PRIMARY KEY (namespace, version)
		);
	`);

	const getStmt = db.prepare("SELECT value FROM kv WHERE namespace = ?1 AND key = ?2");
	const setStmt = db.prepare(
		"INSERT OR REPLACE INTO kv (namespace, key, value) VALUES (?1, ?2, ?3)",
	);
	const deleteStmt = db.prepare("DELETE FROM kv WHERE namespace = ?1 AND key = ?2");
	const hasStmt = db.prepare("SELECT 1 FROM kv WHERE namespace = ?1 AND key = ?2");
	const keysAllStmt = db.prepare("SELECT key FROM kv WHERE namespace = ?1");
	const keysPrefixStmt = db.prepare("SELECT key FROM kv WHERE namespace = ?1 AND key LIKE ?2");
	const appliedVersionsStmt = db.prepare("SELECT version FROM _migrations WHERE namespace = ?1");
	const recordMigrationStmt = db.prepare(
		"INSERT INTO _migrations (namespace, version, name) VALUES (?1, ?2, ?3)",
	);

	function storage(namespace: string): StorageNamespace {
		return {
			get: async (key: string) => {
				const row = getStmt.get(namespace, key) as { value: string } | null;
				return row?.value ?? null;
			},
			set: async (key: string, value: string) => {
				setStmt.run(namespace, key, value);
			},
			delete: async (key: string) => {
				deleteStmt.run(namespace, key);
			},
			has: async (key: string) => {
				return hasStmt.get(namespace, key) !== null;
			},
			keys: async (prefix?: string) => {
				if (prefix !== undefined) {
					const rows = keysPrefixStmt.all(namespace, `${prefix}%`) as { key: string }[];
					return rows.map((r) => r.key);
				}
				const rows = keysAllStmt.all(namespace) as { key: string }[];
				return rows.map((r) => r.key);
			},
		};
	}

	async function migrate(namespace: string, migrations: readonly Migration[]): Promise<void> {
		const rows = appliedVersionsStmt.all(namespace) as { version: number }[];
		const applied = new Set(rows.map((r) => r.version));
		const pending = computePending(applied, migrations);

		for (const m of pending) {
			const runInTransaction = db.transaction(() => {
				db.exec(m.up);
				recordMigrationStmt.run(namespace, m.version, m.name);
			});
			runInTransaction();
		}
	}

	function close(): void {
		db.close();
	}

	return { storage, migrate, close };
}