diff options
| author | Dax Raad <[email protected]> | 2026-04-25 14:22:49 -0400 |
|---|---|---|
| committer | Dax Raad <[email protected]> | 2026-04-25 14:23:17 -0400 |
| commit | 3eee2f6afa5c7fde20d0e838143832b681795d9f (patch) | |
| tree | e2ee4385e034a921504e782eba5cfb2822879b4b /packages/core | |
| parent | ff4b60e1f3d2ad83a74785443dd041ed4e7814bf (diff) | |
| download | opencode-3eee2f6afa5c7fde20d0e838143832b681795d9f.tar.gz opencode-3eee2f6afa5c7fde20d0e838143832b681795d9f.zip | |
core: move cross-spawn-spawner from opencode to core package
Moved the cross-spawn-spawner module from packages/opencode to packages/core
to enable code sharing across the monorepo. This consolidates the process
spawning infrastructure into the core package so other packages can use
cross-platform child process spawning without duplicating the implementation.
Updated all import statements across the codebase to reference the new
location (@opencode-ai/core/effect/cross-spawn-spawner). Removed the
local copy from the opencode package along with its tests.
Diffstat (limited to 'packages/core')
| -rw-r--r-- | packages/core/package.json | 6 | ||||
| -rw-r--r-- | packages/core/src/effect/cross-spawn-spawner.ts | 505 | ||||
| -rw-r--r-- | packages/core/test/effect/cross-spawn-spawner.test.ts | 423 |
3 files changed, 930 insertions, 4 deletions
diff --git a/packages/core/package.json b/packages/core/package.json index a244ea8b4..bd826de35 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -18,23 +18,21 @@ "imports": {}, "devDependencies": { "@tsconfig/bun": "catalog:", - "@types/semver": "catalog:", "@types/bun": "catalog:", - "@types/npmcli__arborist": "6.3.3" + "@types/cross-spawn": "catalog:" }, "dependencies": { "@effect/opentelemetry": "catalog:", "@effect/platform-node": "catalog:", - "@npmcli/arborist": "catalog:", "@opentelemetry/api": "1.9.0", "@opentelemetry/context-async-hooks": "2.6.1", "@opentelemetry/exporter-trace-otlp-http": "0.214.0", "@opentelemetry/sdk-trace-base": "2.6.1", "effect": "catalog:", + "cross-spawn": "catalog:", "glob": "13.0.5", "mime-types": "3.0.2", "minimatch": "10.2.5", - "semver": "catalog:", "xdg-basedir": "5.1.0", "zod": "catalog:" }, diff --git a/packages/core/src/effect/cross-spawn-spawner.ts b/packages/core/src/effect/cross-spawn-spawner.ts new file mode 100644 index 000000000..ad8d4126d --- /dev/null +++ b/packages/core/src/effect/cross-spawn-spawner.ts @@ -0,0 +1,505 @@ +import type * as Arr from "effect/Array" +import { NodeFileSystem, NodeSink, NodeStream } from "@effect/platform-node" +import * as NodePath from "@effect/platform-node/NodePath" +import * as Deferred from "effect/Deferred" +import * as Effect from "effect/Effect" +import * as Exit from "effect/Exit" +import * as FileSystem from "effect/FileSystem" +import * as Layer from "effect/Layer" +import * as Path from "effect/Path" +import * as PlatformError from "effect/PlatformError" +import * as Predicate from "effect/Predicate" +import type * as Scope from "effect/Scope" +import * as Sink from "effect/Sink" +import * as Stream from "effect/Stream" +import * as ChildProcess from "effect/unstable/process/ChildProcess" +import type { ChildProcessHandle } from "effect/unstable/process/ChildProcessSpawner" +import { + ChildProcessSpawner, + ExitCode, + make as makeSpawner, + makeHandle, + ProcessId, +} from "effect/unstable/process/ChildProcessSpawner" +import * as NodeChildProcess from "node:child_process" +import { PassThrough } from "node:stream" +import launch from "cross-spawn" + +const toError = (err: unknown): Error => (err instanceof globalThis.Error ? err : new globalThis.Error(String(err))) + +const toTag = (err: NodeJS.ErrnoException): PlatformError.SystemErrorTag => { + switch (err.code) { + case "ENOENT": + return "NotFound" + case "EACCES": + return "PermissionDenied" + case "EEXIST": + return "AlreadyExists" + case "EISDIR": + return "BadResource" + case "ENOTDIR": + return "BadResource" + case "EBUSY": + return "Busy" + case "ELOOP": + return "BadResource" + default: + return "Unknown" + } +} + +const flatten = (command: ChildProcess.Command) => { + const commands: Array<ChildProcess.StandardCommand> = [] + const opts: Array<ChildProcess.PipeOptions> = [] + + const walk = (cmd: ChildProcess.Command): void => { + switch (cmd._tag) { + case "StandardCommand": + commands.push(cmd) + return + case "PipedCommand": + walk(cmd.left) + opts.push(cmd.options) + walk(cmd.right) + return + } + } + + walk(command) + if (commands.length === 0) throw new Error("flatten produced empty commands array") + const [head, ...tail] = commands + return { + commands: [head, ...tail] as Arr.NonEmptyReadonlyArray<ChildProcess.StandardCommand>, + opts, + } +} + +const toPlatformError = ( + method: string, + err: NodeJS.ErrnoException, + command: ChildProcess.Command, +): PlatformError.PlatformError => { + const cmd = flatten(command) + .commands.map((x) => `${x.command} ${x.args.join(" ")}`) + .join(" | ") + return PlatformError.systemError({ + _tag: toTag(err), + module: "ChildProcess", + method, + pathOrDescriptor: cmd, + syscall: err.syscall, + cause: err, + }) +} + +type ExitSignal = Deferred.Deferred<readonly [code: number | null, signal: NodeJS.Signals | null]> + +export const make = Effect.gen(function* () { + const fs = yield* FileSystem.FileSystem + const path = yield* Path.Path + + const cwd = Effect.fnUntraced(function* (opts: ChildProcess.CommandOptions) { + if (Predicate.isUndefined(opts.cwd)) return undefined + yield* fs.access(opts.cwd) + return path.resolve(opts.cwd) + }) + + const env = (opts: ChildProcess.CommandOptions) => + opts.extendEnv ? { ...globalThis.process.env, ...opts.env } : opts.env + + const input = (x: ChildProcess.CommandInput | undefined): NodeChildProcess.IOType | undefined => + Stream.isStream(x) ? "pipe" : x + + const output = (x: ChildProcess.CommandOutput | undefined): NodeChildProcess.IOType | undefined => + Sink.isSink(x) ? "pipe" : x + + const stdin = (opts: ChildProcess.CommandOptions): ChildProcess.StdinConfig => { + const cfg: ChildProcess.StdinConfig = { stream: "pipe", encoding: "utf-8", endOnDone: true } + if (Predicate.isUndefined(opts.stdin)) return cfg + if (typeof opts.stdin === "string") return { ...cfg, stream: opts.stdin } + if (Stream.isStream(opts.stdin)) return { ...cfg, stream: opts.stdin } + return { + stream: opts.stdin.stream, + encoding: opts.stdin.encoding ?? cfg.encoding, + endOnDone: opts.stdin.endOnDone ?? cfg.endOnDone, + } + } + + const stdio = (opts: ChildProcess.CommandOptions, key: "stdout" | "stderr"): ChildProcess.StdoutConfig => { + const cfg = opts[key] + if (Predicate.isUndefined(cfg)) return { stream: "pipe" } + if (typeof cfg === "string") return { stream: cfg } + if (Sink.isSink(cfg)) return { stream: cfg } + return { stream: cfg.stream } + } + + const fds = (opts: ChildProcess.CommandOptions) => { + if (Predicate.isUndefined(opts.additionalFds)) return [] + return Object.entries(opts.additionalFds) + .flatMap(([name, config]) => { + const fd = ChildProcess.parseFdName(name) + return Predicate.isUndefined(fd) ? [] : [{ fd, config }] + }) + .toSorted((a, b) => a.fd - b.fd) + } + + const stdios = ( + sin: ChildProcess.StdinConfig, + sout: ChildProcess.StdoutConfig, + serr: ChildProcess.StderrConfig, + extra: ReadonlyArray<{ fd: number; config: ChildProcess.AdditionalFdConfig }>, + ): NodeChildProcess.StdioOptions => { + const pipe = (x: NodeChildProcess.IOType | undefined) => + process.platform === "win32" && x === "pipe" ? "overlapped" : x + const arr: Array<NodeChildProcess.IOType | undefined> = [ + pipe(input(sin.stream)), + pipe(output(sout.stream)), + pipe(output(serr.stream)), + ] + if (extra.length === 0) return arr as NodeChildProcess.StdioOptions + const max = extra.reduce((acc, x) => Math.max(acc, x.fd), 2) + for (let i = 3; i <= max; i++) arr[i] = "ignore" + for (const x of extra) arr[x.fd] = pipe("pipe") + return arr as NodeChildProcess.StdioOptions + } + + const setupFds = Effect.fnUntraced(function* ( + command: ChildProcess.StandardCommand, + proc: NodeChildProcess.ChildProcess, + extra: ReadonlyArray<{ fd: number; config: ChildProcess.AdditionalFdConfig }>, + ) { + if (extra.length === 0) { + return { + getInputFd: () => Sink.drain, + getOutputFd: () => Stream.empty, + } + } + + const ins = new Map<number, Sink.Sink<void, Uint8Array, never, PlatformError.PlatformError>>() + const outs = new Map<number, Stream.Stream<Uint8Array, PlatformError.PlatformError>>() + + for (const x of extra) { + const node = proc.stdio[x.fd] + switch (x.config.type) { + case "input": { + let sink: Sink.Sink<void, Uint8Array, never, PlatformError.PlatformError> = Sink.drain + if (node && "write" in node) { + sink = NodeSink.fromWritable({ + evaluate: () => node, + onError: (err) => toPlatformError(`fromWritable(fd${x.fd})`, toError(err), command), + endOnDone: true, + }) + } + if (x.config.stream) yield* Effect.forkScoped(Stream.run(x.config.stream, sink)) + ins.set(x.fd, sink) + break + } + case "output": { + let stream: Stream.Stream<Uint8Array, PlatformError.PlatformError> = Stream.empty + if (node && "read" in node) { + const tap = new PassThrough() + node.on("error", (err) => tap.destroy(toError(err))) + node.pipe(tap) + stream = NodeStream.fromReadable({ + evaluate: () => tap, + onError: (err) => toPlatformError(`fromReadable(fd${x.fd})`, toError(err), command), + }) + } + if (x.config.sink) stream = Stream.transduce(stream, x.config.sink) + outs.set(x.fd, stream) + break + } + } + } + + return { + getInputFd: (fd: number) => ins.get(fd) ?? Sink.drain, + getOutputFd: (fd: number) => outs.get(fd) ?? Stream.empty, + } + }) + + const setupStdin = ( + command: ChildProcess.StandardCommand, + proc: NodeChildProcess.ChildProcess, + cfg: ChildProcess.StdinConfig, + ) => + Effect.suspend(() => { + let sink: Sink.Sink<void, unknown, never, PlatformError.PlatformError> = Sink.drain + if (Predicate.isNotNull(proc.stdin)) { + sink = NodeSink.fromWritable({ + evaluate: () => proc.stdin!, + onError: (err) => toPlatformError("fromWritable(stdin)", toError(err), command), + endOnDone: cfg.endOnDone, + encoding: cfg.encoding, + }) + } + if (Stream.isStream(cfg.stream)) return Effect.as(Effect.forkScoped(Stream.run(cfg.stream, sink)), sink) + return Effect.succeed(sink) + }) + + const setupOutput = ( + command: ChildProcess.StandardCommand, + proc: NodeChildProcess.ChildProcess, + out: ChildProcess.StdoutConfig, + err: ChildProcess.StderrConfig, + ) => { + let stdout = proc.stdout + ? NodeStream.fromReadable({ + evaluate: () => proc.stdout!, + onError: (cause) => toPlatformError("fromReadable(stdout)", toError(cause), command), + }) + : Stream.empty + let stderr = proc.stderr + ? NodeStream.fromReadable({ + evaluate: () => proc.stderr!, + onError: (cause) => toPlatformError("fromReadable(stderr)", toError(cause), command), + }) + : Stream.empty + + if (Sink.isSink(out.stream)) stdout = Stream.transduce(stdout, out.stream) + if (Sink.isSink(err.stream)) stderr = Stream.transduce(stderr, err.stream) + + return { stdout, stderr, all: Stream.merge(stdout, stderr) } + } + + const spawn = (command: ChildProcess.StandardCommand, opts: NodeChildProcess.SpawnOptions) => + Effect.callback<readonly [NodeChildProcess.ChildProcess, ExitSignal], PlatformError.PlatformError>((resume) => { + const signal = Deferred.makeUnsafe<readonly [code: number | null, signal: NodeJS.Signals | null]>() + const proc = launch(command.command, command.args, opts) + let end = false + let exit: readonly [code: number | null, signal: NodeJS.Signals | null] | undefined + proc.on("error", (err) => { + resume(Effect.fail(toPlatformError("spawn", err, command))) + }) + proc.on("exit", (...args) => { + exit = args + }) + proc.on("close", (...args) => { + if (end) return + end = true + Deferred.doneUnsafe(signal, Exit.succeed(exit ?? args)) + }) + proc.on("spawn", () => { + resume(Effect.succeed([proc, signal])) + }) + return Effect.sync(() => { + proc.kill("SIGTERM") + }) + }) + + const killGroup = ( + command: ChildProcess.StandardCommand, + proc: NodeChildProcess.ChildProcess, + signal: NodeJS.Signals, + ) => { + if (globalThis.process.platform === "win32") { + return Effect.callback<void, PlatformError.PlatformError>((resume) => { + NodeChildProcess.exec(`taskkill /pid ${proc.pid} /T /F`, { windowsHide: true }, (err) => { + if (err) return resume(Effect.fail(toPlatformError("kill", toError(err), command))) + resume(Effect.void) + }) + }) + } + + return Effect.try({ + try: () => { + globalThis.process.kill(-proc.pid!, signal) + }, + catch: (err) => toPlatformError("kill", toError(err), command), + }) + } + + const killOne = ( + command: ChildProcess.StandardCommand, + proc: NodeChildProcess.ChildProcess, + signal: NodeJS.Signals, + ) => + Effect.suspend(() => { + if (proc.kill(signal)) return Effect.void + return Effect.fail(toPlatformError("kill", new Error("Failed to kill child process"), command)) + }) + + const timeout = + ( + proc: NodeChildProcess.ChildProcess, + command: ChildProcess.StandardCommand, + opts: ChildProcess.KillOptions | undefined, + ) => + <A, E, R>( + f: ( + command: ChildProcess.StandardCommand, + proc: NodeChildProcess.ChildProcess, + signal: NodeJS.Signals, + ) => Effect.Effect<A, E, R>, + ) => { + const signal = opts?.killSignal ?? "SIGTERM" + if (Predicate.isUndefined(opts?.forceKillAfter)) return f(command, proc, signal) + return Effect.timeoutOrElse(f(command, proc, signal), { + duration: opts.forceKillAfter, + orElse: () => f(command, proc, "SIGKILL"), + }) + } + + const source = (handle: ChildProcessHandle, from: ChildProcess.PipeFromOption | undefined) => { + const opt = from ?? "stdout" + switch (opt) { + case "stdout": + return handle.stdout + case "stderr": + return handle.stderr + case "all": + return handle.all + default: { + const fd = ChildProcess.parseFdName(opt) + return Predicate.isNotUndefined(fd) ? handle.getOutputFd(fd) : handle.stdout + } + } + } + + const spawnCommand: ( + command: ChildProcess.Command, + ) => Effect.Effect<ChildProcessHandle, PlatformError.PlatformError, Scope.Scope> = Effect.fnUntraced( + function* (command) { + switch (command._tag) { + case "StandardCommand": { + const sin = stdin(command.options) + const sout = stdio(command.options, "stdout") + const serr = stdio(command.options, "stderr") + const extra = fds(command.options) + const dir = yield* cwd(command.options) + + const [proc, signal] = yield* Effect.acquireRelease( + spawn(command, { + cwd: dir, + env: env(command.options), + stdio: stdios(sin, sout, serr, extra), + detached: command.options.detached ?? process.platform !== "win32", + shell: command.options.shell, + windowsHide: process.platform === "win32", + }), + Effect.fnUntraced(function* ([proc, signal]) { + const done = yield* Deferred.isDone(signal) + const kill = timeout(proc, command, command.options) + if (done) { + const [code] = yield* Deferred.await(signal) + if (process.platform === "win32") return yield* Effect.void + if (code !== 0 && Predicate.isNotNull(code)) return yield* Effect.ignore(kill(killGroup)) + return yield* Effect.void + } + const send = (s: NodeJS.Signals) => + Effect.catch(killGroup(command, proc, s), () => killOne(command, proc, s)) + const sig = command.options.killSignal ?? "SIGTERM" + const attempt = send(sig).pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid) + const escalated = command.options.forceKillAfter + ? Effect.timeoutOrElse(attempt, { + duration: command.options.forceKillAfter, + orElse: () => send("SIGKILL").pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid), + }) + : attempt + return yield* Effect.ignore(escalated) + }), + ) + + const fd = yield* setupFds(command, proc, extra) + const out = setupOutput(command, proc, sout, serr) + let ref = true + return makeHandle({ + pid: ProcessId(proc.pid!), + stdin: yield* setupStdin(command, proc, sin), + stdout: out.stdout, + stderr: out.stderr, + all: out.all, + getInputFd: fd.getInputFd, + getOutputFd: fd.getOutputFd, + isRunning: Effect.map(Deferred.isDone(signal), (done) => !done), + exitCode: Effect.flatMap(Deferred.await(signal), ([code, signal]) => { + if (Predicate.isNotNull(code)) return Effect.succeed(ExitCode(code)) + return Effect.fail( + toPlatformError( + "exitCode", + new Error(`Process interrupted due to receipt of signal: '${signal}'`), + command, + ), + ) + }), + kill: (opts?: ChildProcess.KillOptions) => { + const sig = opts?.killSignal ?? "SIGTERM" + const send = (s: NodeJS.Signals) => + Effect.catch(killGroup(command, proc, s), () => killOne(command, proc, s)) + const attempt = send(sig).pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid) + if (!opts?.forceKillAfter) return attempt + return Effect.timeoutOrElse(attempt, { + duration: opts.forceKillAfter, + orElse: () => send("SIGKILL").pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid), + }) + }, + unref: Effect.sync(() => { + if (ref) { + proc.unref() + ref = false + } + return Effect.sync(() => { + if (!ref) { + proc.ref() + ref = true + } + }) + }), + }) + } + case "PipedCommand": { + const flat = flatten(command) + const [head, ...tail] = flat.commands + let handle = spawnCommand(head) + for (let i = 0; i < tail.length; i++) { + const next = tail[i] + const opts = flat.opts[i] ?? {} + const sin = stdin(next.options) + const stream = Stream.unwrap(Effect.map(handle, (x) => source(x, opts.from))) + const to = opts.to ?? "stdin" + if (to === "stdin") { + handle = spawnCommand( + ChildProcess.make(next.command, next.args, { + ...next.options, + stdin: { ...sin, stream }, + }), + ) + continue + } + const fd = ChildProcess.parseFdName(to) + if (Predicate.isUndefined(fd)) { + handle = spawnCommand( + ChildProcess.make(next.command, next.args, { + ...next.options, + stdin: { ...sin, stream }, + }), + ) + continue + } + handle = spawnCommand( + ChildProcess.make(next.command, next.args, { + ...next.options, + additionalFds: { + ...next.options.additionalFds, + [ChildProcess.fdName(fd) as `fd${number}`]: { type: "input", stream }, + }, + }), + ) + } + return yield* handle + } + } + }, + ) + + return makeSpawner(spawnCommand) +}) + +export const layer: Layer.Layer<ChildProcessSpawner, never, FileSystem.FileSystem | Path.Path> = Layer.effect( + ChildProcessSpawner, + make, +) + +export const defaultLayer = layer.pipe(Layer.provide(NodeFileSystem.layer), Layer.provide(NodePath.layer)) + +export * as CrossSpawnSpawner from "./cross-spawn-spawner" diff --git a/packages/core/test/effect/cross-spawn-spawner.test.ts b/packages/core/test/effect/cross-spawn-spawner.test.ts new file mode 100644 index 000000000..d53725797 --- /dev/null +++ b/packages/core/test/effect/cross-spawn-spawner.test.ts @@ -0,0 +1,423 @@ +import { describe, expect } from "bun:test" +import fs from "node:fs/promises" +import os from "node:os" +import path from "node:path" +import { Effect, Exit, Stream } from "effect" +import type * as PlatformError from "effect/PlatformError" +import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" +import { CrossSpawnSpawner } from "@opencode-ai/core/effect/cross-spawn-spawner" +import { testEffect } from "../lib/effect" + +const live = CrossSpawnSpawner.defaultLayer +const fx = testEffect(live) + +function js(code: string, opts?: ChildProcess.CommandOptions) { + return ChildProcess.make("node", ["-e", code], opts) +} + +function decodeByteStream(stream: Stream.Stream<Uint8Array, PlatformError.PlatformError>) { + return Stream.runCollect(stream).pipe( + Effect.map((chunks) => { + const total = chunks.reduce((acc, x) => acc + x.length, 0) + const out = new Uint8Array(total) + let off = 0 + for (const chunk of chunks) { + out.set(chunk, off) + off += chunk.length + } + return new TextDecoder("utf-8").decode(out).trim() + }), + ) +} + +function alive(pid: number) { + try { + process.kill(pid, 0) + return true + } catch { + return false + } +} + +async function tmpdir() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "opencode-core-test-")) + return { + path: dir, + async [Symbol.asyncDispose]() { + await fs.rm(dir, { recursive: true, force: true }) + }, + } +} + +async function gone(pid: number, timeout = 5_000) { + const end = Date.now() + timeout + while (Date.now() < end) { + if (!alive(pid)) return true + await new Promise((resolve) => setTimeout(resolve, 50)) + } + return !alive(pid) +} + +describe("cross-spawn spawner", () => { + describe("basic spawning", () => { + fx.effect( + "captures stdout", + Effect.gen(function* () { + const out = yield* ChildProcessSpawner.ChildProcessSpawner.use((svc) => + svc.string(ChildProcess.make(process.execPath, ["-e", 'process.stdout.write("ok")'])), + ) + expect(out).toBe("ok") + }), + ) + + fx.effect( + "captures multiple lines", + Effect.gen(function* () { + const handle = yield* js('console.log("line1"); console.log("line2"); console.log("line3")') + const out = yield* decodeByteStream(handle.stdout) + expect(out).toBe("line1\nline2\nline3") + }), + ) + + fx.effect( + "returns exit code", + Effect.gen(function* () { + const handle = yield* js("process.exit(0)") + const code = yield* handle.exitCode + expect(code).toBe(ChildProcessSpawner.ExitCode(0)) + }), + ) + + fx.effect( + "returns non-zero exit code", + Effect.gen(function* () { + const handle = yield* js("process.exit(42)") + const code = yield* handle.exitCode + expect(code).toBe(ChildProcessSpawner.ExitCode(42)) + }), + ) + }) + + describe("cwd option", () => { + fx.effect( + "uses cwd when spawning commands", + Effect.gen(function* () { + const tmp = yield* Effect.acquireRelease( + Effect.promise(() => tmpdir()), + (tmp) => Effect.promise(() => tmp[Symbol.asyncDispose]()), + ) + const out = yield* ChildProcessSpawner.ChildProcessSpawner.use((svc) => + svc.string( + ChildProcess.make(process.execPath, ["-e", "process.stdout.write(process.cwd())"], { cwd: tmp.path }), + ), + ) + expect(out).toBe(tmp.path) + }), + ) + + fx.effect( + "fails for invalid cwd", + Effect.gen(function* () { + const exit = yield* Effect.exit( + ChildProcess.make("echo", ["test"], { cwd: "/nonexistent/directory/path" }).asEffect(), + ) + expect(Exit.isFailure(exit)).toBe(true) + }), + ) + }) + + describe("env option", () => { + fx.effect( + "passes environment variables with extendEnv", + Effect.gen(function* () { + const handle = yield* js('process.stdout.write(process.env.TEST_VAR ?? "")', { + env: { TEST_VAR: "test_value" }, + extendEnv: true, + }) + const out = yield* decodeByteStream(handle.stdout) + expect(out).toBe("test_value") + }), + ) + + fx.effect( + "passes multiple environment variables", + Effect.gen(function* () { + const handle = yield* js( + "process.stdout.write(`${process.env.VAR1}-${process.env.VAR2}-${process.env.VAR3}`)", + { + env: { VAR1: "one", VAR2: "two", VAR3: "three" }, + extendEnv: true, + }, + ) + const out = yield* decodeByteStream(handle.stdout) + expect(out).toBe("one-two-three") + }), + ) + }) + + describe("stderr", () => { + fx.effect( + "captures stderr output", + Effect.gen(function* () { + const handle = yield* js('process.stderr.write("error message")') + const err = yield* decodeByteStream(handle.stderr) + expect(err).toBe("error message") + }), + ) + + fx.effect( + "captures both stdout and stderr", + Effect.gen(function* () { + const handle = yield* js( + [ + "let pending = 2", + "const done = () => {", + " pending -= 1", + " if (pending === 0) setTimeout(() => process.exit(0), 0)", + "}", + 'process.stdout.write("stdout\\n", done)', + 'process.stderr.write("stderr\\n", done)', + ].join("\n"), + ) + const [stdout, stderr] = yield* Effect.all([decodeByteStream(handle.stdout), decodeByteStream(handle.stderr)], { + concurrency: 2, + }) + expect(stdout).toBe("stdout") + expect(stderr).toBe("stderr") + }), + ) + }) + + describe("combined output (all)", () => { + fx.effect( + "captures stdout via .all when no stderr", + Effect.gen(function* () { + const handle = yield* ChildProcess.make("echo", ["hello from stdout"]) + const all = yield* decodeByteStream(handle.all) + expect(all).toBe("hello from stdout") + }), + ) + + fx.effect( + "captures stderr via .all when no stdout", + Effect.gen(function* () { + const handle = yield* js('process.stderr.write("hello from stderr")') + const all = yield* decodeByteStream(handle.all) + expect(all).toBe("hello from stderr") + }), + ) + }) + + describe("stdin", () => { + fx.effect( + "allows providing standard input to a command", + Effect.gen(function* () { + const input = "a b c" + const stdin = Stream.make(Buffer.from(input, "utf-8")) + const handle = yield* js( + 'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out))', + { stdin }, + ) + const out = yield* decodeByteStream(handle.stdout) + yield* handle.exitCode + expect(out).toBe("a b c") + }), + ) + }) + + describe("process control", () => { + fx.effect( + "kills a running process", + Effect.gen(function* () { + const exit = yield* Effect.exit( + Effect.gen(function* () { + const handle = yield* js("setTimeout(() => {}, 10_000)") + yield* handle.kill() + return yield* handle.exitCode + }), + ) + expect(Exit.isFailure(exit) ? true : exit.value !== ChildProcessSpawner.ExitCode(0)).toBe(true) + }), + ) + + fx.effect( + "kills a child when scope exits", + Effect.gen(function* () { + const pid = yield* Effect.scoped( + Effect.gen(function* () { + const handle = yield* js("setInterval(() => {}, 10_000)") + return Number(handle.pid) + }), + ) + const done = yield* Effect.promise(() => gone(pid)) + expect(done).toBe(true) + }), + ) + + fx.effect( + "forceKillAfter escalates for stubborn processes", + Effect.gen(function* () { + if (process.platform === "win32") return + + const started = Date.now() + const exit = yield* Effect.exit( + Effect.gen(function* () { + const handle = yield* js('process.on("SIGTERM", () => {}); setInterval(() => {}, 10_000)') + yield* handle.kill({ forceKillAfter: 100 }) + return yield* handle.exitCode + }), + ) + + expect(Date.now() - started).toBeLessThan(1_000) + expect(Exit.isFailure(exit) ? true : exit.value !== ChildProcessSpawner.ExitCode(0)).toBe(true) + }), + ) + + fx.effect( + "isRunning reflects process state", + Effect.gen(function* () { + const handle = yield* js('process.stdout.write("done")') + yield* handle.exitCode + const running = yield* handle.isRunning + expect(running).toBe(false) + }), + ) + }) + + describe("error handling", () => { + fx.effect( + "fails for invalid command", + Effect.gen(function* () { + const exit = yield* Effect.exit( + Effect.gen(function* () { + const handle = yield* ChildProcess.make("nonexistent-command-12345") + return yield* handle.exitCode + }), + ) + expect(Exit.isFailure(exit) ? true : exit.value !== ChildProcessSpawner.ExitCode(0)).toBe(true) + }), + ) + }) + + describe("pipeline", () => { + fx.effect( + "pipes stdout of one command to stdin of another", + Effect.gen(function* () { + const handle = yield* js('process.stdout.write("hello world")').pipe( + ChildProcess.pipeTo( + js( + 'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out.toUpperCase()))', + ), + ), + ) + const out = yield* decodeByteStream(handle.stdout) + yield* handle.exitCode + expect(out).toBe("HELLO WORLD") + }), + ) + + fx.effect( + "three-stage pipeline", + Effect.gen(function* () { + const handle = yield* js('process.stdout.write("hello world")').pipe( + ChildProcess.pipeTo( + js( + 'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out.toUpperCase()))', + ), + ), + ChildProcess.pipeTo( + js( + 'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out.replaceAll(" ", "-")))', + ), + ), + ) + const out = yield* decodeByteStream(handle.stdout) + yield* handle.exitCode + expect(out).toBe("HELLO-WORLD") + }), + ) + + fx.effect( + "pipes stderr with { from: 'stderr' }", + Effect.gen(function* () { + const handle = yield* js('process.stderr.write("error")').pipe( + ChildProcess.pipeTo( + js( + 'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out))', + ), + { from: "stderr" }, + ), + ) + const out = yield* decodeByteStream(handle.stdout) + yield* handle.exitCode + expect(out).toBe("error") + }), + ) + + fx.effect( + "pipes combined output with { from: 'all' }", + Effect.gen(function* () { + const handle = yield* js('process.stdout.write("stdout\\n"); process.stderr.write("stderr\\n")').pipe( + ChildProcess.pipeTo( + js( + 'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out))', + ), + { from: "all" }, + ), + ) + const out = yield* decodeByteStream(handle.stdout) + yield* handle.exitCode + expect(out).toContain("stdout") + expect(out).toContain("stderr") + }), + ) + }) + + describe("Windows-specific", () => { + fx.effect( + "uses shell routing on Windows", + Effect.gen(function* () { + if (process.platform !== "win32") return + + const out = yield* ChildProcessSpawner.ChildProcessSpawner.use((svc) => + svc.string( + ChildProcess.make("set", ["OPENCODE_TEST_SHELL"], { + shell: true, + extendEnv: true, + env: { OPENCODE_TEST_SHELL: "ok" }, + }), + ), + ) + expect(out).toContain("OPENCODE_TEST_SHELL=ok") + }), + ) + + fx.effect( + "runs cmd scripts with spaces on Windows without shell", + Effect.gen(function* () { + if (process.platform !== "win32") return + + const tmp = yield* Effect.acquireRelease( + Effect.promise(() => tmpdir()), + (tmp) => Effect.promise(() => tmp[Symbol.asyncDispose]()), + ) + const dir = path.join(tmp.path, "with space") + const file = path.join(dir, "echo cmd.cmd") + + yield* Effect.promise(() => fs.mkdir(dir, { recursive: true })) + yield* Effect.promise(() => fs.writeFile(file, "@echo off\r\nif %~1==--stdio exit /b 0\r\nexit /b 7\r\n")) + + const code = yield* ChildProcessSpawner.ChildProcessSpawner.use((svc) => + svc.exitCode( + ChildProcess.make(file, ["--stdio"], { + stdin: "pipe", + stdout: "pipe", + stderr: "pipe", + }), + ), + ) + expect(code).toBe(ChildProcessSpawner.ExitCode(0)) + }), + ) + }) +}) |
