diff options
| author | Dax <[email protected]> | 2026-04-25 10:59:17 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-04-25 10:59:17 -0400 |
| commit | 62ef2a220723a6d6cb050e523fcdfaa974dafdda (patch) | |
| tree | 214b03d016e18e4d8fe1bfc7209c1edd86547bbd /packages/shared/src/util/flock.ts | |
| parent | 37aa8442dc023fad250f2573c8235a544789900c (diff) | |
| download | opencode-62ef2a220723a6d6cb050e523fcdfaa974dafdda.tar.gz opencode-62ef2a220723a6d6cb050e523fcdfaa974dafdda.zip | |
refactor: rename shared package to core (#24309)
Diffstat (limited to 'packages/shared/src/util/flock.ts')
| -rw-r--r-- | packages/shared/src/util/flock.ts | 358 |
1 files changed, 0 insertions, 358 deletions
diff --git a/packages/shared/src/util/flock.ts b/packages/shared/src/util/flock.ts deleted file mode 100644 index 958bd9fd1..000000000 --- a/packages/shared/src/util/flock.ts +++ /dev/null @@ -1,358 +0,0 @@ -import path from "path" -import os from "os" -import { randomBytes, randomUUID } from "crypto" -import { mkdir, readFile, rm, stat, utimes, writeFile } from "fs/promises" -import { Hash } from "./hash" -import { Effect } from "effect" - -export type FlockGlobal = { - state: string -} - -export namespace Flock { - let global: FlockGlobal | undefined - - export function setGlobal(g: FlockGlobal) { - global = g - } - - const root = () => { - if (!global) throw new Error("Flock global not set") - return path.join(global.state, "locks") - } - - // Defaults for callers that do not provide timing options. - const defaultOpts = { - staleMs: 60_000, - timeoutMs: 5 * 60_000, - baseDelayMs: 100, - maxDelayMs: 2_000, - } - - export interface WaitEvent { - key: string - attempt: number - delay: number - waited: number - } - - export type Wait = (input: WaitEvent) => void | Promise<void> - - export interface Options { - dir?: string - signal?: AbortSignal - staleMs?: number - timeoutMs?: number - baseDelayMs?: number - maxDelayMs?: number - onWait?: Wait - } - - type Opts = { - staleMs: number - timeoutMs: number - baseDelayMs: number - maxDelayMs: number - } - - type Owned = { - acquired: true - startHeartbeat: (intervalMs?: number) => void - release: () => Promise<void> - } - - export interface Lease { - release: () => Promise<void> - [Symbol.asyncDispose]: () => Promise<void> - } - - function code(err: unknown) { - if (typeof err !== "object" || err === null || !("code" in err)) return - const value = err.code - if (typeof value !== "string") return - return value - } - - function sleep(ms: number, signal?: AbortSignal) { - return new Promise<void>((resolve, reject) => { - if (signal?.aborted) { - reject(signal.reason ?? new Error("Aborted")) - return - } - - let timer: NodeJS.Timeout | undefined - - const done = () => { - signal?.removeEventListener("abort", abort) - resolve() - } - - const abort = () => { - if (timer) { - clearTimeout(timer) - } - signal?.removeEventListener("abort", abort) - reject(signal?.reason ?? new Error("Aborted")) - } - - signal?.addEventListener("abort", abort, { once: true }) - timer = setTimeout(done, ms) - }) - } - - function jitter(ms: number) { - const j = Math.floor(ms * 0.3) - const d = Math.floor(Math.random() * (2 * j + 1)) - j - return Math.max(0, ms + d) - } - - function mono() { - return performance.now() - } - - function wall() { - return performance.timeOrigin + mono() - } - - async function stats(file: string) { - try { - return await stat(file) - } catch (err) { - const errCode = code(err) - if (errCode === "ENOENT" || errCode === "ENOTDIR") return - throw err - } - } - - async function stale(lockDir: string, heartbeatPath: string, metaPath: string, staleMs: number) { - // Stale detection allows automatic recovery after crashed owners. - const now = wall() - const heartbeat = await stats(heartbeatPath) - if (heartbeat) { - return now - heartbeat.mtimeMs > staleMs - } - - const meta = await stats(metaPath) - if (meta) { - return now - meta.mtimeMs > staleMs - } - - const dir = await stats(lockDir) - if (!dir) { - return false - } - - return now - dir.mtimeMs > staleMs - } - - async function tryAcquireLockDir(lockDir: string, opts: Opts): Promise<Owned | { acquired: false }> { - const token = randomUUID?.() ?? randomBytes(16).toString("hex") - const metaPath = path.join(lockDir, "meta.json") - const heartbeatPath = path.join(lockDir, "heartbeat") - - try { - await mkdir(lockDir, { mode: 0o700 }) - } catch (err) { - if (code(err) !== "EEXIST") { - throw err - } - - if (!(await stale(lockDir, heartbeatPath, metaPath, opts.staleMs))) { - return { acquired: false } - } - - const breakerPath = lockDir + ".breaker" - try { - await mkdir(breakerPath, { mode: 0o700 }) - } catch (claimErr) { - const errCode = code(claimErr) - if (errCode === "EEXIST") { - const breaker = await stats(breakerPath) - if (breaker && wall() - breaker.mtimeMs > opts.staleMs) { - await rm(breakerPath, { recursive: true, force: true }).catch(() => undefined) - } - return { acquired: false } - } - - if (errCode === "ENOENT" || errCode === "ENOTDIR") { - return { acquired: false } - } - - throw claimErr - } - - try { - // Breaker ownership ensures only one contender performs stale cleanup. - if (!(await stale(lockDir, heartbeatPath, metaPath, opts.staleMs))) { - return { acquired: false } - } - - await rm(lockDir, { recursive: true, force: true }) - - try { - await mkdir(lockDir, { mode: 0o700 }) - } catch (retryErr) { - const errCode = code(retryErr) - if (errCode === "EEXIST" || errCode === "ENOTEMPTY") { - return { acquired: false } - } - throw retryErr - } - } finally { - await rm(breakerPath, { recursive: true, force: true }).catch(() => undefined) - } - } - - const meta = { - token, - pid: process.pid, - hostname: os.hostname(), - createdAt: new Date().toISOString(), - } - - await writeFile(heartbeatPath, "", { flag: "wx" }).catch(async () => { - await rm(lockDir, { recursive: true, force: true }) - throw new Error("Lock acquired but heartbeat already existed (possible compromise).") - }) - - await writeFile(metaPath, JSON.stringify(meta, null, 2), { flag: "wx" }).catch(async () => { - await rm(lockDir, { recursive: true, force: true }) - throw new Error("Lock acquired but meta.json already existed (possible compromise).") - }) - - let timer: NodeJS.Timeout | undefined - - const startHeartbeat = (intervalMs = Math.max(100, Math.floor(opts.staleMs / 3))) => { - if (timer) return - // Heartbeat prevents long critical sections from being evicted as stale. - timer = setInterval(() => { - const t = new Date() - void utimes(heartbeatPath, t, t).catch(() => undefined) - }, intervalMs) - timer.unref?.() - } - - const release = async () => { - if (timer) { - clearInterval(timer) - timer = undefined - } - - const current = await readFile(metaPath, "utf8") - .then((raw) => { - const parsed = JSON.parse(raw) - if (!parsed || typeof parsed !== "object") return {} - return { - token: "token" in parsed && typeof parsed.token === "string" ? parsed.token : undefined, - } - }) - .catch((err) => { - const errCode = code(err) - if (errCode === "ENOENT" || errCode === "ENOTDIR") { - throw new Error("Refusing to release: lock is compromised (metadata missing).") - } - if (err instanceof SyntaxError) { - throw new Error("Refusing to release: lock is compromised (metadata invalid).") - } - throw err - }) - // Token check prevents deleting a lock that was re-acquired by another process. - if (current.token !== token) { - throw new Error("Refusing to release: lock token mismatch (not the owner).") - } - - await rm(lockDir, { recursive: true, force: true }) - } - - return { - acquired: true, - startHeartbeat, - release, - } - } - - async function acquireLockDir( - lockDir: string, - input: { key: string; onWait?: Wait; signal?: AbortSignal }, - opts: Opts, - ) { - const stop = mono() + opts.timeoutMs - let attempt = 0 - let waited = 0 - let delay = opts.baseDelayMs - - while (true) { - input.signal?.throwIfAborted() - - const res = await tryAcquireLockDir(lockDir, opts) - if (res.acquired) { - return res - } - - if (mono() > stop) { - throw new Error(`Timed out waiting for lock: ${input.key}`) - } - - attempt += 1 - const ms = jitter(delay) - await input.onWait?.({ - key: input.key, - attempt, - delay: ms, - waited, - }) - await sleep(ms, input.signal) - waited += ms - delay = Math.min(opts.maxDelayMs, Math.floor(delay * 1.7)) - } - } - - export async function acquire(key: string, input: Options = {}): Promise<Lease> { - input.signal?.throwIfAborted() - const cfg: Opts = { - staleMs: input.staleMs ?? defaultOpts.staleMs, - timeoutMs: input.timeoutMs ?? defaultOpts.timeoutMs, - baseDelayMs: input.baseDelayMs ?? defaultOpts.baseDelayMs, - maxDelayMs: input.maxDelayMs ?? defaultOpts.maxDelayMs, - } - const dir = input.dir ?? root() - - await mkdir(dir, { recursive: true }) - const lockfile = path.join(dir, Hash.fast(key) + ".lock") - const lock = await acquireLockDir( - lockfile, - { - key, - onWait: input.onWait, - signal: input.signal, - }, - cfg, - ) - lock.startHeartbeat() - - const release = () => lock.release() - return { - release, - [Symbol.asyncDispose]() { - return release() - }, - } - } - - export async function withLock<T>(key: string, fn: () => Promise<T>, input: Options = {}) { - await using _ = await acquire(key, input) - input.signal?.throwIfAborted() - return await fn() - } - - export const effect = Effect.fn("Flock.effect")(function* (key: string, input: Options = {}) { - return yield* Effect.acquireRelease( - Effect.promise((signal) => Flock.acquire(key, { ...input, signal })).pipe( - Effect.withSpan("Flock.acquire", { - attributes: { key }, - }), - ), - (lock) => Effect.promise(() => lock.release()).pipe(Effect.withSpan("Flock.release")), - ).pipe(Effect.asVoid) - }) -} |
