From 4ae7c77f8abda8d51ddf52ee6e07890fa19b6629 Mon Sep 17 00:00:00 2001 From: Dax Date: Wed, 15 Apr 2026 11:50:24 -0400 Subject: migrate: move flock and hash utilities to shared package (#22640) --- packages/shared/src/global.ts | 42 +++++ packages/shared/src/npm.ts | 247 ++++++++++++++++++++++++++ packages/shared/src/types.d.ts | 44 +++++ packages/shared/src/util/flock.ts | 354 ++++++++++++++++++++++++++++++++++++++ packages/shared/src/util/hash.ts | 7 + 5 files changed, 694 insertions(+) create mode 100644 packages/shared/src/global.ts create mode 100644 packages/shared/src/npm.ts create mode 100644 packages/shared/src/types.d.ts create mode 100644 packages/shared/src/util/flock.ts create mode 100644 packages/shared/src/util/hash.ts (limited to 'packages/shared/src') diff --git a/packages/shared/src/global.ts b/packages/shared/src/global.ts new file mode 100644 index 000000000..538cc091b --- /dev/null +++ b/packages/shared/src/global.ts @@ -0,0 +1,42 @@ +import path from "path" +import { xdgData, xdgCache, xdgConfig, xdgState } from "xdg-basedir" +import os from "os" +import { Context, Effect, Layer } from "effect" + +export namespace Global { + export class Service extends Context.Service()("@opencode/Global") {} + + export interface Interface { + readonly home: string + readonly data: string + readonly cache: string + readonly config: string + readonly state: string + readonly bin: string + readonly log: string + } + + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const app = "opencode" + const home = process.env.OPENCODE_TEST_HOME ?? os.homedir() + const data = path.join(xdgData!, app) + const cache = path.join(xdgCache!, app) + const cfg = path.join(xdgConfig!, app) + const state = path.join(xdgState!, app) + const bin = path.join(cache, "bin") + const log = path.join(data, "log") + + return Service.of({ + home, + data, + cache, + config: cfg, + state, + bin, + log, + }) + }), + ) +} diff --git a/packages/shared/src/npm.ts b/packages/shared/src/npm.ts new file mode 100644 index 000000000..994ec04da --- /dev/null +++ b/packages/shared/src/npm.ts @@ -0,0 +1,247 @@ +import path from "path" +import semver from "semver" +import { Arborist } from "@npmcli/arborist" +import { Effect, Schema, Context, Layer, Option, FileSystem } from "effect" +import { NodeFileSystem } from "@effect/platform-node" +import { AppFileSystem } from "./filesystem" +import { Global } from "./global" +import { Flock } from "./util/flock" + +export namespace Npm { + export class InstallFailedError extends Schema.TaggedErrorClass()("NpmInstallFailedError", { + pkg: Schema.String, + cause: Schema.optional(Schema.Defect), + }) {} + + export interface EntryPoint { + readonly directory: string + readonly entrypoint: Option.Option + } + + export interface Interface { + readonly add: (pkg: string) => Effect.Effect + readonly install: (dir: string) => Effect.Effect + readonly outdated: (pkg: string, cachedVersion: string) => Effect.Effect + readonly which: (pkg: string) => Effect.Effect> + } + + export class Service extends Context.Service()("@opencode/Npm") {} + + const illegal = process.platform === "win32" ? new Set(["<", ">", ":", '"', "|", "?", "*"]) : undefined + + export function sanitize(pkg: string) { + if (!illegal) return pkg + return Array.from(pkg, (char) => (illegal.has(char) || char.charCodeAt(0) < 32 ? "_" : char)).join("") + } + + const resolveEntryPoint = (name: string, dir: string): EntryPoint => { + let entrypoint: Option.Option + try { + const resolved = typeof Bun !== "undefined" ? import.meta.resolve(name, dir) : import.meta.resolve(dir) + entrypoint = Option.some(resolved) + } catch { + entrypoint = Option.none() + } + return { + directory: dir, + entrypoint, + } + } + + interface ArboristNode { + name: string + path: string + } + + interface ArboristTree { + edgesOut: Map + } + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const afs = yield* AppFileSystem.Service + const global = yield* Global.Service + const fs = yield* FileSystem.FileSystem + const directory = (pkg: string) => path.join(global.cache, "packages", sanitize(pkg)) + + const outdated = Effect.fn("Npm.outdated")(function* (pkg: string, cachedVersion: string) { + const response = yield* Effect.tryPromise({ + try: () => fetch(`https://registry.npmjs.org/${pkg}`), + catch: () => undefined, + }).pipe(Effect.orElseSucceed(() => undefined)) + + if (!response || !response.ok) { + return false + } + + const data = yield* Effect.tryPromise({ + try: () => response.json() as Promise<{ "dist-tags"?: { latest?: string } }>, + catch: () => undefined, + }).pipe(Effect.orElseSucceed(() => undefined)) + + const latestVersion = data?.["dist-tags"]?.latest + if (!latestVersion) { + return false + } + + const range = /[\s^~*xX<>|=]/.test(cachedVersion) + if (range) return !semver.satisfies(latestVersion, cachedVersion) + + return semver.lt(cachedVersion, latestVersion) + }) + + const add = Effect.fn("Npm.add")(function* (pkg: string) { + const dir = directory(pkg) + yield* Flock.effect(`npm-install:${dir}`) + + const arborist = new Arborist({ + path: dir, + binLinks: true, + progress: false, + savePrefix: "", + ignoreScripts: true, + }) + + const tree = yield* Effect.tryPromise({ + try: () => arborist.loadVirtual().catch(() => undefined), + catch: () => undefined, + }).pipe(Effect.orElseSucceed(() => undefined)) as Effect.Effect + + if (tree) { + const first = tree.edgesOut.values().next().value?.to + if (first) { + return resolveEntryPoint(first.name, first.path) + } + } + + const result = yield* Effect.tryPromise({ + try: () => + arborist.reify({ + add: [pkg], + save: true, + saveType: "prod", + }), + catch: (cause) => new InstallFailedError({ pkg, cause }), + }) as Effect.Effect + + const first = result.edgesOut.values().next().value?.to + if (!first) { + return yield* new InstallFailedError({ pkg }) + } + + return resolveEntryPoint(first.name, first.path) + }, Effect.scoped) + + const install = Effect.fn("Npm.install")(function* (dir: string) { + yield* Flock.effect(`npm-install:${dir}`) + + const reify = Effect.fnUntraced(function* () { + const arb = new Arborist({ + path: dir, + binLinks: true, + progress: false, + savePrefix: "", + ignoreScripts: true, + }) + yield* Effect.tryPromise({ + try: () => arb.reify().catch(() => {}), + catch: () => {}, + }).pipe(Effect.orElseSucceed(() => {})) + }) + + const nodeModulesExists = yield* afs.existsSafe(path.join(dir, "node_modules")) + if (!nodeModulesExists) { + yield* reify() + return + } + + const pkg = yield* afs.readJson(path.join(dir, "package.json")).pipe(Effect.orElseSucceed(() => ({}))) + const lock = yield* afs.readJson(path.join(dir, "package-lock.json")).pipe(Effect.orElseSucceed(() => ({}))) + + const pkgAny = pkg as any + const lockAny = lock as any + + const declared = new Set([ + ...Object.keys(pkgAny?.dependencies || {}), + ...Object.keys(pkgAny?.devDependencies || {}), + ...Object.keys(pkgAny?.peerDependencies || {}), + ...Object.keys(pkgAny?.optionalDependencies || {}), + ]) + + const root = lockAny?.packages?.[""] || {} + const locked = new Set([ + ...Object.keys(root?.dependencies || {}), + ...Object.keys(root?.devDependencies || {}), + ...Object.keys(root?.peerDependencies || {}), + ...Object.keys(root?.optionalDependencies || {}), + ]) + + for (const name of declared) { + if (!locked.has(name)) { + yield* reify() + return + } + } + }, Effect.scoped) + + const which = Effect.fn("Npm.which")(function* (pkg: string) { + const dir = directory(pkg) + const binDir = path.join(dir, "node_modules", ".bin") + + const pick = Effect.fnUntraced(function* () { + const files = yield* fs.readDirectory(binDir).pipe(Effect.catch(() => Effect.succeed([] as string[]))) + + if (files.length === 0) return Option.none() + if (files.length === 1) return Option.some(files[0]) + + const pkgJson = yield* afs.readJson(path.join(dir, "node_modules", pkg, "package.json")).pipe(Effect.option) + + if (Option.isSome(pkgJson)) { + const parsed = pkgJson.value as { bin?: string | Record } + if (parsed?.bin) { + const unscoped = pkg.startsWith("@") ? pkg.split("/")[1] : pkg + const bin = parsed.bin + if (typeof bin === "string") return Option.some(unscoped) + const keys = Object.keys(bin) + if (keys.length === 1) return Option.some(keys[0]) + return bin[unscoped] ? Option.some(unscoped) : Option.some(keys[0]) + } + } + + return Option.some(files[0]) + }) + + return yield* Effect.gen(function* () { + const bin = yield* pick() + if (Option.isSome(bin)) { + return Option.some(path.join(binDir, bin.value)) + } + + yield* fs.remove(path.join(dir, "package-lock.json")).pipe(Effect.orElseSucceed(() => {})) + + yield* add(pkg) + + const resolved = yield* pick() + if (Option.isNone(resolved)) return Option.none() + return Option.some(path.join(binDir, resolved.value)) + }).pipe( + Effect.scoped, + Effect.orElseSucceed(() => Option.none()), + ) + }) + + return Service.of({ + add, + install, + outdated, + which, + }) + }), + ) + + export const defaultLayer = layer.pipe( + Layer.provide(AppFileSystem.layer), + Layer.provide(Global.layer), + Layer.provide(NodeFileSystem.layer), + ) +} diff --git a/packages/shared/src/types.d.ts b/packages/shared/src/types.d.ts new file mode 100644 index 000000000..b5d667f1d --- /dev/null +++ b/packages/shared/src/types.d.ts @@ -0,0 +1,44 @@ +declare module "@npmcli/arborist" { + export interface ArboristOptions { + path: string + binLinks?: boolean + progress?: boolean + savePrefix?: string + ignoreScripts?: boolean + } + + export interface ArboristNode { + name: string + path: string + } + + export interface ArboristEdge { + to?: ArboristNode + } + + export interface ArboristTree { + edgesOut: Map + } + + export interface ReifyOptions { + add?: string[] + save?: boolean + saveType?: "prod" | "dev" | "optional" | "peer" + } + + export class Arborist { + constructor(options: ArboristOptions) + loadVirtual(): Promise + reify(options?: ReifyOptions): Promise + } +} + +declare var Bun: + | { + file(path: string): { + text(): Promise + json(): Promise + } + write(path: string, content: string | Uint8Array): Promise + } + | undefined diff --git a/packages/shared/src/util/flock.ts b/packages/shared/src/util/flock.ts new file mode 100644 index 000000000..4a1df1dee --- /dev/null +++ b/packages/shared/src/util/flock.ts @@ -0,0 +1,354 @@ +import path from "path" +import os from "os" +import { randomBytes, randomUUID } from "crypto" +import { mkdir, readFile, rm, stat, utimes, writeFile } from "fs/promises" +import { Hash } from "./hash" +import { Effect } from "effect" + +export type FlockGlobal = { + state: string +} + +export namespace Flock { + let global: FlockGlobal | undefined + + export function setGlobal(g: FlockGlobal) { + global = g + } + + const root = () => { + if (!global) throw new Error("Flock global not set") + return path.join(global.state, "locks") + } + + // Defaults for callers that do not provide timing options. + const defaultOpts = { + staleMs: 60_000, + timeoutMs: 5 * 60_000, + baseDelayMs: 100, + maxDelayMs: 2_000, + } + + export interface WaitEvent { + key: string + attempt: number + delay: number + waited: number + } + + export type Wait = (input: WaitEvent) => void | Promise + + export interface Options { + dir?: string + signal?: AbortSignal + staleMs?: number + timeoutMs?: number + baseDelayMs?: number + maxDelayMs?: number + onWait?: Wait + } + + type Opts = { + staleMs: number + timeoutMs: number + baseDelayMs: number + maxDelayMs: number + } + + type Owned = { + acquired: true + startHeartbeat: (intervalMs?: number) => void + release: () => Promise + } + + export interface Lease { + release: () => Promise + [Symbol.asyncDispose]: () => Promise + } + + function code(err: unknown) { + if (typeof err !== "object" || err === null || !("code" in err)) return + const value = err.code + if (typeof value !== "string") return + return value + } + + function sleep(ms: number, signal?: AbortSignal) { + return new Promise((resolve, reject) => { + if (signal?.aborted) { + reject(signal.reason ?? new Error("Aborted")) + return + } + + let timer: NodeJS.Timeout | undefined + + const done = () => { + signal?.removeEventListener("abort", abort) + resolve() + } + + const abort = () => { + if (timer) { + clearTimeout(timer) + } + signal?.removeEventListener("abort", abort) + reject(signal?.reason ?? new Error("Aborted")) + } + + signal?.addEventListener("abort", abort, { once: true }) + timer = setTimeout(done, ms) + }) + } + + function jitter(ms: number) { + const j = Math.floor(ms * 0.3) + const d = Math.floor(Math.random() * (2 * j + 1)) - j + return Math.max(0, ms + d) + } + + function mono() { + return performance.now() + } + + function wall() { + return performance.timeOrigin + mono() + } + + async function stats(file: string) { + try { + return await stat(file) + } catch (err) { + const errCode = code(err) + if (errCode === "ENOENT" || errCode === "ENOTDIR") return + throw err + } + } + + async function stale(lockDir: string, heartbeatPath: string, metaPath: string, staleMs: number) { + // Stale detection allows automatic recovery after crashed owners. + const now = wall() + const heartbeat = await stats(heartbeatPath) + if (heartbeat) { + return now - heartbeat.mtimeMs > staleMs + } + + const meta = await stats(metaPath) + if (meta) { + return now - meta.mtimeMs > staleMs + } + + const dir = await stats(lockDir) + if (!dir) { + return false + } + + return now - dir.mtimeMs > staleMs + } + + async function tryAcquireLockDir(lockDir: string, opts: Opts): Promise { + const token = randomUUID?.() ?? randomBytes(16).toString("hex") + const metaPath = path.join(lockDir, "meta.json") + const heartbeatPath = path.join(lockDir, "heartbeat") + + try { + await mkdir(lockDir, { mode: 0o700 }) + } catch (err) { + if (code(err) !== "EEXIST") { + throw err + } + + if (!(await stale(lockDir, heartbeatPath, metaPath, opts.staleMs))) { + return { acquired: false } + } + + const breakerPath = lockDir + ".breaker" + try { + await mkdir(breakerPath, { mode: 0o700 }) + } catch (claimErr) { + const errCode = code(claimErr) + if (errCode === "EEXIST") { + const breaker = await stats(breakerPath) + if (breaker && wall() - breaker.mtimeMs > opts.staleMs) { + await rm(breakerPath, { recursive: true, force: true }).catch(() => undefined) + } + return { acquired: false } + } + + if (errCode === "ENOENT" || errCode === "ENOTDIR") { + return { acquired: false } + } + + throw claimErr + } + + try { + // Breaker ownership ensures only one contender performs stale cleanup. + if (!(await stale(lockDir, heartbeatPath, metaPath, opts.staleMs))) { + return { acquired: false } + } + + await rm(lockDir, { recursive: true, force: true }) + + try { + await mkdir(lockDir, { mode: 0o700 }) + } catch (retryErr) { + const errCode = code(retryErr) + if (errCode === "EEXIST" || errCode === "ENOTEMPTY") { + return { acquired: false } + } + throw retryErr + } + } finally { + await rm(breakerPath, { recursive: true, force: true }).catch(() => undefined) + } + } + + const meta = { + token, + pid: process.pid, + hostname: os.hostname(), + createdAt: new Date().toISOString(), + } + + await writeFile(heartbeatPath, "", { flag: "wx" }).catch(async () => { + await rm(lockDir, { recursive: true, force: true }) + throw new Error("Lock acquired but heartbeat already existed (possible compromise).") + }) + + await writeFile(metaPath, JSON.stringify(meta, null, 2), { flag: "wx" }).catch(async () => { + await rm(lockDir, { recursive: true, force: true }) + throw new Error("Lock acquired but meta.json already existed (possible compromise).") + }) + + let timer: NodeJS.Timeout | undefined + + const startHeartbeat = (intervalMs = Math.max(100, Math.floor(opts.staleMs / 3))) => { + if (timer) return + // Heartbeat prevents long critical sections from being evicted as stale. + timer = setInterval(() => { + const t = new Date() + void utimes(heartbeatPath, t, t).catch(() => undefined) + }, intervalMs) + timer.unref?.() + } + + const release = async () => { + if (timer) { + clearInterval(timer) + timer = undefined + } + + const current = await readFile(metaPath, "utf8") + .then((raw) => { + const parsed = JSON.parse(raw) + if (!parsed || typeof parsed !== "object") return {} + return { + token: "token" in parsed && typeof parsed.token === "string" ? parsed.token : undefined, + } + }) + .catch((err) => { + const errCode = code(err) + if (errCode === "ENOENT" || errCode === "ENOTDIR") { + throw new Error("Refusing to release: lock is compromised (metadata missing).") + } + if (err instanceof SyntaxError) { + throw new Error("Refusing to release: lock is compromised (metadata invalid).") + } + throw err + }) + // Token check prevents deleting a lock that was re-acquired by another process. + if (current.token !== token) { + throw new Error("Refusing to release: lock token mismatch (not the owner).") + } + + await rm(lockDir, { recursive: true, force: true }) + } + + return { + acquired: true, + startHeartbeat, + release, + } + } + + async function acquireLockDir( + lockDir: string, + input: { key: string; onWait?: Wait; signal?: AbortSignal }, + opts: Opts, + ) { + const stop = mono() + opts.timeoutMs + let attempt = 0 + let waited = 0 + let delay = opts.baseDelayMs + + while (true) { + input.signal?.throwIfAborted() + + const res = await tryAcquireLockDir(lockDir, opts) + if (res.acquired) { + return res + } + + if (mono() > stop) { + throw new Error(`Timed out waiting for lock: ${input.key}`) + } + + attempt += 1 + const ms = jitter(delay) + await input.onWait?.({ + key: input.key, + attempt, + delay: ms, + waited, + }) + await sleep(ms, input.signal) + waited += ms + delay = Math.min(opts.maxDelayMs, Math.floor(delay * 1.7)) + } + } + + export async function acquire(key: string, input: Options = {}): Promise { + input.signal?.throwIfAborted() + const cfg: Opts = { + staleMs: input.staleMs ?? defaultOpts.staleMs, + timeoutMs: input.timeoutMs ?? defaultOpts.timeoutMs, + baseDelayMs: input.baseDelayMs ?? defaultOpts.baseDelayMs, + maxDelayMs: input.maxDelayMs ?? defaultOpts.maxDelayMs, + } + const dir = input.dir ?? root() + + await mkdir(dir, { recursive: true }) + const lockfile = path.join(dir, Hash.fast(key) + ".lock") + const lock = await acquireLockDir( + lockfile, + { + key, + onWait: input.onWait, + signal: input.signal, + }, + cfg, + ) + lock.startHeartbeat() + + const release = () => lock.release() + return { + release, + [Symbol.asyncDispose]() { + return release() + }, + } + } + + export async function withLock(key: string, fn: () => Promise, input: Options = {}) { + await using _ = await acquire(key, input) + input.signal?.throwIfAborted() + return await fn() + } + + export const effect = Effect.fn("Flock.effect")(function* (key: string) { + return yield* Effect.acquireRelease( + Effect.promise((signal) => Flock.acquire(key, { signal })), + (foo) => Effect.promise(() => foo.release()), + ).pipe(Effect.asVoid) + }) +} diff --git a/packages/shared/src/util/hash.ts b/packages/shared/src/util/hash.ts new file mode 100644 index 000000000..680e0f40b --- /dev/null +++ b/packages/shared/src/util/hash.ts @@ -0,0 +1,7 @@ +import { createHash } from "crypto" + +export namespace Hash { + export function fast(input: string | Buffer): string { + return createHash("sha1").update(input).digest("hex") + } +} -- cgit v1.2.3