summaryrefslogtreecommitdiffhomepage
path: root/packages/core/src/effect
diff options
context:
space:
mode:
Diffstat (limited to 'packages/core/src/effect')
-rw-r--r--packages/core/src/effect/logger.ts73
-rw-r--r--packages/core/src/effect/memo-map.ts3
-rw-r--r--packages/core/src/effect/observability.ts107
-rw-r--r--packages/core/src/effect/runtime.ts21
4 files changed, 204 insertions, 0 deletions
diff --git a/packages/core/src/effect/logger.ts b/packages/core/src/effect/logger.ts
new file mode 100644
index 000000000..69f9631e0
--- /dev/null
+++ b/packages/core/src/effect/logger.ts
@@ -0,0 +1,73 @@
+import { Cause, Effect, Logger, References } from "effect"
+import * as Log from "../util/log"
+
+type Fields = Record<string, unknown>
+
+const normalizeKey = (key: string) => (key === "sessionID" ? "session.id" : key)
+
+export interface Handle {
+ readonly debug: (msg?: unknown, extra?: Fields) => Effect.Effect<void>
+ readonly info: (msg?: unknown, extra?: Fields) => Effect.Effect<void>
+ readonly warn: (msg?: unknown, extra?: Fields) => Effect.Effect<void>
+ readonly error: (msg?: unknown, extra?: Fields) => Effect.Effect<void>
+ readonly with: (extra: Fields) => Handle
+}
+
+const clean = (input?: Fields): Fields =>
+ Object.fromEntries(
+ Object.entries(input ?? {})
+ .filter((entry) => entry[1] !== undefined && entry[1] !== null)
+ .map(([key, value]) => [normalizeKey(key), value]),
+ )
+
+const text = (input: unknown): string => {
+ // oxlint-disable-next-line no-base-to-string
+ if (Array.isArray(input)) return input.map((item) => String(item)).join(" ")
+ // oxlint-disable-next-line no-base-to-string
+ return input === undefined ? "" : String(input)
+}
+
+const call = (run: (msg?: unknown) => Effect.Effect<void>, base: Fields, msg?: unknown, extra?: Fields) => {
+ const ann = clean({ ...base, ...extra })
+ const fx = run(msg)
+ return Object.keys(ann).length ? Effect.annotateLogs(fx, ann) : fx
+}
+
+export const logger = Logger.make((opts) => {
+ const extra = clean(opts.fiber.getRef(References.CurrentLogAnnotations))
+ const now = opts.date.getTime()
+ for (const [key, start] of opts.fiber.getRef(References.CurrentLogSpans)) {
+ extra[`logSpan.${key}`] = `${now - start}ms`
+ }
+ if (opts.cause.reasons.length > 0) {
+ extra.cause = Cause.pretty(opts.cause)
+ }
+
+ const svc = typeof extra.service === "string" ? extra.service : undefined
+ if (svc) delete extra.service
+ const log = svc ? Log.create({ service: svc }) : Log.Default
+ const msg = text(opts.message)
+
+ switch (opts.logLevel) {
+ case "Trace":
+ case "Debug":
+ return log.debug(msg, extra)
+ case "Warn":
+ return log.warn(msg, extra)
+ case "Error":
+ case "Fatal":
+ return log.error(msg, extra)
+ default:
+ return log.info(msg, extra)
+ }
+})
+
+export const layer = Logger.layer([logger], { mergeWithExisting: false })
+
+export const create = (base: Fields = {}): Handle => ({
+ debug: (msg, extra) => call((item) => Effect.logDebug(item), base, msg, extra),
+ info: (msg, extra) => call((item) => Effect.logInfo(item), base, msg, extra),
+ warn: (msg, extra) => call((item) => Effect.logWarning(item), base, msg, extra),
+ error: (msg, extra) => call((item) => Effect.logError(item), base, msg, extra),
+ with: (extra) => create({ ...base, ...extra }),
+})
diff --git a/packages/core/src/effect/memo-map.ts b/packages/core/src/effect/memo-map.ts
new file mode 100644
index 000000000..c797dbf42
--- /dev/null
+++ b/packages/core/src/effect/memo-map.ts
@@ -0,0 +1,3 @@
+import { Layer } from "effect"
+
+export const memoMap = Layer.makeMemoMapUnsafe()
diff --git a/packages/core/src/effect/observability.ts b/packages/core/src/effect/observability.ts
new file mode 100644
index 000000000..0203079ab
--- /dev/null
+++ b/packages/core/src/effect/observability.ts
@@ -0,0 +1,107 @@
+import { Effect, Layer, Logger } from "effect"
+import { FetchHttpClient } from "effect/unstable/http"
+import { OtlpLogger, OtlpSerialization } from "effect/unstable/observability"
+import * as EffectLogger from "./logger"
+import { Flag } from "../flag/flag"
+import { InstallationChannel, InstallationVersion } from "../installation/version"
+import { ensureProcessMetadata } from "../util/opencode-process"
+
+const base = Flag.OTEL_EXPORTER_OTLP_ENDPOINT
+export const enabled = !!base
+const processID = crypto.randomUUID()
+
+const headers = Flag.OTEL_EXPORTER_OTLP_HEADERS
+ ? Flag.OTEL_EXPORTER_OTLP_HEADERS.split(",").reduce(
+ (acc, x) => {
+ const [key, ...value] = x.split("=")
+ acc[key] = value.join("=")
+ return acc
+ },
+ {} as Record<string, string>,
+ )
+ : undefined
+
+export function resource(): { serviceName: string; serviceVersion: string; attributes: Record<string, string> } {
+ const processMetadata = ensureProcessMetadata("main")
+ const attributes: Record<string, string> = (() => {
+ const value = process.env.OTEL_RESOURCE_ATTRIBUTES
+ if (!value) return {}
+ try {
+ return Object.fromEntries(
+ value.split(",").map((entry) => {
+ const index = entry.indexOf("=")
+ if (index < 1) throw new Error("Invalid OTEL_RESOURCE_ATTRIBUTES entry")
+ return [decodeURIComponent(entry.slice(0, index)), decodeURIComponent(entry.slice(index + 1))]
+ }),
+ )
+ } catch {
+ return {}
+ }
+ })()
+
+ return {
+ serviceName: "opencode",
+ serviceVersion: InstallationVersion,
+ attributes: {
+ ...attributes,
+ "deployment.environment.name": InstallationChannel,
+ "opencode.client": Flag.OPENCODE_CLIENT,
+ "opencode.process_role": processMetadata.processRole,
+ "opencode.run_id": processMetadata.runID,
+ "service.instance.id": processID,
+ },
+ }
+}
+
+function logs() {
+ return Logger.layer(
+ [
+ EffectLogger.logger,
+ OtlpLogger.make({
+ url: `${base}/v1/logs`,
+ resource: resource(),
+ headers,
+ }),
+ ],
+ { mergeWithExisting: false },
+ ).pipe(Layer.provide(OtlpSerialization.layerJson), Layer.provide(FetchHttpClient.layer))
+}
+
+const traces = async () => {
+ const NodeSdk = await import("@effect/opentelemetry/NodeSdk")
+ const OTLP = await import("@opentelemetry/exporter-trace-otlp-http")
+ const SdkBase = await import("@opentelemetry/sdk-trace-base")
+
+ // @effect/opentelemetry creates a NodeTracerProvider but never calls
+ // register(), so the global @opentelemetry/api context manager stays
+ // as the no-op default. Non-Effect code (like the AI SDK) that calls
+ // tracer.startActiveSpan() relies on context.active() to find the
+ // parent span - without a real context manager every span starts a
+ // new trace. Registering AsyncLocalStorageContextManager fixes this.
+ const { AsyncLocalStorageContextManager } = await import("@opentelemetry/context-async-hooks")
+ const { context } = await import("@opentelemetry/api")
+ const mgr = new AsyncLocalStorageContextManager()
+ mgr.enable()
+ context.setGlobalContextManager(mgr)
+
+ return NodeSdk.layer(() => ({
+ resource: resource(),
+ spanProcessor: new SdkBase.BatchSpanProcessor(
+ new OTLP.OTLPTraceExporter({
+ url: `${base}/v1/traces`,
+ headers,
+ }),
+ ),
+ }))
+}
+
+export const layer = !base
+ ? EffectLogger.layer
+ : Layer.unwrap(
+ Effect.gen(function* () {
+ const trace = yield* Effect.promise(traces)
+ return Layer.mergeAll(trace, logs())
+ }),
+ )
+
+export const Observability = { enabled, layer }
diff --git a/packages/core/src/effect/runtime.ts b/packages/core/src/effect/runtime.ts
new file mode 100644
index 000000000..e4f682709
--- /dev/null
+++ b/packages/core/src/effect/runtime.ts
@@ -0,0 +1,21 @@
+import { Layer, type Context, ManagedRuntime, type Effect } from "effect"
+import { memoMap } from "./memo-map"
+import { Observability } from "./observability"
+
+export function makeRuntime<I, S, E>(service: Context.Service<I, S>, layer: Layer.Layer<I, E>) {
+ let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
+ const getRuntime = () =>
+ (rt ??= ManagedRuntime.make(Layer.provideMerge(layer, Observability.layer) as Layer.Layer<I, E>, {
+ memoMap,
+ }))
+
+ return {
+ runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(service.use(fn)),
+ runPromiseExit: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
+ getRuntime().runPromiseExit(service.use(fn), options),
+ runPromise: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
+ getRuntime().runPromise(service.use(fn), options),
+ runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(service.use(fn)),
+ runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runCallback(service.use(fn)),
+ }
+}