diff options
| author | Dax <[email protected]> | 2026-04-15 11:50:24 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-04-15 15:50:24 +0000 |
| commit | 4ae7c77f8abda8d51ddf52ee6e07890fa19b6629 (patch) | |
| tree | d1b2891cb58ffe0d7d2a9e3af67340921f9f9758 /packages/shared/test/util | |
| parent | f1751401aa2c53a4a0215c6deddf93df306aac8b (diff) | |
| download | opencode-4ae7c77f8abda8d51ddf52ee6e07890fa19b6629.tar.gz opencode-4ae7c77f8abda8d51ddf52ee6e07890fa19b6629.zip | |
migrate: move flock and hash utilities to shared package (#22640)
Diffstat (limited to 'packages/shared/test/util')
| -rw-r--r-- | packages/shared/test/util/flock.test.ts | 426 |
1 files changed, 426 insertions, 0 deletions
diff --git a/packages/shared/test/util/flock.test.ts b/packages/shared/test/util/flock.test.ts new file mode 100644 index 000000000..f1053dfd2 --- /dev/null +++ b/packages/shared/test/util/flock.test.ts @@ -0,0 +1,426 @@ +import { describe, expect, test } from "bun:test" +import fs from "fs/promises" +import { spawn } from "child_process" +import path from "path" +import os from "os" +import { Flock } from "@opencode-ai/shared/util/flock" +import { Hash } from "@opencode-ai/shared/util/hash" + +type Msg = { + key: string + dir: string + staleMs?: number + timeoutMs?: number + baseDelayMs?: number + maxDelayMs?: number + holdMs?: number + ready?: string + active?: string + done?: string +} + +const root = path.join(import.meta.dir, "../..") +const worker = path.join(import.meta.dir, "../fixture/flock-worker.ts") + +async function tmpdir() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "flock-test-")) + return { + path: dir, + async [Symbol.asyncDispose]() { + await fs.rm(dir, { recursive: true, force: true }) + }, + } +} + +function lock(dir: string, key: string) { + return path.join(dir, Hash.fast(key) + ".lock") +} + +function sleep(ms: number) { + return new Promise<void>((resolve) => { + setTimeout(resolve, ms) + }) +} + +async function exists(file: string) { + return fs + .stat(file) + .then(() => true) + .catch(() => false) +} + +async function wait(file: string, timeout = 3_000) { + const stop = Date.now() + timeout + while (Date.now() < stop) { + if (await exists(file)) return + await sleep(20) + } + + throw new Error(`Timed out waiting for file: ${file}`) +} + +function run(msg: Msg) { + return new Promise<{ code: number; stdout: Buffer; stderr: Buffer }>((resolve) => { + const proc = spawn(process.execPath, [worker, JSON.stringify(msg)], { + cwd: root, + }) + + const stdout: Buffer[] = [] + const stderr: Buffer[] = [] + + proc.stdout?.on("data", (data) => stdout.push(Buffer.from(data))) + proc.stderr?.on("data", (data) => stderr.push(Buffer.from(data))) + + proc.on("close", (code) => { + resolve({ + code: code ?? 1, + stdout: Buffer.concat(stdout), + stderr: Buffer.concat(stderr), + }) + }) + }) +} + +function spawnWorker(msg: Msg) { + return spawn(process.execPath, [worker, JSON.stringify(msg)], { + cwd: root, + stdio: ["ignore", "pipe", "pipe"], + }) +} + +function stopWorker(proc: ReturnType<typeof spawnWorker>) { + if (proc.exitCode !== null || proc.signalCode !== null) return Promise.resolve() + + if (process.platform !== "win32" || !proc.pid) { + proc.kill() + return Promise.resolve() + } + + return new Promise<void>((resolve) => { + const killProc = spawn("taskkill", ["/pid", String(proc.pid), "/T", "/F"]) + killProc.on("close", () => { + proc.kill() + resolve() + }) + }) +} + +async function readJson<T>(p: string): Promise<T> { + return JSON.parse(await fs.readFile(p, "utf8")) +} + +describe("util.flock", () => { + test("enforces mutual exclusion under process contention", async () => { + await using tmp = await tmpdir() + const dir = path.join(tmp.path, "locks") + const done = path.join(tmp.path, "done.log") + const active = path.join(tmp.path, "active") + const key = "flock:stress" + const n = 16 + + const out = await Promise.all( + Array.from({ length: n }, () => + run({ + key, + dir, + done, + active, + holdMs: 30, + staleMs: 1_000, + timeoutMs: 15_000, + }), + ), + ) + + expect(out.map((x) => x.code)).toEqual(Array.from({ length: n }, () => 0)) + expect(out.map((x) => x.stderr.toString()).filter(Boolean)).toEqual([]) + + const lines = (await fs.readFile(done, "utf8")) + .split("\n") + .map((x) => x.trim()) + .filter(Boolean) + expect(lines.length).toBe(n) + }, 20_000) + + test("times out while waiting when lock is still healthy", async () => { + await using tmp = await tmpdir() + const dir = path.join(tmp.path, "locks") + const key = "flock:timeout" + const ready = path.join(tmp.path, "ready") + const proc = spawnWorker({ + key, + dir, + ready, + holdMs: 20_000, + staleMs: 10_000, + timeoutMs: 30_000, + }) + + try { + await wait(ready, 5_000) + const seen: string[] = [] + const err = await Flock.withLock(key, async () => {}, { + dir, + staleMs: 10_000, + timeoutMs: 1_000, + onWait: (tick) => { + seen.push(tick.key) + }, + }).catch((err) => err) + + expect(err).toBeInstanceOf(Error) + if (!(err instanceof Error)) throw err + expect(err.message).toContain("Timed out waiting for lock") + expect(seen.length).toBeGreaterThan(0) + expect(seen.every((x) => x === key)).toBe(true) + } finally { + await stopWorker(proc).catch(() => undefined) + await new Promise((resolve) => proc.on("close", resolve)) + } + }, 15_000) + + test("recovers after a crashed lock owner", async () => { + await using tmp = await tmpdir() + const dir = path.join(tmp.path, "locks") + const key = "flock:crash" + const ready = path.join(tmp.path, "ready") + const proc = spawnWorker({ + key, + dir, + ready, + holdMs: 20_000, + staleMs: 500, + timeoutMs: 30_000, + }) + + await wait(ready, 5_000) + await stopWorker(proc) + await new Promise((resolve) => proc.on("close", resolve)) + + let hit = false + await Flock.withLock( + key, + async () => { + hit = true + }, + { + dir, + staleMs: 500, + timeoutMs: 8_000, + }, + ) + + expect(hit).toBe(true) + }, 20_000) + + test("breaks stale lock dirs when heartbeat is missing", async () => { + await using tmp = await tmpdir() + const dir = path.join(tmp.path, "locks") + const key = "flock:missing-heartbeat" + const lockDir = lock(dir, key) + + await fs.mkdir(lockDir, { recursive: true }) + const old = new Date(Date.now() - 2_000) + await fs.utimes(lockDir, old, old) + + let hit = false + await Flock.withLock( + key, + async () => { + hit = true + }, + { + dir, + staleMs: 200, + timeoutMs: 3_000, + }, + ) + + expect(hit).toBe(true) + }) + + test("recovers when a stale breaker claim was left behind", async () => { + await using tmp = await tmpdir() + const dir = path.join(tmp.path, "locks") + const key = "flock:stale-breaker" + const lockDir = lock(dir, key) + const breaker = lockDir + ".breaker" + + await fs.mkdir(lockDir, { recursive: true }) + await fs.mkdir(breaker) + + const old = new Date(Date.now() - 2_000) + await fs.utimes(lockDir, old, old) + await fs.utimes(breaker, old, old) + + let hit = false + await Flock.withLock( + key, + async () => { + hit = true + }, + { + dir, + staleMs: 200, + timeoutMs: 3_000, + }, + ) + + expect(hit).toBe(true) + expect(await exists(breaker)).toBe(false) + }) + + test("fails clearly if lock dir is removed while held", async () => { + await using tmp = await tmpdir() + const dir = path.join(tmp.path, "locks") + const key = "flock:compromised" + const lockDir = lock(dir, key) + + const err = await Flock.withLock( + key, + async () => { + await fs.rm(lockDir, { + recursive: true, + force: true, + }) + }, + { + dir, + staleMs: 1_000, + timeoutMs: 3_000, + }, + ).catch((err) => err) + + expect(err).toBeInstanceOf(Error) + if (!(err instanceof Error)) throw err + expect(err.message).toContain("compromised") + + let hit = false + await Flock.withLock( + key, + async () => { + hit = true + }, + { + dir, + staleMs: 200, + timeoutMs: 3_000, + }, + ) + expect(hit).toBe(true) + }) + + test("writes owner metadata while lock is held", async () => { + await using tmp = await tmpdir() + const dir = path.join(tmp.path, "locks") + const key = "flock:meta" + const file = path.join(lock(dir, key), "meta.json") + + await Flock.withLock( + key, + async () => { + const json = await readJson<{ + token?: unknown + pid?: unknown + hostname?: unknown + createdAt?: unknown + }>(file) + + expect(typeof json.token).toBe("string") + expect(typeof json.pid).toBe("number") + expect(typeof json.hostname).toBe("string") + expect(typeof json.createdAt).toBe("string") + }, + { + dir, + staleMs: 1_000, + timeoutMs: 3_000, + }, + ) + }) + + test("supports acquire with await using", async () => { + await using tmp = await tmpdir() + const dir = path.join(tmp.path, "locks") + const key = "flock:acquire" + const lockDir = lock(dir, key) + + { + await using _ = await Flock.acquire(key, { + dir, + staleMs: 1_000, + timeoutMs: 3_000, + }) + expect(await exists(lockDir)).toBe(true) + } + + expect(await exists(lockDir)).toBe(false) + }) + + test("refuses token mismatch release and recovers from stale", async () => { + await using tmp = await tmpdir() + const dir = path.join(tmp.path, "locks") + const key = "flock:token" + const lockDir = lock(dir, key) + const meta = path.join(lockDir, "meta.json") + + const err = await Flock.withLock( + key, + async () => { + const json = await readJson<{ token?: string }>(meta) + json.token = "tampered" + await fs.writeFile(meta, JSON.stringify(json, null, 2)) + }, + { + dir, + staleMs: 500, + timeoutMs: 3_000, + }, + ).catch((err) => err) + + expect(err).toBeInstanceOf(Error) + if (!(err instanceof Error)) throw err + expect(err.message).toContain("token mismatch") + expect(await exists(lockDir)).toBe(true) + + let hit = false + await Flock.withLock( + key, + async () => { + hit = true + }, + { + dir, + staleMs: 500, + timeoutMs: 6_000, + }, + ) + expect(hit).toBe(true) + }) + + test("fails clearly on unwritable lock roots", async () => { + if (process.platform === "win32") return + + await using tmp = await tmpdir() + const dir = path.join(tmp.path, "locks") + const key = "flock:perm" + + await fs.mkdir(dir, { recursive: true }) + await fs.chmod(dir, 0o500) + + try { + const err = await Flock.withLock(key, async () => {}, { + dir, + staleMs: 100, + timeoutMs: 500, + }).catch((err) => err) + + expect(err).toBeInstanceOf(Error) + if (!(err instanceof Error)) throw err + const text = err.message + expect(text.includes("EACCES") || text.includes("EPERM")).toBe(true) + } finally { + await fs.chmod(dir, 0o700) + } + }) +}) |
