summaryrefslogtreecommitdiffhomepage
path: root/packages/shared/src/util
diff options
context:
space:
mode:
authorDax <[email protected]>2026-04-15 11:50:24 -0400
committerGitHub <[email protected]>2026-04-15 15:50:24 +0000
commit4ae7c77f8abda8d51ddf52ee6e07890fa19b6629 (patch)
treed1b2891cb58ffe0d7d2a9e3af67340921f9f9758 /packages/shared/src/util
parentf1751401aa2c53a4a0215c6deddf93df306aac8b (diff)
downloadopencode-4ae7c77f8abda8d51ddf52ee6e07890fa19b6629.tar.gz
opencode-4ae7c77f8abda8d51ddf52ee6e07890fa19b6629.zip
migrate: move flock and hash utilities to shared package (#22640)
Diffstat (limited to 'packages/shared/src/util')
-rw-r--r--packages/shared/src/util/flock.ts354
-rw-r--r--packages/shared/src/util/hash.ts7
2 files changed, 361 insertions, 0 deletions
diff --git a/packages/shared/src/util/flock.ts b/packages/shared/src/util/flock.ts
new file mode 100644
index 000000000..4a1df1dee
--- /dev/null
+++ b/packages/shared/src/util/flock.ts
@@ -0,0 +1,354 @@
+import path from "path"
+import os from "os"
+import { randomBytes, randomUUID } from "crypto"
+import { mkdir, readFile, rm, stat, utimes, writeFile } from "fs/promises"
+import { Hash } from "./hash"
+import { Effect } from "effect"
+
+export type FlockGlobal = {
+ state: string
+}
+
+export namespace Flock {
+ let global: FlockGlobal | undefined
+
+ export function setGlobal(g: FlockGlobal) {
+ global = g
+ }
+
+ const root = () => {
+ if (!global) throw new Error("Flock global not set")
+ return path.join(global.state, "locks")
+ }
+
+ // Defaults for callers that do not provide timing options.
+ const defaultOpts = {
+ staleMs: 60_000,
+ timeoutMs: 5 * 60_000,
+ baseDelayMs: 100,
+ maxDelayMs: 2_000,
+ }
+
+ export interface WaitEvent {
+ key: string
+ attempt: number
+ delay: number
+ waited: number
+ }
+
+ export type Wait = (input: WaitEvent) => void | Promise<void>
+
+ export interface Options {
+ dir?: string
+ signal?: AbortSignal
+ staleMs?: number
+ timeoutMs?: number
+ baseDelayMs?: number
+ maxDelayMs?: number
+ onWait?: Wait
+ }
+
+ type Opts = {
+ staleMs: number
+ timeoutMs: number
+ baseDelayMs: number
+ maxDelayMs: number
+ }
+
+ type Owned = {
+ acquired: true
+ startHeartbeat: (intervalMs?: number) => void
+ release: () => Promise<void>
+ }
+
+ export interface Lease {
+ release: () => Promise<void>
+ [Symbol.asyncDispose]: () => Promise<void>
+ }
+
+ function code(err: unknown) {
+ if (typeof err !== "object" || err === null || !("code" in err)) return
+ const value = err.code
+ if (typeof value !== "string") return
+ return value
+ }
+
+ function sleep(ms: number, signal?: AbortSignal) {
+ return new Promise<void>((resolve, reject) => {
+ if (signal?.aborted) {
+ reject(signal.reason ?? new Error("Aborted"))
+ return
+ }
+
+ let timer: NodeJS.Timeout | undefined
+
+ const done = () => {
+ signal?.removeEventListener("abort", abort)
+ resolve()
+ }
+
+ const abort = () => {
+ if (timer) {
+ clearTimeout(timer)
+ }
+ signal?.removeEventListener("abort", abort)
+ reject(signal?.reason ?? new Error("Aborted"))
+ }
+
+ signal?.addEventListener("abort", abort, { once: true })
+ timer = setTimeout(done, ms)
+ })
+ }
+
+ function jitter(ms: number) {
+ const j = Math.floor(ms * 0.3)
+ const d = Math.floor(Math.random() * (2 * j + 1)) - j
+ return Math.max(0, ms + d)
+ }
+
+ function mono() {
+ return performance.now()
+ }
+
+ function wall() {
+ return performance.timeOrigin + mono()
+ }
+
+ async function stats(file: string) {
+ try {
+ return await stat(file)
+ } catch (err) {
+ const errCode = code(err)
+ if (errCode === "ENOENT" || errCode === "ENOTDIR") return
+ throw err
+ }
+ }
+
+ async function stale(lockDir: string, heartbeatPath: string, metaPath: string, staleMs: number) {
+ // Stale detection allows automatic recovery after crashed owners.
+ const now = wall()
+ const heartbeat = await stats(heartbeatPath)
+ if (heartbeat) {
+ return now - heartbeat.mtimeMs > staleMs
+ }
+
+ const meta = await stats(metaPath)
+ if (meta) {
+ return now - meta.mtimeMs > staleMs
+ }
+
+ const dir = await stats(lockDir)
+ if (!dir) {
+ return false
+ }
+
+ return now - dir.mtimeMs > staleMs
+ }
+
+ async function tryAcquireLockDir(lockDir: string, opts: Opts): Promise<Owned | { acquired: false }> {
+ const token = randomUUID?.() ?? randomBytes(16).toString("hex")
+ const metaPath = path.join(lockDir, "meta.json")
+ const heartbeatPath = path.join(lockDir, "heartbeat")
+
+ try {
+ await mkdir(lockDir, { mode: 0o700 })
+ } catch (err) {
+ if (code(err) !== "EEXIST") {
+ throw err
+ }
+
+ if (!(await stale(lockDir, heartbeatPath, metaPath, opts.staleMs))) {
+ return { acquired: false }
+ }
+
+ const breakerPath = lockDir + ".breaker"
+ try {
+ await mkdir(breakerPath, { mode: 0o700 })
+ } catch (claimErr) {
+ const errCode = code(claimErr)
+ if (errCode === "EEXIST") {
+ const breaker = await stats(breakerPath)
+ if (breaker && wall() - breaker.mtimeMs > opts.staleMs) {
+ await rm(breakerPath, { recursive: true, force: true }).catch(() => undefined)
+ }
+ return { acquired: false }
+ }
+
+ if (errCode === "ENOENT" || errCode === "ENOTDIR") {
+ return { acquired: false }
+ }
+
+ throw claimErr
+ }
+
+ try {
+ // Breaker ownership ensures only one contender performs stale cleanup.
+ if (!(await stale(lockDir, heartbeatPath, metaPath, opts.staleMs))) {
+ return { acquired: false }
+ }
+
+ await rm(lockDir, { recursive: true, force: true })
+
+ try {
+ await mkdir(lockDir, { mode: 0o700 })
+ } catch (retryErr) {
+ const errCode = code(retryErr)
+ if (errCode === "EEXIST" || errCode === "ENOTEMPTY") {
+ return { acquired: false }
+ }
+ throw retryErr
+ }
+ } finally {
+ await rm(breakerPath, { recursive: true, force: true }).catch(() => undefined)
+ }
+ }
+
+ const meta = {
+ token,
+ pid: process.pid,
+ hostname: os.hostname(),
+ createdAt: new Date().toISOString(),
+ }
+
+ await writeFile(heartbeatPath, "", { flag: "wx" }).catch(async () => {
+ await rm(lockDir, { recursive: true, force: true })
+ throw new Error("Lock acquired but heartbeat already existed (possible compromise).")
+ })
+
+ await writeFile(metaPath, JSON.stringify(meta, null, 2), { flag: "wx" }).catch(async () => {
+ await rm(lockDir, { recursive: true, force: true })
+ throw new Error("Lock acquired but meta.json already existed (possible compromise).")
+ })
+
+ let timer: NodeJS.Timeout | undefined
+
+ const startHeartbeat = (intervalMs = Math.max(100, Math.floor(opts.staleMs / 3))) => {
+ if (timer) return
+ // Heartbeat prevents long critical sections from being evicted as stale.
+ timer = setInterval(() => {
+ const t = new Date()
+ void utimes(heartbeatPath, t, t).catch(() => undefined)
+ }, intervalMs)
+ timer.unref?.()
+ }
+
+ const release = async () => {
+ if (timer) {
+ clearInterval(timer)
+ timer = undefined
+ }
+
+ const current = await readFile(metaPath, "utf8")
+ .then((raw) => {
+ const parsed = JSON.parse(raw)
+ if (!parsed || typeof parsed !== "object") return {}
+ return {
+ token: "token" in parsed && typeof parsed.token === "string" ? parsed.token : undefined,
+ }
+ })
+ .catch((err) => {
+ const errCode = code(err)
+ if (errCode === "ENOENT" || errCode === "ENOTDIR") {
+ throw new Error("Refusing to release: lock is compromised (metadata missing).")
+ }
+ if (err instanceof SyntaxError) {
+ throw new Error("Refusing to release: lock is compromised (metadata invalid).")
+ }
+ throw err
+ })
+ // Token check prevents deleting a lock that was re-acquired by another process.
+ if (current.token !== token) {
+ throw new Error("Refusing to release: lock token mismatch (not the owner).")
+ }
+
+ await rm(lockDir, { recursive: true, force: true })
+ }
+
+ return {
+ acquired: true,
+ startHeartbeat,
+ release,
+ }
+ }
+
+ async function acquireLockDir(
+ lockDir: string,
+ input: { key: string; onWait?: Wait; signal?: AbortSignal },
+ opts: Opts,
+ ) {
+ const stop = mono() + opts.timeoutMs
+ let attempt = 0
+ let waited = 0
+ let delay = opts.baseDelayMs
+
+ while (true) {
+ input.signal?.throwIfAborted()
+
+ const res = await tryAcquireLockDir(lockDir, opts)
+ if (res.acquired) {
+ return res
+ }
+
+ if (mono() > stop) {
+ throw new Error(`Timed out waiting for lock: ${input.key}`)
+ }
+
+ attempt += 1
+ const ms = jitter(delay)
+ await input.onWait?.({
+ key: input.key,
+ attempt,
+ delay: ms,
+ waited,
+ })
+ await sleep(ms, input.signal)
+ waited += ms
+ delay = Math.min(opts.maxDelayMs, Math.floor(delay * 1.7))
+ }
+ }
+
+ export async function acquire(key: string, input: Options = {}): Promise<Lease> {
+ input.signal?.throwIfAborted()
+ const cfg: Opts = {
+ staleMs: input.staleMs ?? defaultOpts.staleMs,
+ timeoutMs: input.timeoutMs ?? defaultOpts.timeoutMs,
+ baseDelayMs: input.baseDelayMs ?? defaultOpts.baseDelayMs,
+ maxDelayMs: input.maxDelayMs ?? defaultOpts.maxDelayMs,
+ }
+ const dir = input.dir ?? root()
+
+ await mkdir(dir, { recursive: true })
+ const lockfile = path.join(dir, Hash.fast(key) + ".lock")
+ const lock = await acquireLockDir(
+ lockfile,
+ {
+ key,
+ onWait: input.onWait,
+ signal: input.signal,
+ },
+ cfg,
+ )
+ lock.startHeartbeat()
+
+ const release = () => lock.release()
+ return {
+ release,
+ [Symbol.asyncDispose]() {
+ return release()
+ },
+ }
+ }
+
+ export async function withLock<T>(key: string, fn: () => Promise<T>, input: Options = {}) {
+ await using _ = await acquire(key, input)
+ input.signal?.throwIfAborted()
+ return await fn()
+ }
+
+ export const effect = Effect.fn("Flock.effect")(function* (key: string) {
+ return yield* Effect.acquireRelease(
+ Effect.promise((signal) => Flock.acquire(key, { signal })),
+ (foo) => Effect.promise(() => foo.release()),
+ ).pipe(Effect.asVoid)
+ })
+}
diff --git a/packages/shared/src/util/hash.ts b/packages/shared/src/util/hash.ts
new file mode 100644
index 000000000..680e0f40b
--- /dev/null
+++ b/packages/shared/src/util/hash.ts
@@ -0,0 +1,7 @@
+import { createHash } from "crypto"
+
+export namespace Hash {
+ export function fast(input: string | Buffer): string {
+ return createHash("sha1").update(input).digest("hex")
+ }
+}