summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorKit Langton <[email protected]>2026-03-30 21:16:02 -0400
committerGitHub <[email protected]>2026-03-31 01:16:02 +0000
commita898c2ea3ad404056e015de8f37106cca7b7a4c3 (patch)
tree4f8e8b7d60bd7785db23563eeb7ed3ceee3b4adc
parentbf777298c8b64997bcbb8e52b016e417e7e62114 (diff)
downloadopencode-a898c2ea3ad404056e015de8f37106cca7b7a4c3.tar.gz
opencode-a898c2ea3ad404056e015de8f37106cca7b7a4c3.zip
refactor(storage): effectify Storage service (#20132)
-rw-r--r--packages/opencode/src/storage/storage.ts400
-rw-r--r--packages/opencode/test/server/session-messages.test.ts156
-rw-r--r--packages/opencode/test/storage/storage.test.ts295
3 files changed, 651 insertions, 200 deletions
diff --git a/packages/opencode/src/storage/storage.ts b/packages/opencode/src/storage/storage.ts
index a78607cdf..268b18f68 100644
--- a/packages/opencode/src/storage/storage.ts
+++ b/packages/opencode/src/storage/storage.ts
@@ -1,19 +1,17 @@
import { Log } from "../util/log"
import path from "path"
-import fs from "fs/promises"
import { Global } from "../global"
-import { Filesystem } from "../util/filesystem"
-import { lazy } from "../util/lazy"
-import { Lock } from "../util/lock"
import { NamedError } from "@opencode-ai/util/error"
import z from "zod"
-import { Glob } from "../util/glob"
import { git } from "@/util/git"
+import { AppFileSystem } from "@/filesystem"
+import { makeRuntime } from "@/effect/run-service"
+import { Effect, Exit, Layer, Option, RcMap, Schema, ServiceMap, TxReentrantLock } from "effect"
export namespace Storage {
const log = Log.create({ service: "storage" })
- type Migration = (dir: string) => Promise<void>
+ type Migration = (dir: string, fs: AppFileSystem.Interface) => Effect.Effect<void, AppFileSystem.Error>
export const NotFoundError = NamedError.create(
"NotFoundError",
@@ -22,36 +20,101 @@ export namespace Storage {
}),
)
+ export type Error = AppFileSystem.Error | InstanceType<typeof NotFoundError>
+
+ const RootFile = Schema.Struct({
+ path: Schema.optional(
+ Schema.Struct({
+ root: Schema.optional(Schema.String),
+ }),
+ ),
+ })
+
+ const SessionFile = Schema.Struct({
+ id: Schema.String,
+ })
+
+ const MessageFile = Schema.Struct({
+ id: Schema.String,
+ })
+
+ const DiffFile = Schema.Struct({
+ additions: Schema.Number,
+ deletions: Schema.Number,
+ })
+
+ const SummaryFile = Schema.Struct({
+ id: Schema.String,
+ projectID: Schema.String,
+ summary: Schema.Struct({ diffs: Schema.Array(DiffFile) }),
+ })
+
+ const decodeRoot = Schema.decodeUnknownOption(RootFile)
+ const decodeSession = Schema.decodeUnknownOption(SessionFile)
+ const decodeMessage = Schema.decodeUnknownOption(MessageFile)
+ const decodeSummary = Schema.decodeUnknownOption(SummaryFile)
+
+ export interface Interface {
+ readonly remove: (key: string[]) => Effect.Effect<void, AppFileSystem.Error>
+ readonly read: <T>(key: string[]) => Effect.Effect<T, Error>
+ readonly update: <T>(key: string[], fn: (draft: T) => void) => Effect.Effect<T, Error>
+ readonly write: <T>(key: string[], content: T) => Effect.Effect<void, AppFileSystem.Error>
+ readonly list: (prefix: string[]) => Effect.Effect<string[][], AppFileSystem.Error>
+ }
+
+ export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Storage") {}
+
+ function file(dir: string, key: string[]) {
+ return path.join(dir, ...key) + ".json"
+ }
+
+ function missing(err: unknown) {
+ if (!err || typeof err !== "object") return false
+ if ("code" in err && err.code === "ENOENT") return true
+ if ("reason" in err && err.reason && typeof err.reason === "object" && "_tag" in err.reason) {
+ return err.reason._tag === "NotFound"
+ }
+ return false
+ }
+
+ function parseMigration(text: string) {
+ const value = Number.parseInt(text, 10)
+ return Number.isNaN(value) ? 0 : value
+ }
+
const MIGRATIONS: Migration[] = [
- async (dir) => {
+ Effect.fn("Storage.migration.1")(function* (dir: string, fs: AppFileSystem.Interface) {
const project = path.resolve(dir, "../project")
- if (!(await Filesystem.isDir(project))) return
- const projectDirs = await Glob.scan("*", {
+ if (!(yield* fs.isDir(project))) return
+ const projectDirs = yield* fs.glob("*", {
cwd: project,
include: "all",
})
for (const projectDir of projectDirs) {
- const fullPath = path.join(project, projectDir)
- if (!(await Filesystem.isDir(fullPath))) continue
+ const full = path.join(project, projectDir)
+ if (!(yield* fs.isDir(full))) continue
log.info(`migrating project ${projectDir}`)
let projectID = projectDir
- const fullProjectDir = path.join(project, projectDir)
let worktree = "/"
if (projectID !== "global") {
- for (const msgFile of await Glob.scan("storage/session/message/*/*.json", {
- cwd: path.join(project, projectDir),
+ for (const msgFile of yield* fs.glob("storage/session/message/*/*.json", {
+ cwd: full,
absolute: true,
})) {
- const json = await Filesystem.readJson<any>(msgFile)
- worktree = json.path?.root
- if (worktree) break
+ const json = decodeRoot(yield* fs.readJson(msgFile), { onExcessProperty: "preserve" })
+ const root = Option.isSome(json) ? json.value.path?.root : undefined
+ if (!root) continue
+ worktree = root
+ break
}
if (!worktree) continue
- if (!(await Filesystem.isDir(worktree))) continue
- const result = await git(["rev-list", "--max-parents=0", "--all"], {
- cwd: worktree,
- })
+ if (!(yield* fs.isDir(worktree))) continue
+ const result = yield* Effect.promise(() =>
+ git(["rev-list", "--max-parents=0", "--all"], {
+ cwd: worktree,
+ }),
+ )
const [id] = result
.text()
.split("\n")
@@ -61,157 +124,230 @@ export namespace Storage {
if (!id) continue
projectID = id
- await Filesystem.writeJson(path.join(dir, "project", projectID + ".json"), {
- id,
- vcs: "git",
- worktree,
- time: {
- created: Date.now(),
- initialized: Date.now(),
- },
- })
+ yield* fs.writeWithDirs(
+ path.join(dir, "project", projectID + ".json"),
+ JSON.stringify(
+ {
+ id,
+ vcs: "git",
+ worktree,
+ time: {
+ created: Date.now(),
+ initialized: Date.now(),
+ },
+ },
+ null,
+ 2,
+ ),
+ )
log.info(`migrating sessions for project ${projectID}`)
- for (const sessionFile of await Glob.scan("storage/session/info/*.json", {
- cwd: fullProjectDir,
+ for (const sessionFile of yield* fs.glob("storage/session/info/*.json", {
+ cwd: full,
absolute: true,
})) {
const dest = path.join(dir, "session", projectID, path.basename(sessionFile))
- log.info("copying", {
- sessionFile,
- dest,
- })
- const session = await Filesystem.readJson<any>(sessionFile)
- await Filesystem.writeJson(dest, session)
- log.info(`migrating messages for session ${session.id}`)
- for (const msgFile of await Glob.scan(`storage/session/message/${session.id}/*.json`, {
- cwd: fullProjectDir,
+ log.info("copying", { sessionFile, dest })
+ const session = yield* fs.readJson(sessionFile)
+ const info = decodeSession(session, { onExcessProperty: "preserve" })
+ yield* fs.writeWithDirs(dest, JSON.stringify(session, null, 2))
+ if (Option.isNone(info)) continue
+ log.info(`migrating messages for session ${info.value.id}`)
+ for (const msgFile of yield* fs.glob(`storage/session/message/${info.value.id}/*.json`, {
+ cwd: full,
absolute: true,
})) {
- const dest = path.join(dir, "message", session.id, path.basename(msgFile))
+ const next = path.join(dir, "message", info.value.id, path.basename(msgFile))
log.info("copying", {
msgFile,
- dest,
+ dest: next,
})
- const message = await Filesystem.readJson<any>(msgFile)
- await Filesystem.writeJson(dest, message)
+ const message = yield* fs.readJson(msgFile)
+ const item = decodeMessage(message, { onExcessProperty: "preserve" })
+ yield* fs.writeWithDirs(next, JSON.stringify(message, null, 2))
+ if (Option.isNone(item)) continue
- log.info(`migrating parts for message ${message.id}`)
- for (const partFile of await Glob.scan(`storage/session/part/${session.id}/${message.id}/*.json`, {
- cwd: fullProjectDir,
+ log.info(`migrating parts for message ${item.value.id}`)
+ for (const partFile of yield* fs.glob(`storage/session/part/${info.value.id}/${item.value.id}/*.json`, {
+ cwd: full,
absolute: true,
})) {
- const dest = path.join(dir, "part", message.id, path.basename(partFile))
- const part = await Filesystem.readJson(partFile)
+ const out = path.join(dir, "part", item.value.id, path.basename(partFile))
+ const part = yield* fs.readJson(partFile)
log.info("copying", {
partFile,
- dest,
+ dest: out,
})
- await Filesystem.writeJson(dest, part)
+ yield* fs.writeWithDirs(out, JSON.stringify(part, null, 2))
}
}
}
}
}
- },
- async (dir) => {
- for (const item of await Glob.scan("session/*/*.json", {
+ }),
+ Effect.fn("Storage.migration.2")(function* (dir: string, fs: AppFileSystem.Interface) {
+ for (const item of yield* fs.glob("session/*/*.json", {
cwd: dir,
absolute: true,
})) {
- const session = await Filesystem.readJson<any>(item)
- if (!session.projectID) continue
- if (!session.summary?.diffs) continue
- const { diffs } = session.summary
- await Filesystem.write(path.join(dir, "session_diff", session.id + ".json"), JSON.stringify(diffs))
- await Filesystem.writeJson(path.join(dir, "session", session.projectID, session.id + ".json"), {
- ...session,
- summary: {
- additions: diffs.reduce((sum: any, x: any) => sum + x.additions, 0),
- deletions: diffs.reduce((sum: any, x: any) => sum + x.deletions, 0),
- },
- })
+ const raw = yield* fs.readJson(item)
+ const session = decodeSummary(raw, { onExcessProperty: "preserve" })
+ if (Option.isNone(session)) continue
+ const diffs = session.value.summary.diffs
+ yield* fs.writeWithDirs(
+ path.join(dir, "session_diff", session.value.id + ".json"),
+ JSON.stringify(diffs, null, 2),
+ )
+ yield* fs.writeWithDirs(
+ path.join(dir, "session", session.value.projectID, session.value.id + ".json"),
+ JSON.stringify(
+ {
+ ...(raw as Record<string, unknown>),
+ summary: {
+ additions: diffs.reduce((sum, x) => sum + x.additions, 0),
+ deletions: diffs.reduce((sum, x) => sum + x.deletions, 0),
+ },
+ },
+ null,
+ 2,
+ ),
+ )
}
- },
+ }),
]
- const state = lazy(async () => {
- const dir = path.join(Global.Path.data, "storage")
- const migration = await Filesystem.readJson<string>(path.join(dir, "migration"))
- .then((x) => parseInt(x))
- .catch(() => 0)
- for (let index = migration; index < MIGRATIONS.length; index++) {
- log.info("running migration", { index })
- const migration = MIGRATIONS[index]
- await migration(dir).catch(() => log.error("failed to run migration", { index }))
- await Filesystem.write(path.join(dir, "migration"), (index + 1).toString())
- }
- return {
- dir,
- }
- })
+ export const layer = Layer.effect(
+ Service,
+ Effect.gen(function* () {
+ const fs = yield* AppFileSystem.Service
+ const locks = yield* RcMap.make({
+ lookup: () => TxReentrantLock.make(),
+ idleTimeToLive: 0,
+ })
+ const state = yield* Effect.cached(
+ Effect.gen(function* () {
+ const dir = path.join(Global.Path.data, "storage")
+ const marker = path.join(dir, "migration")
+ const migration = yield* fs.readFileString(marker).pipe(
+ Effect.map(parseMigration),
+ Effect.catchIf(missing, () => Effect.succeed(0)),
+ Effect.orElseSucceed(() => 0),
+ )
+ for (let i = migration; i < MIGRATIONS.length; i++) {
+ log.info("running migration", { index: i })
+ const step = MIGRATIONS[i]!
+ const exit = yield* Effect.exit(step(dir, fs))
+ if (Exit.isFailure(exit)) {
+ log.error("failed to run migration", { index: i, cause: exit.cause })
+ break
+ }
+ yield* fs.writeWithDirs(marker, String(i + 1))
+ }
+ return { dir }
+ }),
+ )
+
+ const fail = (target: string): Effect.Effect<never, InstanceType<typeof NotFoundError>> =>
+ Effect.fail(new NotFoundError({ message: `Resource not found: ${target}` }))
+
+ const wrap = <A>(target: string, body: Effect.Effect<A, AppFileSystem.Error>) =>
+ body.pipe(Effect.catchIf(missing, () => fail(target)))
+
+ const writeJson = Effect.fnUntraced(function* (target: string, content: unknown) {
+ yield* fs.writeWithDirs(target, JSON.stringify(content, null, 2))
+ })
+
+ const withResolved = <A, E>(
+ key: string[],
+ fn: (target: string, rw: TxReentrantLock.TxReentrantLock) => Effect.Effect<A, E>,
+ ): Effect.Effect<A, E | AppFileSystem.Error> =>
+ Effect.scoped(
+ Effect.gen(function* () {
+ const target = file((yield* state).dir, key)
+ return yield* fn(target, yield* RcMap.get(locks, target))
+ }),
+ )
+
+ const remove: Interface["remove"] = Effect.fn("Storage.remove")(function* (key: string[]) {
+ yield* withResolved(key, (target, rw) =>
+ TxReentrantLock.withWriteLock(rw, fs.remove(target).pipe(Effect.catchIf(missing, () => Effect.void))),
+ )
+ })
+
+ const read: Interface["read"] = <T>(key: string[]) =>
+ Effect.gen(function* () {
+ const value = yield* withResolved(key, (target, rw) =>
+ TxReentrantLock.withReadLock(rw, wrap(target, fs.readJson(target))),
+ )
+ return value as T
+ })
+
+ const update: Interface["update"] = <T>(key: string[], fn: (draft: T) => void) =>
+ Effect.gen(function* () {
+ const value = yield* withResolved(key, (target, rw) =>
+ TxReentrantLock.withWriteLock(
+ rw,
+ Effect.gen(function* () {
+ const content = yield* wrap(target, fs.readJson(target))
+ fn(content as T)
+ yield* writeJson(target, content)
+ return content
+ }),
+ ),
+ )
+ return value as T
+ })
+
+ const write: Interface["write"] = (key: string[], content: unknown) =>
+ Effect.gen(function* () {
+ yield* withResolved(key, (target, rw) => TxReentrantLock.withWriteLock(rw, writeJson(target, content)))
+ })
+
+ const list: Interface["list"] = Effect.fn("Storage.list")(function* (prefix: string[]) {
+ const dir = (yield* state).dir
+ const cwd = path.join(dir, ...prefix)
+ const result = yield* fs
+ .glob("**/*", {
+ cwd,
+ include: "file",
+ })
+ .pipe(Effect.catch(() => Effect.succeed<string[]>([])))
+ return result
+ .map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)])
+ .toSorted((a, b) => a.join("/").localeCompare(b.join("/")))
+ })
+
+ return Service.of({
+ remove,
+ read,
+ update,
+ write,
+ list,
+ })
+ }),
+ )
+
+ export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer))
+
+ const { runPromise } = makeRuntime(Service, defaultLayer)
export async function remove(key: string[]) {
- const dir = await state().then((x) => x.dir)
- const target = path.join(dir, ...key) + ".json"
- return withErrorHandling(async () => {
- await fs.unlink(target).catch(() => {})
- })
+ return runPromise((svc) => svc.remove(key))
}
export async function read<T>(key: string[]) {
- const dir = await state().then((x) => x.dir)
- const target = path.join(dir, ...key) + ".json"
- return withErrorHandling(async () => {
- using _ = await Lock.read(target)
- const result = await Filesystem.readJson<T>(target)
- return result as T
- })
+ return runPromise((svc) => svc.read<T>(key))
}
export async function update<T>(key: string[], fn: (draft: T) => void) {
- const dir = await state().then((x) => x.dir)
- const target = path.join(dir, ...key) + ".json"
- return withErrorHandling(async () => {
- using _ = await Lock.write(target)
- const content = await Filesystem.readJson<T>(target)
- fn(content as T)
- await Filesystem.writeJson(target, content)
- return content
- })
+ return runPromise((svc) => svc.update<T>(key, fn))
}
export async function write<T>(key: string[], content: T) {
- const dir = await state().then((x) => x.dir)
- const target = path.join(dir, ...key) + ".json"
- return withErrorHandling(async () => {
- using _ = await Lock.write(target)
- await Filesystem.writeJson(target, content)
- })
- }
-
- async function withErrorHandling<T>(body: () => Promise<T>) {
- return body().catch((e) => {
- if (!(e instanceof Error)) throw e
- const errnoException = e as NodeJS.ErrnoException
- if (errnoException.code === "ENOENT") {
- throw new NotFoundError({ message: `Resource not found: ${errnoException.path}` })
- }
- throw e
- })
+ return runPromise((svc) => svc.write(key, content))
}
export async function list(prefix: string[]) {
- const dir = await state().then((x) => x.dir)
- try {
- const result = await Glob.scan("**/*", {
- cwd: path.join(dir, ...prefix),
- include: "file",
- }).then((results) => results.map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)]))
- result.sort()
- return result
- } catch {
- return []
- }
+ return runPromise((svc) => svc.list(prefix))
}
}
diff --git a/packages/opencode/test/server/session-messages.test.ts b/packages/opencode/test/server/session-messages.test.ts
index d7e44cbec..89e6fba5c 100644
--- a/packages/opencode/test/server/session-messages.test.ts
+++ b/packages/opencode/test/server/session-messages.test.ts
@@ -13,6 +13,18 @@ afterEach(async () => {
await Instance.disposeAll()
})
+async function withoutWatcher<T>(fn: () => Promise<T>) {
+ if (process.platform !== "win32") return fn()
+ const prev = process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER
+ process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = "true"
+ try {
+ return await fn()
+ } finally {
+ if (prev === undefined) delete process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER
+ else process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = prev
+ }
+}
+
async function fill(sessionID: SessionID, count: number, time = (i: number) => Date.now() + i) {
const ids = [] as MessageID[]
for (let i = 0; i < count; i++) {
@@ -42,86 +54,94 @@ async function fill(sessionID: SessionID, count: number, time = (i: number) => D
describe("session messages endpoint", () => {
test("returns cursor headers for older pages", async () => {
await using tmp = await tmpdir({ git: true })
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- const session = await Session.create({})
- const ids = await fill(session.id, 5)
- const app = Server.Default()
-
- const a = await app.request(`/session/${session.id}/message?limit=2`)
- expect(a.status).toBe(200)
- const aBody = (await a.json()) as MessageV2.WithParts[]
- expect(aBody.map((item) => item.info.id)).toEqual(ids.slice(-2))
- const cursor = a.headers.get("x-next-cursor")
- expect(cursor).toBeTruthy()
- expect(a.headers.get("link")).toContain('rel="next"')
-
- const b = await app.request(`/session/${session.id}/message?limit=2&before=${encodeURIComponent(cursor!)}`)
- expect(b.status).toBe(200)
- const bBody = (await b.json()) as MessageV2.WithParts[]
- expect(bBody.map((item) => item.info.id)).toEqual(ids.slice(-4, -2))
-
- await Session.remove(session.id)
- },
- })
+ await withoutWatcher(() =>
+ Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({})
+ const ids = await fill(session.id, 5)
+ const app = Server.Default()
+
+ const a = await app.request(`/session/${session.id}/message?limit=2`)
+ expect(a.status).toBe(200)
+ const aBody = (await a.json()) as MessageV2.WithParts[]
+ expect(aBody.map((item) => item.info.id)).toEqual(ids.slice(-2))
+ const cursor = a.headers.get("x-next-cursor")
+ expect(cursor).toBeTruthy()
+ expect(a.headers.get("link")).toContain('rel="next"')
+
+ const b = await app.request(`/session/${session.id}/message?limit=2&before=${encodeURIComponent(cursor!)}`)
+ expect(b.status).toBe(200)
+ const bBody = (await b.json()) as MessageV2.WithParts[]
+ expect(bBody.map((item) => item.info.id)).toEqual(ids.slice(-4, -2))
+
+ await Session.remove(session.id)
+ },
+ }),
+ )
})
test("keeps full-history responses when limit is omitted", async () => {
await using tmp = await tmpdir({ git: true })
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- const session = await Session.create({})
- const ids = await fill(session.id, 3)
- const app = Server.Default()
-
- const res = await app.request(`/session/${session.id}/message`)
- expect(res.status).toBe(200)
- const body = (await res.json()) as MessageV2.WithParts[]
- expect(body.map((item) => item.info.id)).toEqual(ids)
-
- await Session.remove(session.id)
- },
- })
+ await withoutWatcher(() =>
+ Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({})
+ const ids = await fill(session.id, 3)
+ const app = Server.Default()
+
+ const res = await app.request(`/session/${session.id}/message`)
+ expect(res.status).toBe(200)
+ const body = (await res.json()) as MessageV2.WithParts[]
+ expect(body.map((item) => item.info.id)).toEqual(ids)
+
+ await Session.remove(session.id)
+ },
+ }),
+ )
})
test("rejects invalid cursors and missing sessions", async () => {
await using tmp = await tmpdir({ git: true })
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- const session = await Session.create({})
- const app = Server.Default()
-
- const bad = await app.request(`/session/${session.id}/message?limit=2&before=bad`)
- expect(bad.status).toBe(400)
-
- const miss = await app.request(`/session/ses_missing/message?limit=2`)
- expect(miss.status).toBe(404)
-
- await Session.remove(session.id)
- },
- })
+ await withoutWatcher(() =>
+ Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({})
+ const app = Server.Default()
+
+ const bad = await app.request(`/session/${session.id}/message?limit=2&before=bad`)
+ expect(bad.status).toBe(400)
+
+ const miss = await app.request(`/session/ses_missing/message?limit=2`)
+ expect(miss.status).toBe(404)
+
+ await Session.remove(session.id)
+ },
+ }),
+ )
})
test("does not truncate large legacy limit requests", async () => {
await using tmp = await tmpdir({ git: true })
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- const session = await Session.create({})
- await fill(session.id, 520)
- const app = Server.Default()
-
- const res = await app.request(`/session/${session.id}/message?limit=510`)
- expect(res.status).toBe(200)
- const body = (await res.json()) as MessageV2.WithParts[]
- expect(body).toHaveLength(510)
-
- await Session.remove(session.id)
- },
- })
+ await withoutWatcher(() =>
+ Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({})
+ await fill(session.id, 520)
+ const app = Server.Default()
+
+ const res = await app.request(`/session/${session.id}/message?limit=510`)
+ expect(res.status).toBe(200)
+ const body = (await res.json()) as MessageV2.WithParts[]
+ expect(body).toHaveLength(510)
+
+ await Session.remove(session.id)
+ },
+ }),
+ )
})
})
diff --git a/packages/opencode/test/storage/storage.test.ts b/packages/opencode/test/storage/storage.test.ts
new file mode 100644
index 000000000..e5a04c082
--- /dev/null
+++ b/packages/opencode/test/storage/storage.test.ts
@@ -0,0 +1,295 @@
+import { describe, expect, test } from "bun:test"
+import fs from "fs/promises"
+import path from "path"
+import { Effect, Layer, ManagedRuntime } from "effect"
+import { AppFileSystem } from "../../src/filesystem"
+import { Global } from "../../src/global"
+import { Storage } from "../../src/storage/storage"
+import { tmpdir } from "../fixture/fixture"
+
+const dir = path.join(Global.Path.data, "storage")
+
+async function withScope<T>(fn: (root: string[]) => Promise<T>) {
+ const root = ["storage_test", crypto.randomUUID()]
+ try {
+ return await fn(root)
+ } finally {
+ await fs.rm(path.join(dir, ...root), { recursive: true, force: true })
+ }
+}
+
+function map(root: string, file: string) {
+ if (file === Global.Path.data) return root
+ if (file.startsWith(Global.Path.data + path.sep)) return path.join(root, path.relative(Global.Path.data, file))
+ return file
+}
+
+function layer(root: string) {
+ return Layer.effect(
+ AppFileSystem.Service,
+ Effect.gen(function* () {
+ const fs = yield* AppFileSystem.Service
+ return AppFileSystem.Service.of({
+ ...fs,
+ isDir: (file) => fs.isDir(map(root, file)),
+ readJson: (file) => fs.readJson(map(root, file)),
+ writeWithDirs: (file, content, mode) => fs.writeWithDirs(map(root, file), content, mode),
+ readFileString: (file) => fs.readFileString(map(root, file)),
+ remove: (file) => fs.remove(map(root, file)),
+ glob: (pattern, options) =>
+ fs.glob(pattern, options?.cwd ? { ...options, cwd: map(root, options.cwd) } : options),
+ })
+ }),
+ ).pipe(Layer.provide(AppFileSystem.defaultLayer))
+}
+
+async function withStorage<T>(
+ root: string,
+ fn: (run: <A, E>(body: Effect.Effect<A, E, Storage.Service>) => Promise<A>) => Promise<T>,
+) {
+ const rt = ManagedRuntime.make(Storage.layer.pipe(Layer.provide(layer(root))))
+ try {
+ return await fn((body) => rt.runPromise(body))
+ } finally {
+ await rt.dispose()
+ }
+}
+
+async function write(file: string, value: unknown) {
+ await fs.mkdir(path.dirname(file), { recursive: true })
+ await Bun.write(file, JSON.stringify(value, null, 2))
+}
+
+async function text(file: string, value: string) {
+ await fs.mkdir(path.dirname(file), { recursive: true })
+ await Bun.write(file, value)
+}
+
+async function exists(file: string) {
+ return fs
+ .stat(file)
+ .then(() => true)
+ .catch(() => false)
+}
+
+describe("Storage", () => {
+ test("round-trips JSON content", async () => {
+ await withScope(async (root) => {
+ const key = [...root, "session_diff", "roundtrip"]
+ const value = [{ file: "a.ts", additions: 2, deletions: 1 }]
+
+ await Storage.write(key, value)
+
+ expect(await Storage.read<typeof value>(key)).toEqual(value)
+ })
+ })
+
+ test("maps missing reads to NotFoundError", async () => {
+ await withScope(async (root) => {
+ await expect(Storage.read([...root, "missing", "value"])).rejects.toMatchObject({ name: "NotFoundError" })
+ })
+ })
+
+ test("update on missing key throws NotFoundError", async () => {
+ await withScope(async (root) => {
+ await expect(
+ Storage.update<{ value: number }>([...root, "missing", "key"], (draft) => {
+ draft.value += 1
+ }),
+ ).rejects.toMatchObject({ name: "NotFoundError" })
+ })
+ })
+
+ test("write overwrites existing value", async () => {
+ await withScope(async (root) => {
+ const key = [...root, "overwrite", "test"]
+ await Storage.write<{ v: number }>(key, { v: 1 })
+ await Storage.write<{ v: number }>(key, { v: 2 })
+
+ expect(await Storage.read<{ v: number }>(key)).toEqual({ v: 2 })
+ })
+ })
+
+ test("remove on missing key is a no-op", async () => {
+ await withScope(async (root) => {
+ await expect(Storage.remove([...root, "nonexistent", "key"])).resolves.toBeUndefined()
+ })
+ })
+
+ test("list on missing prefix returns empty", async () => {
+ await withScope(async (root) => {
+ expect(await Storage.list([...root, "nonexistent"])).toEqual([])
+ })
+ })
+
+ test("serializes concurrent updates for the same key", async () => {
+ await withScope(async (root) => {
+ const key = [...root, "counter", "shared"]
+ await Storage.write(key, { value: 0 })
+
+ await Promise.all(
+ Array.from({ length: 25 }, () =>
+ Storage.update<{ value: number }>(key, (draft) => {
+ draft.value += 1
+ }),
+ ),
+ )
+
+ expect(await Storage.read<{ value: number }>(key)).toEqual({ value: 25 })
+ })
+ })
+
+ test("concurrent reads do not block each other", async () => {
+ await withScope(async (root) => {
+ const key = [...root, "concurrent", "reads"]
+ await Storage.write(key, { ok: true })
+
+ const results = await Promise.all(Array.from({ length: 10 }, () => Storage.read(key)))
+
+ expect(results).toHaveLength(10)
+ for (const r of results) expect(r).toEqual({ ok: true })
+ })
+ })
+
+ test("nested keys create deep paths", async () => {
+ await withScope(async (root) => {
+ const key = [...root, "a", "b", "c", "deep"]
+ await Storage.write<{ nested: boolean }>(key, { nested: true })
+
+ expect(await Storage.read<{ nested: boolean }>(key)).toEqual({ nested: true })
+ expect(await Storage.list([...root, "a"])).toEqual([key])
+ })
+ })
+
+ test("lists and removes stored entries", async () => {
+ await withScope(async (root) => {
+ const a = [...root, "list", "a"]
+ const b = [...root, "list", "b"]
+ const prefix = [...root, "list"]
+
+ await Storage.write(b, { value: 2 })
+ await Storage.write(a, { value: 1 })
+
+ expect(await Storage.list(prefix)).toEqual([a, b])
+
+ await Storage.remove(a)
+
+ expect(await Storage.list(prefix)).toEqual([b])
+ await expect(Storage.read(a)).rejects.toMatchObject({ name: "NotFoundError" })
+ })
+ })
+
+ test("migration 2 runs when marker contents are invalid", async () => {
+ await using tmp = await tmpdir()
+ const storage = path.join(tmp.path, "storage")
+ const diffs = [
+ { additions: 2, deletions: 1 },
+ { additions: 3, deletions: 4 },
+ ]
+
+ await text(path.join(storage, "migration"), "wat")
+ await write(path.join(storage, "session", "proj_test", "ses_test.json"), {
+ id: "ses_test",
+ projectID: "proj_test",
+ title: "legacy",
+ summary: { diffs },
+ })
+
+ await withStorage(tmp.path, async (run) => {
+ expect(await run(Storage.Service.use((svc) => svc.list(["session_diff"])))).toEqual([
+ ["session_diff", "ses_test"],
+ ])
+ expect(await run(Storage.Service.use((svc) => svc.read<typeof diffs>(["session_diff", "ses_test"])))).toEqual(
+ diffs,
+ )
+ expect(
+ await run(
+ Storage.Service.use((svc) =>
+ svc.read<{
+ id: string
+ projectID: string
+ title: string
+ summary: {
+ additions: number
+ deletions: number
+ }
+ }>(["session", "proj_test", "ses_test"]),
+ ),
+ ),
+ ).toEqual({
+ id: "ses_test",
+ projectID: "proj_test",
+ title: "legacy",
+ summary: {
+ additions: 5,
+ deletions: 5,
+ },
+ })
+ })
+
+ expect(await Bun.file(path.join(storage, "migration")).text()).toBe("2")
+ })
+
+ test("migration 1 tolerates malformed legacy records", async () => {
+ await using tmp = await tmpdir({ git: true })
+ const storage = path.join(tmp.path, "storage")
+ const legacy = path.join(tmp.path, "project", "legacy")
+
+ await write(path.join(legacy, "storage", "session", "message", "probe", "0.json"), [])
+ await write(path.join(legacy, "storage", "session", "message", "probe", "1.json"), {
+ path: { root: tmp.path },
+ })
+ await write(path.join(legacy, "storage", "session", "info", "ses_legacy.json"), {
+ id: "ses_legacy",
+ title: "legacy",
+ })
+ await write(path.join(legacy, "storage", "session", "message", "ses_legacy", "msg_legacy.json"), {
+ role: "user",
+ text: "hello",
+ })
+
+ await withStorage(tmp.path, async (run) => {
+ const projects = await run(Storage.Service.use((svc) => svc.list(["project"])))
+ expect(projects).toHaveLength(1)
+ const project = projects[0]![1]
+
+ expect(await run(Storage.Service.use((svc) => svc.list(["session", project])))).toEqual([
+ ["session", project, "ses_legacy"],
+ ])
+ expect(
+ await run(
+ Storage.Service.use((svc) => svc.read<{ id: string; title: string }>(["session", project, "ses_legacy"])),
+ ),
+ ).toEqual({
+ id: "ses_legacy",
+ title: "legacy",
+ })
+ expect(
+ await run(
+ Storage.Service.use((svc) =>
+ svc.read<{ role: string; text: string }>(["message", "ses_legacy", "msg_legacy"]),
+ ),
+ ),
+ ).toEqual({
+ role: "user",
+ text: "hello",
+ })
+ })
+
+ expect(await Bun.file(path.join(storage, "migration")).text()).toBe("2")
+ })
+
+ test("failed migrations do not advance the marker", async () => {
+ await using tmp = await tmpdir()
+ const storage = path.join(tmp.path, "storage")
+ const legacy = path.join(tmp.path, "project", "legacy")
+
+ await text(path.join(legacy, "storage", "session", "message", "probe", "0.json"), "{")
+
+ await withStorage(tmp.path, async (run) => {
+ expect(await run(Storage.Service.use((svc) => svc.list(["project"])))).toEqual([])
+ })
+
+ expect(await exists(path.join(storage, "migration"))).toBe(false)
+ })
+})