diff options
| author | Dax <[email protected]> | 2026-04-15 11:50:24 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-04-15 15:50:24 +0000 |
| commit | 4ae7c77f8abda8d51ddf52ee6e07890fa19b6629 (patch) | |
| tree | d1b2891cb58ffe0d7d2a9e3af67340921f9f9758 /packages/shared/src/util | |
| parent | f1751401aa2c53a4a0215c6deddf93df306aac8b (diff) | |
| download | opencode-4ae7c77f8abda8d51ddf52ee6e07890fa19b6629.tar.gz opencode-4ae7c77f8abda8d51ddf52ee6e07890fa19b6629.zip | |
migrate: move flock and hash utilities to shared package (#22640)
Diffstat (limited to 'packages/shared/src/util')
| -rw-r--r-- | packages/shared/src/util/flock.ts | 354 | ||||
| -rw-r--r-- | packages/shared/src/util/hash.ts | 7 |
2 files changed, 361 insertions, 0 deletions
diff --git a/packages/shared/src/util/flock.ts b/packages/shared/src/util/flock.ts new file mode 100644 index 000000000..4a1df1dee --- /dev/null +++ b/packages/shared/src/util/flock.ts @@ -0,0 +1,354 @@ +import path from "path" +import os from "os" +import { randomBytes, randomUUID } from "crypto" +import { mkdir, readFile, rm, stat, utimes, writeFile } from "fs/promises" +import { Hash } from "./hash" +import { Effect } from "effect" + +export type FlockGlobal = { + state: string +} + +export namespace Flock { + let global: FlockGlobal | undefined + + export function setGlobal(g: FlockGlobal) { + global = g + } + + const root = () => { + if (!global) throw new Error("Flock global not set") + return path.join(global.state, "locks") + } + + // Defaults for callers that do not provide timing options. + const defaultOpts = { + staleMs: 60_000, + timeoutMs: 5 * 60_000, + baseDelayMs: 100, + maxDelayMs: 2_000, + } + + export interface WaitEvent { + key: string + attempt: number + delay: number + waited: number + } + + export type Wait = (input: WaitEvent) => void | Promise<void> + + export interface Options { + dir?: string + signal?: AbortSignal + staleMs?: number + timeoutMs?: number + baseDelayMs?: number + maxDelayMs?: number + onWait?: Wait + } + + type Opts = { + staleMs: number + timeoutMs: number + baseDelayMs: number + maxDelayMs: number + } + + type Owned = { + acquired: true + startHeartbeat: (intervalMs?: number) => void + release: () => Promise<void> + } + + export interface Lease { + release: () => Promise<void> + [Symbol.asyncDispose]: () => Promise<void> + } + + function code(err: unknown) { + if (typeof err !== "object" || err === null || !("code" in err)) return + const value = err.code + if (typeof value !== "string") return + return value + } + + function sleep(ms: number, signal?: AbortSignal) { + return new Promise<void>((resolve, reject) => { + if (signal?.aborted) { + reject(signal.reason ?? new Error("Aborted")) + return + } + + let timer: NodeJS.Timeout | undefined + + const done = () => { + signal?.removeEventListener("abort", abort) + resolve() + } + + const abort = () => { + if (timer) { + clearTimeout(timer) + } + signal?.removeEventListener("abort", abort) + reject(signal?.reason ?? new Error("Aborted")) + } + + signal?.addEventListener("abort", abort, { once: true }) + timer = setTimeout(done, ms) + }) + } + + function jitter(ms: number) { + const j = Math.floor(ms * 0.3) + const d = Math.floor(Math.random() * (2 * j + 1)) - j + return Math.max(0, ms + d) + } + + function mono() { + return performance.now() + } + + function wall() { + return performance.timeOrigin + mono() + } + + async function stats(file: string) { + try { + return await stat(file) + } catch (err) { + const errCode = code(err) + if (errCode === "ENOENT" || errCode === "ENOTDIR") return + throw err + } + } + + async function stale(lockDir: string, heartbeatPath: string, metaPath: string, staleMs: number) { + // Stale detection allows automatic recovery after crashed owners. + const now = wall() + const heartbeat = await stats(heartbeatPath) + if (heartbeat) { + return now - heartbeat.mtimeMs > staleMs + } + + const meta = await stats(metaPath) + if (meta) { + return now - meta.mtimeMs > staleMs + } + + const dir = await stats(lockDir) + if (!dir) { + return false + } + + return now - dir.mtimeMs > staleMs + } + + async function tryAcquireLockDir(lockDir: string, opts: Opts): Promise<Owned | { acquired: false }> { + const token = randomUUID?.() ?? randomBytes(16).toString("hex") + const metaPath = path.join(lockDir, "meta.json") + const heartbeatPath = path.join(lockDir, "heartbeat") + + try { + await mkdir(lockDir, { mode: 0o700 }) + } catch (err) { + if (code(err) !== "EEXIST") { + throw err + } + + if (!(await stale(lockDir, heartbeatPath, metaPath, opts.staleMs))) { + return { acquired: false } + } + + const breakerPath = lockDir + ".breaker" + try { + await mkdir(breakerPath, { mode: 0o700 }) + } catch (claimErr) { + const errCode = code(claimErr) + if (errCode === "EEXIST") { + const breaker = await stats(breakerPath) + if (breaker && wall() - breaker.mtimeMs > opts.staleMs) { + await rm(breakerPath, { recursive: true, force: true }).catch(() => undefined) + } + return { acquired: false } + } + + if (errCode === "ENOENT" || errCode === "ENOTDIR") { + return { acquired: false } + } + + throw claimErr + } + + try { + // Breaker ownership ensures only one contender performs stale cleanup. + if (!(await stale(lockDir, heartbeatPath, metaPath, opts.staleMs))) { + return { acquired: false } + } + + await rm(lockDir, { recursive: true, force: true }) + + try { + await mkdir(lockDir, { mode: 0o700 }) + } catch (retryErr) { + const errCode = code(retryErr) + if (errCode === "EEXIST" || errCode === "ENOTEMPTY") { + return { acquired: false } + } + throw retryErr + } + } finally { + await rm(breakerPath, { recursive: true, force: true }).catch(() => undefined) + } + } + + const meta = { + token, + pid: process.pid, + hostname: os.hostname(), + createdAt: new Date().toISOString(), + } + + await writeFile(heartbeatPath, "", { flag: "wx" }).catch(async () => { + await rm(lockDir, { recursive: true, force: true }) + throw new Error("Lock acquired but heartbeat already existed (possible compromise).") + }) + + await writeFile(metaPath, JSON.stringify(meta, null, 2), { flag: "wx" }).catch(async () => { + await rm(lockDir, { recursive: true, force: true }) + throw new Error("Lock acquired but meta.json already existed (possible compromise).") + }) + + let timer: NodeJS.Timeout | undefined + + const startHeartbeat = (intervalMs = Math.max(100, Math.floor(opts.staleMs / 3))) => { + if (timer) return + // Heartbeat prevents long critical sections from being evicted as stale. + timer = setInterval(() => { + const t = new Date() + void utimes(heartbeatPath, t, t).catch(() => undefined) + }, intervalMs) + timer.unref?.() + } + + const release = async () => { + if (timer) { + clearInterval(timer) + timer = undefined + } + + const current = await readFile(metaPath, "utf8") + .then((raw) => { + const parsed = JSON.parse(raw) + if (!parsed || typeof parsed !== "object") return {} + return { + token: "token" in parsed && typeof parsed.token === "string" ? parsed.token : undefined, + } + }) + .catch((err) => { + const errCode = code(err) + if (errCode === "ENOENT" || errCode === "ENOTDIR") { + throw new Error("Refusing to release: lock is compromised (metadata missing).") + } + if (err instanceof SyntaxError) { + throw new Error("Refusing to release: lock is compromised (metadata invalid).") + } + throw err + }) + // Token check prevents deleting a lock that was re-acquired by another process. + if (current.token !== token) { + throw new Error("Refusing to release: lock token mismatch (not the owner).") + } + + await rm(lockDir, { recursive: true, force: true }) + } + + return { + acquired: true, + startHeartbeat, + release, + } + } + + async function acquireLockDir( + lockDir: string, + input: { key: string; onWait?: Wait; signal?: AbortSignal }, + opts: Opts, + ) { + const stop = mono() + opts.timeoutMs + let attempt = 0 + let waited = 0 + let delay = opts.baseDelayMs + + while (true) { + input.signal?.throwIfAborted() + + const res = await tryAcquireLockDir(lockDir, opts) + if (res.acquired) { + return res + } + + if (mono() > stop) { + throw new Error(`Timed out waiting for lock: ${input.key}`) + } + + attempt += 1 + const ms = jitter(delay) + await input.onWait?.({ + key: input.key, + attempt, + delay: ms, + waited, + }) + await sleep(ms, input.signal) + waited += ms + delay = Math.min(opts.maxDelayMs, Math.floor(delay * 1.7)) + } + } + + export async function acquire(key: string, input: Options = {}): Promise<Lease> { + input.signal?.throwIfAborted() + const cfg: Opts = { + staleMs: input.staleMs ?? defaultOpts.staleMs, + timeoutMs: input.timeoutMs ?? defaultOpts.timeoutMs, + baseDelayMs: input.baseDelayMs ?? defaultOpts.baseDelayMs, + maxDelayMs: input.maxDelayMs ?? defaultOpts.maxDelayMs, + } + const dir = input.dir ?? root() + + await mkdir(dir, { recursive: true }) + const lockfile = path.join(dir, Hash.fast(key) + ".lock") + const lock = await acquireLockDir( + lockfile, + { + key, + onWait: input.onWait, + signal: input.signal, + }, + cfg, + ) + lock.startHeartbeat() + + const release = () => lock.release() + return { + release, + [Symbol.asyncDispose]() { + return release() + }, + } + } + + export async function withLock<T>(key: string, fn: () => Promise<T>, input: Options = {}) { + await using _ = await acquire(key, input) + input.signal?.throwIfAborted() + return await fn() + } + + export const effect = Effect.fn("Flock.effect")(function* (key: string) { + return yield* Effect.acquireRelease( + Effect.promise((signal) => Flock.acquire(key, { signal })), + (foo) => Effect.promise(() => foo.release()), + ).pipe(Effect.asVoid) + }) +} diff --git a/packages/shared/src/util/hash.ts b/packages/shared/src/util/hash.ts new file mode 100644 index 000000000..680e0f40b --- /dev/null +++ b/packages/shared/src/util/hash.ts @@ -0,0 +1,7 @@ +import { createHash } from "crypto" + +export namespace Hash { + export function fast(input: string | Buffer): string { + return createHash("sha1").update(input).digest("hex") + } +} |
