diff options
Diffstat (limited to 'packages/core/src/effect')
| -rw-r--r-- | packages/core/src/effect/logger.ts | 73 | ||||
| -rw-r--r-- | packages/core/src/effect/memo-map.ts | 3 | ||||
| -rw-r--r-- | packages/core/src/effect/observability.ts | 107 | ||||
| -rw-r--r-- | packages/core/src/effect/runtime.ts | 21 |
4 files changed, 204 insertions, 0 deletions
diff --git a/packages/core/src/effect/logger.ts b/packages/core/src/effect/logger.ts new file mode 100644 index 000000000..69f9631e0 --- /dev/null +++ b/packages/core/src/effect/logger.ts @@ -0,0 +1,73 @@ +import { Cause, Effect, Logger, References } from "effect" +import * as Log from "../util/log" + +type Fields = Record<string, unknown> + +const normalizeKey = (key: string) => (key === "sessionID" ? "session.id" : key) + +export interface Handle { + readonly debug: (msg?: unknown, extra?: Fields) => Effect.Effect<void> + readonly info: (msg?: unknown, extra?: Fields) => Effect.Effect<void> + readonly warn: (msg?: unknown, extra?: Fields) => Effect.Effect<void> + readonly error: (msg?: unknown, extra?: Fields) => Effect.Effect<void> + readonly with: (extra: Fields) => Handle +} + +const clean = (input?: Fields): Fields => + Object.fromEntries( + Object.entries(input ?? {}) + .filter((entry) => entry[1] !== undefined && entry[1] !== null) + .map(([key, value]) => [normalizeKey(key), value]), + ) + +const text = (input: unknown): string => { + // oxlint-disable-next-line no-base-to-string + if (Array.isArray(input)) return input.map((item) => String(item)).join(" ") + // oxlint-disable-next-line no-base-to-string + return input === undefined ? "" : String(input) +} + +const call = (run: (msg?: unknown) => Effect.Effect<void>, base: Fields, msg?: unknown, extra?: Fields) => { + const ann = clean({ ...base, ...extra }) + const fx = run(msg) + return Object.keys(ann).length ? Effect.annotateLogs(fx, ann) : fx +} + +export const logger = Logger.make((opts) => { + const extra = clean(opts.fiber.getRef(References.CurrentLogAnnotations)) + const now = opts.date.getTime() + for (const [key, start] of opts.fiber.getRef(References.CurrentLogSpans)) { + extra[`logSpan.${key}`] = `${now - start}ms` + } + if (opts.cause.reasons.length > 0) { + extra.cause = Cause.pretty(opts.cause) + } + + const svc = typeof extra.service === "string" ? extra.service : undefined + if (svc) delete extra.service + const log = svc ? Log.create({ service: svc }) : Log.Default + const msg = text(opts.message) + + switch (opts.logLevel) { + case "Trace": + case "Debug": + return log.debug(msg, extra) + case "Warn": + return log.warn(msg, extra) + case "Error": + case "Fatal": + return log.error(msg, extra) + default: + return log.info(msg, extra) + } +}) + +export const layer = Logger.layer([logger], { mergeWithExisting: false }) + +export const create = (base: Fields = {}): Handle => ({ + debug: (msg, extra) => call((item) => Effect.logDebug(item), base, msg, extra), + info: (msg, extra) => call((item) => Effect.logInfo(item), base, msg, extra), + warn: (msg, extra) => call((item) => Effect.logWarning(item), base, msg, extra), + error: (msg, extra) => call((item) => Effect.logError(item), base, msg, extra), + with: (extra) => create({ ...base, ...extra }), +}) diff --git a/packages/core/src/effect/memo-map.ts b/packages/core/src/effect/memo-map.ts new file mode 100644 index 000000000..c797dbf42 --- /dev/null +++ b/packages/core/src/effect/memo-map.ts @@ -0,0 +1,3 @@ +import { Layer } from "effect" + +export const memoMap = Layer.makeMemoMapUnsafe() diff --git a/packages/core/src/effect/observability.ts b/packages/core/src/effect/observability.ts new file mode 100644 index 000000000..0203079ab --- /dev/null +++ b/packages/core/src/effect/observability.ts @@ -0,0 +1,107 @@ +import { Effect, Layer, Logger } from "effect" +import { FetchHttpClient } from "effect/unstable/http" +import { OtlpLogger, OtlpSerialization } from "effect/unstable/observability" +import * as EffectLogger from "./logger" +import { Flag } from "../flag/flag" +import { InstallationChannel, InstallationVersion } from "../installation/version" +import { ensureProcessMetadata } from "../util/opencode-process" + +const base = Flag.OTEL_EXPORTER_OTLP_ENDPOINT +export const enabled = !!base +const processID = crypto.randomUUID() + +const headers = Flag.OTEL_EXPORTER_OTLP_HEADERS + ? Flag.OTEL_EXPORTER_OTLP_HEADERS.split(",").reduce( + (acc, x) => { + const [key, ...value] = x.split("=") + acc[key] = value.join("=") + return acc + }, + {} as Record<string, string>, + ) + : undefined + +export function resource(): { serviceName: string; serviceVersion: string; attributes: Record<string, string> } { + const processMetadata = ensureProcessMetadata("main") + const attributes: Record<string, string> = (() => { + const value = process.env.OTEL_RESOURCE_ATTRIBUTES + if (!value) return {} + try { + return Object.fromEntries( + value.split(",").map((entry) => { + const index = entry.indexOf("=") + if (index < 1) throw new Error("Invalid OTEL_RESOURCE_ATTRIBUTES entry") + return [decodeURIComponent(entry.slice(0, index)), decodeURIComponent(entry.slice(index + 1))] + }), + ) + } catch { + return {} + } + })() + + return { + serviceName: "opencode", + serviceVersion: InstallationVersion, + attributes: { + ...attributes, + "deployment.environment.name": InstallationChannel, + "opencode.client": Flag.OPENCODE_CLIENT, + "opencode.process_role": processMetadata.processRole, + "opencode.run_id": processMetadata.runID, + "service.instance.id": processID, + }, + } +} + +function logs() { + return Logger.layer( + [ + EffectLogger.logger, + OtlpLogger.make({ + url: `${base}/v1/logs`, + resource: resource(), + headers, + }), + ], + { mergeWithExisting: false }, + ).pipe(Layer.provide(OtlpSerialization.layerJson), Layer.provide(FetchHttpClient.layer)) +} + +const traces = async () => { + const NodeSdk = await import("@effect/opentelemetry/NodeSdk") + const OTLP = await import("@opentelemetry/exporter-trace-otlp-http") + const SdkBase = await import("@opentelemetry/sdk-trace-base") + + // @effect/opentelemetry creates a NodeTracerProvider but never calls + // register(), so the global @opentelemetry/api context manager stays + // as the no-op default. Non-Effect code (like the AI SDK) that calls + // tracer.startActiveSpan() relies on context.active() to find the + // parent span - without a real context manager every span starts a + // new trace. Registering AsyncLocalStorageContextManager fixes this. + const { AsyncLocalStorageContextManager } = await import("@opentelemetry/context-async-hooks") + const { context } = await import("@opentelemetry/api") + const mgr = new AsyncLocalStorageContextManager() + mgr.enable() + context.setGlobalContextManager(mgr) + + return NodeSdk.layer(() => ({ + resource: resource(), + spanProcessor: new SdkBase.BatchSpanProcessor( + new OTLP.OTLPTraceExporter({ + url: `${base}/v1/traces`, + headers, + }), + ), + })) +} + +export const layer = !base + ? EffectLogger.layer + : Layer.unwrap( + Effect.gen(function* () { + const trace = yield* Effect.promise(traces) + return Layer.mergeAll(trace, logs()) + }), + ) + +export const Observability = { enabled, layer } diff --git a/packages/core/src/effect/runtime.ts b/packages/core/src/effect/runtime.ts new file mode 100644 index 000000000..e4f682709 --- /dev/null +++ b/packages/core/src/effect/runtime.ts @@ -0,0 +1,21 @@ +import { Layer, type Context, ManagedRuntime, type Effect } from "effect" +import { memoMap } from "./memo-map" +import { Observability } from "./observability" + +export function makeRuntime<I, S, E>(service: Context.Service<I, S>, layer: Layer.Layer<I, E>) { + let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined + const getRuntime = () => + (rt ??= ManagedRuntime.make(Layer.provideMerge(layer, Observability.layer) as Layer.Layer<I, E>, { + memoMap, + })) + + return { + runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(service.use(fn)), + runPromiseExit: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) => + getRuntime().runPromiseExit(service.use(fn), options), + runPromise: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) => + getRuntime().runPromise(service.use(fn), options), + runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(service.use(fn)), + runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runCallback(service.use(fn)), + } +} |
