summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-05 13:07:23 +0900
committerAdam Malczewski <[email protected]>2026-06-05 13:07:23 +0900
commitc48d8ac7160c3cdcf32ed4e488807d3daeb8d457 (patch)
tree1fccd7f35f051d8bae6bc8c6c5e3ffa22e816d0b
parent94dd5334b0277f3cf3b0588150a6615af86a32b3 (diff)
downloaddispatch-c48d8ac7160c3cdcf32ed4e488807d3daeb8d457.tar.gz
dispatch-c48d8ac7160c3cdcf32ed4e488807d3daeb8d457.zip
feat(observability): Phase A logging substrate — Logger/Span ABI + journal sink (250 tests)
Structured, agent-first logging captured durably to an append-only journal file. Kernel (contracts/logging.ts): leveled/attributed Logger + Span, auto-scoped per extension (host stamps manifest.id, unspoofable), incremental span records (open/close) for crash-reconstructable traces, injected LogSink (pure record-builder). ctx.log on ToolContract; runTurn opens turn/step/tool-call spans and captures the verbatim pre-mutation prompt (the 'before') on the step span. journal-sink (new package, bootstrap dep — not an extension): LogSink appending NDJSON to a rotating journal; pure serialize + thin fs edge; fail-safe drop, never blocks a turn. host-bin injects it via HostDeps; session-orchestrator threads host.logger (childed per turn) into runTurn. Redaction is per-extension self-redaction (no shared helper — isolation over DRY). The out-of-process collector + SQLite store + the verbatim 'after' provider.request capture are Phase B / next (notes/observability-design.md §10/§11). Verified: tsc -b clean, 250 tests (218→+32), biome clean. Live boot: a turn's journal holds host logs + turn/step spans (open+close) + the prompt:before record with the verbatim messages array. Harness: ORCHESTRATOR §3 rule-scoping map; .dispatch/rules/isolation-over-dry.md; notes/observability-design.md (design D1–D10 + Phase A/B plan).
-rw-r--r--.dispatch/rules/isolation-over-dry.md12
-rw-r--r--.gitignore3
-rw-r--r--ORCHESTRATOR.md18
-rw-r--r--bun.lock10
-rw-r--r--notes/observability-design.md619
-rw-r--r--packages/host-bin/package.json3
-rw-r--r--packages/host-bin/src/main.ts22
-rw-r--r--packages/journal-sink/package.json11
-rw-r--r--packages/journal-sink/src/index.ts2
-rw-r--r--packages/journal-sink/src/journal-sink.test.ts309
-rw-r--r--packages/journal-sink/src/journal-sink.ts171
-rw-r--r--packages/journal-sink/tsconfig.json6
-rw-r--r--packages/kernel/src/bus/bus.test.ts23
-rw-r--r--packages/kernel/src/bus/pure.ts8
-rw-r--r--packages/kernel/src/contracts/extension.ts14
-rw-r--r--packages/kernel/src/contracts/index.ts20
-rw-r--r--packages/kernel/src/contracts/logging.ts461
-rw-r--r--packages/kernel/src/contracts/runtime.ts8
-rw-r--r--packages/kernel/src/contracts/tool.ts9
-rw-r--r--packages/kernel/src/host/host.test.ts229
-rw-r--r--packages/kernel/src/host/host.ts21
-rw-r--r--packages/kernel/src/runtime/dispatch.ts30
-rw-r--r--packages/kernel/src/runtime/run-turn.test.ts226
-rw-r--r--packages/kernel/src/runtime/run-turn.ts214
-rw-r--r--packages/session-orchestrator/src/extension.ts1
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts5
-rw-r--r--packages/tool-read-file/src/read-file.test.ts7
-rw-r--r--tasks.md36
-rw-r--r--tsconfig.json1
29 files changed, 2412 insertions, 87 deletions
diff --git a/.dispatch/rules/isolation-over-dry.md b/.dispatch/rules/isolation-over-dry.md
new file mode 100644
index 0000000..55ac13a
--- /dev/null
+++ b/.dispatch/rules/isolation-over-dry.md
@@ -0,0 +1,12 @@
+# Rule: isolation over DRY
+
+Prefer self-contained, even DUPLICATED, code in each extension over a shared helper
+that two+ extensions import. A shared module wired between sibling features is a
+coupling smell — it breaks feature-as-a-library (P1) and ties their fates together.
+Get consistency by sharing KNOWLEDGE in the harness (`.dispatch/rules`, skills,
+`GLOSSARY.md`), never shared runtime code.
+The ONLY sanctioned shared surfaces are the kernel ABI (host-provided, injected) and
+typed contracts — NOT a new utility module between features.
+When tempted to "extract a helper for consistency," stop: that is reputation-driven
+DRY (P4). Duplicating a small function across the few features that need it is the
+intended trade.
diff --git a/.gitignore b/.gitignore
index b2eb550..2c6bb6d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,3 +16,6 @@ reports/
# Local runtime DB
.dispatch-data/
+
+# Local observability journal (runtime artifact)
+.dispatch/journal/
diff --git a/ORCHESTRATOR.md b/ORCHESTRATOR.md
index afb5520..fdeb859 100644
--- a/ORCHESTRATOR.md
+++ b/ORCHESTRATOR.md
@@ -114,9 +114,10 @@ log into context as a hard failure.
Write self-contained prompts. Structure:
1. **Role:** "You are the owner-agent for <unit>."
-2. **Read first (ordered):** `AGENTS.md`, `.dispatch/rules/`, `GLOSSARY.md`, the
- relevant `notes/restructure-plan.md` §-sections, and **the exact contract
- files under `packages/kernel/src/contracts/` it builds against**.
+2. **Read first (ordered):** `AGENTS.md`, the **scoped `.dispatch/rules/`** for this
+ unit's layer (the scoping map is below the recipe), `GLOSSARY.md`, the relevant
+ `notes/restructure-plan.md` §-sections, and **the exact contract files under
+ `packages/kernel/src/contracts/` it builds against**.
3. **Ownership (strict):** the EXACT files it may create/edit, and an explicit
"do not touch anything else; if you need a change elsewhere, write a change-
request in your report — do NOT edit it."
@@ -138,6 +139,17 @@ Write self-contained prompts. Structure:
Keep the prompt scoped (P6): don't restate what a frontier model knows; do state
the project-specific, non-inferable rules.
+**`.dispatch/rules/` scoping map** — include ONLY the rows matching the unit (per §0
+"scoped rules beat general rules"); do NOT dump every rule on every agent:
+- **Every agent:** `one-owner.md`, `isolation-over-dry.md`.
+- **Kernel unit:** `kernel-purity.md` + `pure-core.md` + `no-internal-mocks.md`.
+- **Pure-core unit:** `pure-core.md` + `no-internal-mocks.md`.
+- **Any extension coupling via hooks/services:** `typed-handles.md`.
+- **Any extension that emits logs/spans (≈ all of them):** `extension-logging.md`
+ *(pending — authored with the observability substrate, see
+ `notes/observability-design.md` §9; keystone: each extension self-redacts its OWN
+ secrets in its OWN code — NO shared redaction helper).*
+
---
## 4. Verification (the orchestrator's trust protocol)
diff --git a/bun.lock b/bun.lock
index 0c5b609..d5446fe 100644
--- a/bun.lock
+++ b/bun.lock
@@ -31,6 +31,7 @@
"dependencies": {
"@dispatch/auth-apikey": "workspace:*",
"@dispatch/conversation-store": "workspace:*",
+ "@dispatch/journal-sink": "workspace:*",
"@dispatch/kernel": "workspace:*",
"@dispatch/provider-openai-compat": "workspace:*",
"@dispatch/session-orchestrator": "workspace:*",
@@ -39,6 +40,13 @@
"@dispatch/transport-http": "workspace:*",
},
},
+ "packages/journal-sink": {
+ "name": "@dispatch/journal-sink",
+ "version": "0.0.0",
+ "dependencies": {
+ "@dispatch/kernel": "workspace:*",
+ },
+ },
"packages/kernel": {
"name": "@dispatch/kernel",
"version": "0.0.0",
@@ -107,6 +115,8 @@
"@dispatch/host-bin": ["@dispatch/host-bin@workspace:packages/host-bin"],
+ "@dispatch/journal-sink": ["@dispatch/journal-sink@workspace:packages/journal-sink"],
+
"@dispatch/kernel": ["@dispatch/kernel@workspace:packages/kernel"],
"@dispatch/provider-openai-compat": ["@dispatch/provider-openai-compat@workspace:packages/provider-openai-compat"],
diff --git a/notes/observability-design.md b/notes/observability-design.md
new file mode 100644
index 0000000..6c09099
--- /dev/null
+++ b/notes/observability-design.md
@@ -0,0 +1,619 @@
+# Observability & Debugging — Design Scratch
+
+> **Status:** IDEATION / scratch. NOT the plan, NOT decided-to-build. Captures a
+> live design discussion so decisions don't live only in chat (per ORCHESTRATOR
+> "write up before pivoting"). Promote settled parts into
+> `notes/restructure-plan.md` + `GLOSSARY.md` when we commit to building.
+> **Scope:** backend only. **Driver:** old Dispatch had bugs that were near-
+> impossible to diagnose because an agent had *no real data to read*. This fixes
+> that at the root.
+>
+> **Read order (fresh agent picking this up):** `ORCHESTRATOR.md` →
+> `notes/restructure-plan.md` (the §-refs below point into it) → `GLOSSARY.md` →
+> this file. **Mode = IDEATION with the user** (discuss/design, do NOT build yet);
+> the user owns the boundary + vocabulary calls (§5.2 / §5.6).
+> **Where we left off:** **Logger ABI = A, LOCKED** (evolve in place, §8). **Now
+> PLANNING Phase A** (logging substrate, §10): A1 kernel Logger/Span ABI → A2
+> `journal-sink` package (in-process, host-bin-injected) → A3 runtime span
+> instrumentation. Phase A captures the **"BEFORE"** (verbatim pre-mutation prompt on
+> the step span, via a new `LogRecord.body` blob); the **"AFTER"** (provider.request
+> verbatim, D5) is the **next step** → completes full round-trip rebuild + the
+> before↔after diff. **2 processes total, no more:** process 1 = main app *incl. the
+> in-process journal sink* + the journal file; process 2 = the **collector** (Phase B).
+> A package ≠ a process — the sink runs in-process (cheap non-blocking append); the
+> file-seam + out-of-process collector is what delivers the crash-safety.
+> **Redaction = each ext self-redacts IN ISOLATION** (no shared helper; P1/P7).
+> **D10** = logs are one-way. **§9** extension-logging rule → `.dispatch/rules/`
+> (placement decided; authored with the substrate). Crash-safety: journal file IS the
+> durable queue (§6/D3). **Mode: PLANNING Phase A (not building yet).** Settled:
+> D1–D10 (§2); open per-phase: §6.
+
+---
+
+## 0. Goal in one paragraph
+
+First-class, **agent-first** debugging. When something goes wrong with a chat or
+feature, you grab an **ID** and hand it to a **debugging agent**; it pinpoints the
+*source* of the problem from a **complete, queryable trace** of what actually
+happened, then hands its findings up to the orchestrator, which dispatches fixers.
+The debugging agent is trained ONLY to find root cause — it does not fix.
+
+---
+
+## 1. Principles specific to this subsystem
+
+- **Agent-first.** The primary trace *consumer is an LLM*, not a human dashboard.
+ This single fact killed the OpenTelemetry path (see D1): its entire payoff is
+ human visualization, which is useless to the agent. Every field we capture is
+ justified by *"what query must the debugging agent run?"*
+- **"Complete" = completely *captured & queryable*, never completely *loaded into
+ context*.** Noise reduction is done by deterministic **query/filter tools**
+ (plain code), so the agent always reads a small, targeted slice (a timeline
+ skeleton → drill into one span; a diff; a validator verdict). **Store fat, serve
+ thin.** This is the harness's progressive-disclosure idea (P5) applied to
+ runtime data, and it's what protects context + usage.
+- **Subordinate & fail-safe.** Observability must NEVER break or slow a turn. A
+ tracing error, full disk, or downed collector → drop/sample + warn, never stall.
+ (The §3.7 fault-containment principle: the turn is sovereign.)
+- **The debugger must survive the crash it's debugging** (see D3). Robustness of
+ the collector is paramount — it's the explicit reason it gets an exception to
+ the in-process architecture.
+
+---
+
+## 2. Decisions settled so far (with the named problem each solves — P4)
+
+### D1 — Agent-first; OpenTelemetry dropped
+No OTLP exporter, no Jaeger/Tempo/Grafana, no OTel SDK, no "stay compatible"
+constraint. Its value is human dashboards + cross-service fleet tracing — we are
+one process whose reader is an agent. **Kept** (not as "OTel" but because it's the
+right abstraction for *"what caused X"*): the **causal-trace model** — spans with
+**parent + links + attributes**. Designed for agent queryability, not rendering.
+- *Rejected by a named tradeoff, not NIH:* the OTel SDK's ambient "current span"
+ (`AsyncLocalStorage`) is exactly the hidden global P3 bans; we pass correlation
+ explicitly. Dropping it removes that tax + a heavy dep tree.
+
+### D2 — Boundary: kernel owns types+mechanism; one `observability` extension owns the rest
+(User's granularity call, §5.2.)
+- **Kernel (ABI):** the structured-log + span **types**, the scoped logger/trace
+ **handle** shape, and the **`onAny`** firehose (reserved in §5.4 / ORCHESTRATOR
+ §6, **not yet built**). Kernel still touches no I/O and names no feature.
+- **`observability` extension:** capture (onAny tap + AgentEvent tap + the scoped
+ handles), the store, the query/filter tools, redaction, and the debugging-agent
+ harness (P7).
+
+### D3 — The collector runs OUT OF PROCESS (the sanctioned exception)
+Single-process app (§3.7): a buggy extension can crash/`exit`/OOM everything. An
+in-process collector buffering in memory loses its evidence **at the exact moment
+you need it** (the crash). So the **sink** behind the logging handle is a separate
+OS process. Three agent-first wins only this enables:
+1. **Query a chat that killed the main process** — the collector stays alive, so
+ the agent can debug a dead app. (Decisive.)
+2. **Attribute the crash** — collector sees pipe EOF / heartbeat loss → stamps a
+ synthetic terminal span "process exited unexpectedly after span X." In-process
+ that final fact dies with the recorder.
+3. **Isolate the query workload** from the live turn.
+
+**Robust shape (write-ahead logging for telemetry):**
+```
+main process ──(cheap, non-blocking, append-only line)──▶ journal file (durable buffer)
+ │ tails
+ standalone collector process
+ normalize → SQLite trace store → query API ◀── debugging agent
+```
+- Hot path only **appends a line to an OS-buffered fd** — no SQLite, no blocking;
+ buffer full → drop/sample (fail-safe).
+- The **journal file is the queue**: if both processes die, flushed bytes are on
+ disk; collector resumes tailing on restart. Worst case loses the last partial
+ line (the §3.4 "last in-flight" bound).
+- Collector is **deliberately dead-simple** (read→normalize→store→serve): the
+ less it does, the less it can crash.
+- **Exception scoped tightly:** extensions still emit through a normal
+ `host.logger`/trace handle — *the contract is unchanged*; only the sink's
+ *implementation* is out-of-process. This is the first real use of §3.7's
+ pre-designed "move an extension to a worker without a rewrite" (serializable
+ payloads). It fits the architecture rather than fighting it.
+- *Cost (honest):* a second process host-bin spawns first / drains last + a tiny
+ line protocol. Justified by the "must be robust" priority.
+
+### D4 — Correlation model (reuses existing keys — P8, no invented id)
+- **turn = one trace** (`turnId` ≈ trace id).
+- **conversationId = an indexed attribute** grouping a thread's turn-traces.
+- **span = one occurrence** of work (its own span id), with `parentSpanId`.
+- **extension.id = an auto-stamped attribute** — the host hands each extension a
+ handle pre-tagged with its `manifest.id`; nobody can spoof or invent an id
+ scheme. (This is the clean version of "each extension broadcasts its id.")
+- **Cross-feature causality = span links**, recorded **at the handoff moment**
+ (enqueue→dequeue, summon→subagent-turn) — *causal edges, not co-occurrence.*
+- Every span carries the correlation keys of what it acts on (e.g. a cross-chat
+ op records both `source.conversationId` and `target.conversationId`).
+
+### D5 — Verbatim provider I/O capture (the highest-value capture)
+The cure for "the agent had no real data." Rule:
+- **Capture the request verbatim at the fetch edge — AFTER all transforms** (OAuth
+ body rewrite, tool-name prefixing, normalization). Corrupted-history bugs are
+ often *introduced by* a transform; capture before the bytes hit the wire = ground
+ truth (URL, model, params, full messages array, tool schemas, cache markers).
+- **Capture the raw response/error too** (status + error JSON — it usually *names*
+ the defect, e.g. `MissingToolResultsError`).
+- **Redact only secrets** (auth headers, vault-injected fields) — never the body.
+ **Technique = partial masking, NOT removal** (user-decided): reveal a few first +
+ last real chars with `…redacted…` between (e.g. `sk-…redacted…f9a`), **graduated by
+ length** so short secrets reveal less (tiers in §6). The field stays present and
+ **diffable** (tells you if a secret changed/rotated — serves the §3.1 cache-bust
+ diff) without storing the live value.
+ **Mechanism (user-decided): each extension self-redacts IN ISOLATION — no shared
+ helper.** Logs are self-generated, so they are self-redacted: the extension that
+ owns the data — it alone knows what is secret and *how* to censor it — masks the
+ value in **its own code, at the log call-site**, before the record is built. There
+ is **no shared `redact()` utility and nothing in the kernel** for this: a shared
+ helper would couple every secret-handling extension to one algorithm (violates
+ **P1** feature-as-a-library + kernel-minimalism) and contradicts "the code decides
+ *how* to censor." Consistency comes from the **harness, not shared code** (**P7**):
+ the §9 rule makes self-redaction mandatory and documents the default tiers; **each
+ extension reimplements them locally**. Duplicating a tiny mask across the few
+ secret-handling exts (providers, auth) is **intentional — isolation over DRY**.
+ Result: raw secrets never leave the producing extension's code — they reach neither
+ the sink nor the journal.
+ *(Supersedes the earlier "shared helper / mask at mint" sketches. Refines D2: the
+ observability ext owns redaction POLICY/config + store/query; the redaction ACT is
+ each source extension's own, self-contained.)*
+ **Caveat (honest):** no scheme can mask a secret nobody marked — so the real
+ enforcement is the §9 harness rule; an optional cheap sink-side pattern-scan
+ (`sk-…`) may backstop obvious leaks but is not relied upon.
+- **Retain successful requests in-window** (not error-only) so "diff failing vs a
+ known-good chat" works. Cost: bodies are large → retention/rotation + compression.
+- **Volume control (cache warming):** stamp the cheap `prefix.fingerprint` +
+ cache-token counts on *every* request, but persist the **full body only when the
+ fingerprint changed** (or on error) — keeps the bust findable without storing
+ every warming send verbatim.
+- **Store fat, serve thin:** the blob lives in the store; the agent gets a **diff**
+ or a **validator verdict**, never the raw 200KB.
+- *Future affordance (free from the data):* replay a stored request to reproduce.
+
+### D6 — Per-extension observability: free by default, rich by opt-in (P4)
+- **Default = free:** the auto-scoped structured handle (D4) makes *every*
+ extension observable for nothing — no per-extension logging contract to write.
+- **Opt-in = a typed debug surface** only where an extension has domain-specific
+ diagnostics worth standardizing (queue's enqueue/dequeue/deliver lifecycle).
+ Lives in that extension's own **P7 harness**, loaded by the debugging agent only
+ when investigating that extension. Mandating a hand-written contract per
+ extension would be the boilerplate P4 warns against.
+- **Worked boundary (who computes an attribute):** the `prefix.fingerprint` (§3.1)
+ can't be computed by the generic collector — only the cache-warming/provider
+ extension knows where the cacheable prefix ends. So that extension stamps the
+ attribute; the collector just stores it. The template for every "who computes
+ this attribute?" question.
+
+### D7 — Performance posture
+Negligible. An agent turn is **LLM-network-bound (seconds)**; span/attribute work
+is microseconds — in the noise. The only real cost is **write I/O + volume**,
+solved by: non-blocking append (D3) + out-of-process normalize + **don't trace
+token deltas** (high-frequency *and* redundant — the chunk log already has final
+text) + levels/sampling + fail-safe drop.
+
+### D8 — The "easy view": one compact projection, served to agent AND human
+A single-line-per-event **transcript skeleton** of how a chat was built — prompt
+assembly, thinking, tool calls, timings, sizes — each line collapsing to a summary
+(`ai thought 1.8s`, `tool read_file ran 0.5s → ok (2.0k)`), expandable to the
+verbatim span. Key realizations:
+- **It's a *projection*, not new infra** — a pure `spans → text` formatter in the
+ query layer (P2: zero I/O, exhaustively unit-testable). NOT a frontend, NOT a
+ graphical timeline (that's out-of-scope frontend later, which can consume the
+ same projection).
+- **Same artifact for both consumers** — this compact skeleton IS the
+ "store-fat-serve-thin" *thin overview* the agent reads first before drilling
+ into a span. The human "easy view" is that same text rendered to a terminal/
+ markdown. So it *reinforces* agent-first; it does NOT reopen the human-viz
+ question (D1) — no heavy infra, no design deformation.
+- **One genuinely new capture it forces: prompt-assembly provenance** (see §4) —
+ the context-filter chain (§3.2) records each contribution as a *segment* so we
+ can show `persona(1180) · tool:read_file(300) · skill:web(400) · user(412)`.
+ Cheap: *segmentation metadata over the verbatim body we already store* (D5),
+ not a copy.
+- **Interlocks with cache debugging (§3.1):** a prefix-fingerprint bust attributes
+ to the exact contributor whose segment changed ("skill:web 400→420 between warm
+ and real send").
+- **Doubles as a completeness test:** if the easy view reconstructs how the chat
+ was built end-to-end, the §4 taxonomy captured enough — every skeleton line must
+ be backed by a real span/segment.
+
+### D9 — Optimization analytics = derived aggregates, NOT a separate metrics pillar
+A third consumption pattern over the SAME spans (after incident-debugging and the
+D8 easy view): longitudinal roll-ups to tune Dispatch itself — token size per turn,
+model generation time, tool-call durations, tool/skill usage frequency, error
+rates. **P4 call: these are `GROUP BY` queries over the span store, not a separate
+counters/histograms pipeline.** Everything needed is already a span attribute (turn
+tokens, step/tool durations, tool-call counts, `isError`, D8 segment sizes) — no
+new capture. (The "metrics pillar" machinery earns nothing here: no dashboards, no
+alerting — agent-first.)
+- **Cost/benefit interlock (the high-value one):** cross D8 `prompt.assembly`
+ segment sizes — the *standing cost* a tool/skill/persona imposes on EVERY turn
+ (context budget + cache pressure) — against usage frequency + success — the
+ *realized benefit*. Ranks contributors by cost/benefit → directly answers
+ "should we keep this tool in the definitions?" A rarely-used tool with a fat
+ definition is pure overhead and a cache-bust risk.
+- **Objective vs. interpreted:** counts/durations/error-rates are objective SQL.
+ "Is the tool's description good enough for the model to know when to use it?" is
+ a *hypothesis* an analysis agent forms from those signals + samples (defined but
+ never called; called then result ignored; high input-schema error rate) — not a
+ pure metric. Agent-interpreted, consistent with agent-first.
+- **Retention asymmetry (the one real new requirement):** fat verbatim data (D5)
+ rotates out fast; cheap aggregates are rolled up and kept long for trend signal.
+ A periodic rollup `scheduledJob` (§2.3 scheduler) writes compact daily/weekly
+ summaries that survive raw-trace pruning.
+- **Three consumers, one capture:** incident-debug (one trace) · D8 easy view (one
+ chat) · D9 analytics (many turns) — all the same spans. The capture is the asset;
+ every consumer is a cheap projection/aggregation. Validates the D2 boundary.
+
+### D10 — Logs/spans are a one-way emission, not a feature channel (clarifies D2/D3/§5.4)
+Answering "do logs pass through other extensions?": **no.** Two distinct flows that
+never mix:
+- **Regular (feature) data** — extension→extension via typed contracts / hooks /
+ services (§3.5, §5.4): in-band where it matters (filters awaited, services return
+ values, the provider stream consumed by the kernel), type-anchored, **affects the
+ turn outcome**.
+- **Logged (observability) data** — ANY extension → its auto-scoped `host.logger`/
+ span → kernel mints the record (stamps `extensionId`/ids/timing; secrets are
+ already self-redacted by the source, D5) → `sink.emit` → **journal** →
+ out-of-process **collector** → store →
+ debugging agent. **One-way, fire-and-forget** (D3/D7): never awaited, never changes
+ a turn, swallowed on error.
+
+**Each extension authors its OWN logs** (handle auto-stamped with its id, D4). A
+sibling feature extension **never receives** another's logs — that would be feature
+coupling *through logs* (an anti-pattern; cross-feature reaction is what hooks are
+for). The **sole reader** is the `observability` extension, off the (out-of-process)
+collector/store + the in-process **`onAny`** firehose + the AgentEvent tap — the
+§5.4 "observability only, never feature code" exception. **Many independent one-way
+producers, exactly one privileged consumer** — so logging can never become a
+backdoor feature bus.
+
+---
+
+## 3. Validation — the bug catalog (P4: earn it against real failures)
+
+Each row is a real (old-Dispatch) failure → the query/diff that finds it. This is
+the acid test that the design pays for itself.
+
+| Failure | How the trace finds it |
+|---|---|
+| **Chat crashed** | Spans written incrementally → trace ends at the last span before death; collector's EOF stamp shows "process exited after span X". Read the tail. |
+| **API rejected corrupted history** | `provider.request` span holds the exact post-transform body + raw rejection; an incomplete tool call is literally a tool-call span with no matching tool-result span; a `reconcile.repair` span shows any auto-fix. |
+| **Queued message shown twice** | Two `deliver` spans for one message id (a count query). |
+| **Queued message in wrong chat** | The enqueue span's `target.conversationId` ≠ where it delivered; filter by message id across conversations → the misroute is explicit. |
+| **Prompt cache returns 0% hit** *(new)* | See §3.1 — diff consecutive verbatim requests' cacheable prefixes; a `prefix.fingerprint` attribute flags the bust; span timestamps reveal a cache-warming gap > TTL. |
+
+### 3.1 Worked example — the prompt-cache 0%-hit bug (cache warming)
+**The system (cache warming):** provider prompt caches expire ~5 min. To keep a
+chat's cache warm without a user message, periodically **resend the (rewound)
+conversation** to refresh the cached prefix — staying warm until the real next
+message lands a hit.
+**The bug:** occasionally the API reported **0% cache read** with no way to debug
+why. Cache hits depend on **byte-exact prefix identity**, so a miss has many silent
+causes, all invisible before now:
+- the cacheable prefix changed (tool schemas reserialized in different key order; a
+ volatile value crept into the system prompt — timestamp/date/nonce; a
+ skill/agent injection changed; attachment re-encoded);
+- the `cache_control` breakpoint moved or wasn't set;
+- the **cache-warming request's prefix wasn't byte-identical** to the prefix the
+ next real message extends → you warmed the *wrong* cache;
+- provider-side: the warming send fired late (gap > TTL), eviction, model change.
+
+**Why this design nails it:**
+- Verbatim capture (D5) of **every** provider request — *including cache-warming
+ sends, flagged `warm` vs `real`* — plus **cache-token counts from the response**
+ (`cache_read`, `cache_creation`).
+- A **`prefix.fingerprint`** attribute (hash of the cacheable prefix up to the
+ `cache_control` breakpoint): a cache bust = the fingerprint changed unexpectedly
+ between a warming send and the next real send. One grouped query flags it; then
+ diff the two bodies to see *which bytes* diverged.
+- **Bonus insight this surfaces:** it makes **non-deterministic serialization** (a
+ Map's key order, unstable tool-schema generation) — an otherwise invisible
+ cache-killer — show up as a visible diff. Capture turns "0% and no idea why" into
+ "these 14 bytes of the prefix changed, introduced by transform Y."
+
+---
+
+## 4. Capture taxonomy (draft — the §-next thread)
+
+Span kinds, driven by the bug list (attributes are illustrative, not final):
+- **turn** (root) — `conversationId`, `turnId`, model, status, token usage.
+- **step** — one LLM round-trip within a turn.
+- **provider.request** *(the star, D5)* — verbatim body (post-transform), headers
+ (redacted), raw response/error, `cache_control` presence, `prefix.fingerprint`,
+ `warm|real`, cache-read/creation tokens, latency.
+- **prompt.assembly** *(D8)* — ordered composition segments of the request:
+ `{contributor extension.id, kind (persona|tool-def|skill|agent-profile|history|
+ user-msg), role, length, contentRef→verbatim body}`. Powers the easy-view
+ prompt-assembly render + per-contributor cache-bust attribution.
+- **tool-call** — `toolCallId`, name, input, result, isError, duration.
+- **reconcile.repair** *(§3.4)* — what the load-time repair changed.
+- **queue.enqueue / dequeue / deliver** — message id, source/target conversation,
+ links to the turn it caused. *(queue/session-features ext not built yet.)*
+- **process.lifecycle** — boot / shutdown / **crash** (collector-synthesized).
+- **structured log** — leveled, attributed, correlated line (the evolved Logger).
+
+---
+
+## 5. Vocabulary (user-approved; promote to GLOSSARY.md when this lands)
+
+Approved this session: **trace**, **span**, **attribute**, **structured log**,
+**observability**, **redaction**, **debugging agent**. Standard/training-baked —
+they cost zero glossary justification (P6).
+- **trace** — the full correlated record of one operation (≠ *history*/chunk log).
+- **span** — one timed unit of work in a trace (parent + links + attributes).
+- **attribute** — typed key/value on a span/log. *(aliases to avoid: tag, field, metadata)*
+- **structured log** — a leveled, attributed, correlated record. *(avoid: debug message)*
+- **span link** — a causal edge to another span/trace (cross-feature).
+- **collector** — the out-of-process sink that normalizes + stores + serves traces.
+- **journal** — the append-only durable buffer file between app and collector.
+- **cache warming** — periodically resending a (rewound) conversation to keep its
+ provider prompt cache warm within the ~5-min TTL. *(aliases to avoid: reheat,
+ cache reheating)* — **user-decided.** Distinct from **wake** (the Claude
+ wake-probe scheduler in old code) — keep the two concepts separate.
+- **redaction** — replacing a secret's middle with `…redacted…`, keeping the first
+ 3 + last 3 real chars (partial masking — keeps the field present + diffable; never
+ dropped). *(aliases to avoid: censoring, scrubbing, masking-as-removal)* —
+ technique **user-decided.**
+
+---
+
+## 6. Open threads (not yet decided)
+
+- **Span/attribute vocabulary** — finalize §4 (next up).
+- **Structured-`Logger` ABI change** — today it's unstructured `(message,
+ ...args)`; evolve to leveled + attributed + auto-scoped + correlated. Kernel
+ owns `Logger` → this is an ABI change with `lsp references` fan-out. Decide:
+ evolve in place vs. add a parallel structured channel. **Proposed shape: §8**
+ (awaiting the A/B call).
+- **Retention/rotation sizing** — verbatim bodies are large; TTL + size cap +
+ compression; keep successes long enough for diffing.
+- **Journal / IPC line protocol** — NDJSON to a pipe vs. unix socket vs. append
+ file; framing; backpressure (drop-oldest/sample). **The journal file IS the durable
+ queue** — there is **no in-memory log queue** in either process (that's the
+ crash-safety win): a normal app crash/OOM does NOT lose already-`write()`-en lines
+ (bytes live in the OS buffer, not the process heap); the collector resumes tailing
+ from its last offset on restart. *Open:* **fsync cadence** — per-line (durable,
+ slow) vs. periodic vs. none (fast); only a kernel-panic/power-loss risks the last
+ unflushed line (§3.4 bound).
+- **Redaction policy** — *DECIDED:* (b) **short-secret guard** = graduated tiers by
+ length: **≥13 → reveal 3** each side · **11–12 → 2** · **8–10 → 1** · **≤7 → full
+ mask** (the `10` overlap resolved conservatively to reveal-1; `…redacted…` token is
+ fixed-width so it never leaks the hidden length). (c) **who redacts** = **each
+ extension self-redacts in isolation — NO shared helper, nothing in the kernel**
+ (D5/§9): the producing ext masks in its own code at the call-site. *Still open:*
+ (a) the exact secret-field/header list — declared **per-extension** (it knows its
+ own; e.g. openai-compat: `authorization` header + any vault-injected body field).
+- **Cache-token fields** — *RESOLVED by inspection:* `Usage` already carries
+ `cacheReadTokens?` / `cacheWriteTokens?` (`packages/kernel/src/contracts/provider.ts`)
+ — **no provider CR needed**; only confirm the openai-compat provider *populates*
+ them (instrumentation detail, not a contract gap).
+- **Collector supervision** — host-bin spawns first / drains last; restart +
+ resume-tail; what if the collector dies.
+- **Levels & default capture set** — what's on by default (deltas off).
+- **Easy-view rendering (D8)** — the projection format + delivery surface (CLI
+ command vs. transport route returning markdown; frontend later), and char-counts
+ (free) vs token-counts (needs a tokenizer) in the skeleton.
+- **Analytics roll-ups (D9)** — rollup table shape + retention asymmetry (raw
+ traces short, aggregates long); `GROUP BY` indexes (tool_name, model, kind, time).
+ *(The periodic-job mechanism already exists: `host.scheduler.register` —
+ `packages/kernel/src/contracts/extension.ts`; only the table shape + retention +
+ indexes remain to design.)*
+- **`onAny` firehose (kernel)** — reserved (§5.4) but unbuilt: confirm it's the
+ capture tap and define its shape (one listener; payload = hook id + payload +
+ correlation).
+- **Debugging-agent delivery + the "grab an ID" entry point** — how an id is handed
+ to the agent; rides on the (unbuilt) `agents` extension + its P7 harness.
+- **Sequencing / dependencies** — build the substrate now (Logger ABI, onAny,
+ collector, store, core span kinds); instrument the rest as their features land
+ (queue spans ← session-features; `prompt.assembly` ← context-filter chain;
+ debugging agent ← agents ext).
+- **Extension-logging rule placement** — *DECIDED:* `.dispatch/rules/extension-logging.md`
+ (user's call); wired into the ORCHESTRATOR.md §3 scoping map for extension agents.
+ Content drafted (§9, tribal-knowledge only); authored when the substrate is built.
+
+## 7. Deferred / out of scope
+- Replay-to-reproduce (the data supports it; build later).
+- Adversary/multi-tenant isolation (we defend faults, not adversaries — §3.7).
+- Human dashboards / metrics viz (agent-first; revisit only if a human need
+ appears).
+
+---
+
+## 8. Logger ABI — proposed shape (resolving open-question #1; awaiting A/B)
+
+> **STATUS: proposal, NOT yet decided.** Forced by: structured records ·
+> auto-scoped to extension (D4/D6) · explicit correlation / no ambient (P3) ·
+> spans (D8/D9) · fire-and-forget non-blocking emit (D3) · sink-injected (purity).
+
+### Proposed types (kernel ABI)
+```ts
+type Level = "debug" | "info" | "warn" | "error"; // keep 4 (P6)
+type Attributes = Readonly<Record<string, string | number | boolean | null>>; // flat: serializable (D3) + queryable (D9)
+
+interface LogContext {
+ readonly extensionId: string; // auto-stamped by host (D6) — not caller-supplied
+ readonly conversationId?: string;
+ readonly turnId?: string;
+ readonly spanId?: string;
+}
+
+interface Logger {
+ readonly debug: (msg: string, attrs?: Attributes) => void;
+ readonly info: (msg: string, attrs?: Attributes) => void;
+ readonly warn: (msg: string, attrs?: Attributes) => void;
+ readonly error: (msg: string, attrs?: Attributes & { err?: unknown }) => void;
+ readonly child: (ctx: Partial<LogContext> & { attrs?: Attributes }) => Logger; // explicit value, passed down (P3)
+ readonly span: (name: string, attrs?: Attributes) => Span;
+}
+
+interface Span {
+ readonly id: string;
+ readonly log: Logger; // pre-bound to this span
+ readonly setAttributes: (attrs: Attributes) => void;
+ readonly addLink: (target: { spanId: string; turnId?: string }, reason?: string) => void; // D4 causal edges
+ readonly child: (name: string, attrs?: Attributes) => Span; // step → tool-call nesting
+ readonly end: (outcome?: { err?: unknown; attrs?: Attributes }) => void; // records duration + status
+}
+```
+
+### Why this is P3-safe (the OTel contrast)
+`child()` / `span()` return **explicit values you pass down** (orchestrator →
+`runTurn` → `ctx.log` into tools). No hidden "current span"; correlation travels
+as an argument — exactly why we could drop OTel's `AsyncLocalStorage`.
+
+### Purity story (a "kernel logger" that touches no I/O)
+`Logger`/`Span` are **pure record-builders over an injected `LogSink {
+emit(record): void }`**. The kernel mints records (auto-stamps `extensionId`,
+ids/timing) and calls `sink.emit`; the **sink is a host-bin bootstrap dependency**
+(like the storage factory — available before any extension activates, per the
+"Logger always available" contract). The sink writes the **journal** (D3); the
+observability **collector owns the other end**. Kernel testable with a fake sink,
+zero mocks.
+
+### The fork — DECIDE THIS (open-question #1)
+- **A — evolve `Logger` in place (recommended).** Replace the string logger; the
+ simple call `info("msg")` still compiles (attrs optional) but flows through the
+ same correlated logger → **A subsumes B's ergonomics**. Con: ABI fan-out — but
+ tiny now (a few `logger.error("...", err)` sites → `{ err }`); only grows later.
+- **B — parallel structured channel, keep the string logger.** Zero churn, but two
+ paths → drift, and casual `logger.info` stays uncorrelated / invisible to the
+ store — defeats "everything queryable" and grates against P8.
+
+### Sub-decisions baked into the sketch
+- Inject `{ sink, now, newId }` — deterministic ids/timing in tests (§3.6).
+- Flat scalar attributes (nested → stringify) — keeps D9 `GROUP BY`/indexes clean.
+- `error(msg, { err })` normalizes today's positional `Error` arg.
+- **Downstream contract change:** tools get **`ctx.log`** (span-bound) so they log
+ correlated without a global (P3) — a `ToolContract` ctx addition (§3.3 ctx
+ already carries `signal`/`onOutput`).
+
+---
+
+## 9. Harness artifact — the extension-logging rule (tribal-knowledge ONLY, P6/P7)
+
+> **Status:** planned deliverable (user-requested). Extension-scoped (P7): the
+> knowledge lives in the **harness**, and **each extension implements logging +
+> redaction in its OWN isolated code** — there is NO shared logging/redaction helper
+> to couple them (P1; isolation over DRY). Loaded by every extension-author agent.
+> **Placement DECIDED** (user): `.dispatch/rules/extension-logging.md`, wired into the
+> ORCHESTRATOR.md §3 scoping map for extension agents. **Write it when we build the
+> substrate, not before.** P6 governs hard: state ONLY what a frontier model would
+> get wrong about THIS system; omit anything inferable.
+
+**MUST state (non-inferable, project-specific):**
+- **Self-redact your own secrets before logging — in your own code.** There is no
+ shared `redact()` and nothing in the kernel does it for you. YOU decide what is
+ secret and how to censor it. Default censoring = the §6 tiered partial mask
+ (reimplement it locally; deviate if your secret type warrants).
+- Use the injected `host.logger` / the span / `ctx.log` you're handed — never
+ `console.*`, never a hand-rolled logger or your own correlation ids.
+- **Don't set `extensionId` / invent an id scheme** — it is auto-stamped (D4).
+- **Attributes are flat scalars** (nested → stringify) — D9 GROUP BY + journal
+ serialization need it.
+- **Don't log token deltas / per-chunk streaming** (D7) — redundant + noisy.
+- **Logs are one-way (D10)** — never read another extension's logs; cross-feature
+ reaction is a hook, not a log.
+- Edge I/O (providers): capture the request **verbatim, post-transform, at the fetch
+ edge** (D5), self-redacting secret headers/fields in your own code.
+
+**Must NOT state (inferable — omit, P6):**
+- What logging/levels are for; info/warn/error semantics; "log your errors."
+- How to call `logger.info("msg", {attrs})` (obvious from the type).
+- Generic "don't log sensitive data" platitudes (replaced by the concrete
+ self-redaction rule) or any SQL/GROUP BY explanation.
+
+---
+
+## 10. Build plan — Phase A (logging substrate)
+
+> **Status:** PLANNED, prompts drafted (`prompts/phase-a-{kernel-logging,journal-sink}.md`),
+> awaiting final user review before summon. **Goal:** every extension + every turn
+> emits structured records durably into the **journal file**. Phase A = **1 process**
+> (main app + in-process sink); the collector is process 2 (Phase B). Logger ABI = **A**.
+
+### Record contract — frozen FIRST (both units depend on it)
+`packages/kernel/src/contracts/logging.ts` (kernel owns it). Agent finalizes the
+exact shape; load-bearing constraints:
+- **`LogRecord`** = flat, JSON-serializable **discriminated union**: `log` line |
+ `span-open` | `span-close`. Spans emitted **incrementally** (open at `span()`,
+ close at `end()`) so a crashed turn is reconstructable from the journal (D3).
+ Every variant carries correlation (`extensionId` auto-stamped + optional
+ `conversationId`/`turnId`/`spanId`/`parentSpanId`), `timestamp`, flat `attributes`
+ (queryable scalars), **and an optional `body` blob** (string) for large verbatim
+ payloads — the pre-mutation prompt now, the verbatim provider request later
+ (store-fat-serve-thin: query tools serve diffs/slices, not the raw blob).
+- **`LogSink { emit(record): void }`** — fire-and-forget; the kernel never lets a
+ sink error escape into a turn (D7).
+- **`Logger` / `Span`** — the §8 shapes (leveled/attributed/auto-scoped/correlated;
+ `child()`/`span()` return explicit values — P3, no ambient).
+
+### Unit 1 — `kernel-logging` (ONE coordinated kernel owner; single-writer over kernel)
+Interlocked ABI change (cf. the `tabId→conversationId` rename). Stage commits
+(**contract checkpoint → host → runtime**) so the contract freezes early for Unit 2.
+Owns: `contracts/logging.ts` (new), `contracts/extension.ts` (evolve `Logger`),
+`contracts/tool.ts` (+ `ctx.log`), `contracts/index.ts` (re-export), `host/host.ts`
+(+ `logSink` on `HostDeps`, mirroring `storageFactory` @ host.ts:34/60; build each
+extension's auto-scoped logger that stamps `manifest.id`) + `host.test.ts`,
+`runtime/{run-turn,dispatch}.ts` (open/close turn/step/tool-call spans; thread
+`ctx.log`) + tests. **The `step` span records the verbatim pre-mutation prompt**
+(messages + tools + opts) in its `body` — the **"BEFORE"** capture. Pure
+record-builder injected with `{ now, newId }`.
+
+### Unit 2 — `journal-sink` (bootstrap library, NOT an extension)
+`packages/journal-sink/`. Implements kernel `LogSink`: pure `record → one NDJSON
+line` core + thin fs append edge + rotation/backpressure (drop-oldest + warn — D3
+fail-safe). Imports the frozen `LogRecord`/`LogSink` TYPES; never redefines them.
+
+### host-bin wiring (orchestrator CR)
+Construct the sink, add `logSink` to `deps: HostDeps` (main.ts:77) so `host.logger`
+works before any extension activates. Root tsconfig ref + host-bin dep on
+`@dispatch/journal-sink` + `bun install` + biome import-sort (same wiring as
+tool-read-file, Step 2).
+
+### Order & parallelism
+Sequential for safety (record type is a hard dep): **Unit 1 (freeze contract →
+finish) → Unit 2 → host-bin wiring**. Optional overlap: start Unit 2 once Unit 1's
+`logging.ts` checkpoint is committed.
+
+### Open sub-decisions (lock while finalizing prompts)
+- Package name `journal-sink` (vs `log-journal`). · fsync cadence (default periodic).
+- Journal file path + rotation policy (size cap; on-disk location).
+**Verify:** `typecheck`/`test`/`check` clean; live boot → `host.logger` lines land in
+the journal file.
+
+### Next step (after Phase A) — the "AFTER" capture
+`provider.request` verbatim post-transform (D5) inside `provider-openai-compat`: the
+exact serialized request bytes + raw response/error + cache tokens, auth header
+self-redacted. This **completes full round-trip rebuild** and unlocks the
+**before↔after diff** (kernel "before" vs wire "after" → pinpoints transform bugs).
+Depends only on the Phase A Logger ABI; lives entirely in the provider extension.
+*(Full per-extension prompt-segment provenance — D8 — comes later, with the
+context-filter chain.)*
+
+---
+
+## 11. Phase B preview — collector flush into SQLite (NOT built; model only)
+
+> Captured so it's not lost; the exact knobs are open Phase B sub-decisions.
+
+The flush into the SQLite **store** is the **collector's** job (process 2), async +
+continuous, **decoupled from turns** — the app only appends to the journal and never
+waits. Per tick the collector:
+1. **Tails** the journal — reads new complete lines since its last committed
+ byte-offset (short poll loop or fs-watch).
+2. **Batches** them into **one SQLite transaction**, commits.
+3. **Advances a persisted consume-offset.**
+
+- **Cadence:** per batch/tick, bounded by *interval OR batch-size, whichever first*
+ (e.g. ~250 ms or N records) — never one-txn-per-record (avoids per-record fsync),
+ never per-turn. Sub-second / near-real-time.
+- **Crash-safety = at-least-once + idempotent:** resume from the persisted offset on
+ restart → may re-read a few lines → writes are idempotent (`INSERT OR IGNORE` on a
+ record/span id). No loss; harmless reprocess. This is what lets you query a chat
+ **after the app that produced it crashed** (D3).
+- **Queryability lag** ≈ one batch interval; post-mortem still works (journal is
+ durable; the collector consumes whenever it's up).
+- **Open (Phase B):** poll vs fs-watch; interval/batch-size; offset storage (store
+ metadata vs sidecar); dedup key; + the store schema/indexes (§6).
diff --git a/packages/host-bin/package.json b/packages/host-bin/package.json
index 12ae633..a3e24c8 100644
--- a/packages/host-bin/package.json
+++ b/packages/host-bin/package.json
@@ -11,6 +11,7 @@
"@dispatch/provider-openai-compat": "workspace:*",
"@dispatch/session-orchestrator": "workspace:*",
"@dispatch/transport-http": "workspace:*",
- "@dispatch/tool-read-file": "workspace:*"
+ "@dispatch/tool-read-file": "workspace:*",
+ "@dispatch/journal-sink": "workspace:*"
}
}
diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts
index 0cb0d37..0c795b8 100644
--- a/packages/host-bin/src/main.ts
+++ b/packages/host-bin/src/main.ts
@@ -2,14 +2,16 @@ import { mkdirSync } from "node:fs";
import { dirname } from "node:path";
import { extension as authApikeyExt } from "@dispatch/auth-apikey";
import { extension as conversationStoreExt } from "@dispatch/conversation-store";
+import { createJournalSink } from "@dispatch/journal-sink";
import {
type ConfigAccess,
createBus,
createHost,
+ createLogger,
type EventsEmitter,
type Extension,
type HostDeps,
- type Logger,
+ type LogDeps,
type PermissionGate,
type ScheduledJob,
type SecretsAccess,
@@ -22,15 +24,6 @@ import { extension as toolReadFileExt } from "@dispatch/tool-read-file";
import { createServer, extension as transportHttpExt } from "@dispatch/transport-http";
import { configMapToAccess, envToConfigMap } from "./config.js";
-function createConsoleLogger(): Logger {
- return {
- debug: (message: string, ...args: unknown[]) => console.debug(`[debug] ${message}`, ...args),
- info: (message: string, ...args: unknown[]) => console.info(`[info] ${message}`, ...args),
- warn: (message: string, ...args: unknown[]) => console.warn(`[warn] ${message}`, ...args),
- error: (message: string, ...args: unknown[]) => console.error(`[error] ${message}`, ...args),
- };
-}
-
function createEmptySecrets(): SecretsAccess {
return {
get: async () => null,
@@ -64,7 +57,11 @@ const CORE_EXTENSIONS: readonly Extension[] = [
];
async function boot(): Promise<void> {
- const logger = createConsoleLogger();
+ const journalPath = process.env.DISPATCH_JOURNAL ?? "./.dispatch/journal/app.ndjson";
+ mkdirSync(dirname(journalPath), { recursive: true });
+ const logSink = createJournalSink({ path: journalPath });
+ const logDeps: LogDeps = { now: () => Date.now(), newId: () => crypto.randomUUID() };
+ const logger = createLogger({ extensionId: "host-bin" }, logSink, logDeps);
const dbPath = process.env.DISPATCH_DB ?? "./.dispatch-data/dispatch.db";
mkdirSync(dirname(dbPath), { recursive: true });
@@ -83,6 +80,8 @@ async function boot(): Promise<void> {
scheduler: createNoopScheduler(),
bus: createBus(logger),
events: createNoopEvents(),
+ logSink,
+ logDeps,
};
const host = createHost(CORE_EXTENSIONS, deps);
@@ -102,6 +101,7 @@ async function boot(): Promise<void> {
const port = Number(process.env.BACKEND_PORT) || Number(process.env.PORT) || 24203;
const server = Bun.serve({ fetch: app.fetch, port });
logger.info(`Dispatch listening on http://localhost:${server.port}`);
+ console.info(`Dispatch listening on http://localhost:${server.port}`);
}
boot().catch((err) => {
diff --git a/packages/journal-sink/package.json b/packages/journal-sink/package.json
new file mode 100644
index 0000000..f17d476
--- /dev/null
+++ b/packages/journal-sink/package.json
@@ -0,0 +1,11 @@
+{
+ "name": "@dispatch/journal-sink",
+ "version": "0.0.0",
+ "type": "module",
+ "private": true,
+ "main": "dist/index.js",
+ "types": "dist/index.d.ts",
+ "dependencies": {
+ "@dispatch/kernel": "workspace:*"
+ }
+}
diff --git a/packages/journal-sink/src/index.ts b/packages/journal-sink/src/index.ts
new file mode 100644
index 0000000..02e3e2c
--- /dev/null
+++ b/packages/journal-sink/src/index.ts
@@ -0,0 +1,2 @@
+export type { ClockOps, FsOps, JournalSinkOpts } from "./journal-sink.js";
+export { createJournalSink, serialize } from "./journal-sink.js";
diff --git a/packages/journal-sink/src/journal-sink.test.ts b/packages/journal-sink/src/journal-sink.test.ts
new file mode 100644
index 0000000..a6531c7
--- /dev/null
+++ b/packages/journal-sink/src/journal-sink.test.ts
@@ -0,0 +1,309 @@
+import { mkdtemp, readFile, rm, stat } from "node:fs/promises";
+import { tmpdir } from "node:os";
+import { join } from "node:path";
+import type { LogRecord } from "@dispatch/kernel";
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+import type { ClockOps, FsOps } from "./journal-sink.js";
+import { createJournalSink, serialize } from "./journal-sink.js";
+
+// --- Fixtures: one per LogRecord variant ---
+
+const logRecord: LogRecord = {
+ kind: "log",
+ level: "info",
+ msg: "hello world",
+ timestamp: 1700000000000,
+ extensionId: "test-ext",
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ spanId: "span-1",
+ attributes: { key: "value" },
+};
+
+const logRecordMinimal: LogRecord = {
+ kind: "log",
+ level: "debug",
+ msg: "minimal",
+ timestamp: 1700000000001,
+ extensionId: "ext-min",
+};
+
+const spanOpenRecord: LogRecord = {
+ kind: "span-open",
+ spanId: "span-2",
+ name: "provider.request",
+ timestamp: 1700000000100,
+ extensionId: "test-ext",
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ parentSpanId: "span-1",
+ attributes: { model: "gpt-4" },
+ links: [{ spanId: "span-0", turnId: "turn-0", reason: "caused-by" }],
+ body: "verbatim request body",
+};
+
+const spanOpenRecordMinimal: LogRecord = {
+ kind: "span-open",
+ spanId: "span-3",
+ name: "tool.call",
+ timestamp: 1700000000200,
+ extensionId: "ext-tool",
+};
+
+const spanCloseRecord: LogRecord = {
+ kind: "span-close",
+ spanId: "span-2",
+ name: "provider.request",
+ timestamp: 1700000000500,
+ durationMs: 400,
+ status: "ok",
+ extensionId: "test-ext",
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ parentSpanId: "span-1",
+ attributes: { cacheHit: true },
+ links: [{ spanId: "span-0" }],
+};
+
+const spanCloseRecordError: LogRecord = {
+ kind: "span-close",
+ spanId: "span-4",
+ name: "failing-step",
+ timestamp: 1700000000600,
+ durationMs: 100,
+ status: "error",
+ extensionId: "ext-err",
+};
+
+// --- Pure core: serialize ---
+
+describe("serialize", () => {
+ const allRecords = [
+ { name: "log (full)", record: logRecord },
+ { name: "log (minimal)", record: logRecordMinimal },
+ { name: "span-open (full)", record: spanOpenRecord },
+ { name: "span-open (minimal)", record: spanOpenRecordMinimal },
+ { name: "span-close (full)", record: spanCloseRecord },
+ { name: "span-close (error)", record: spanCloseRecordError },
+ ];
+
+ for (const { name, record } of allRecords) {
+ it(`produces exactly one NDJSON line for ${name}`, () => {
+ const line = serialize(record);
+ expect(line.endsWith("\n")).toBe(true);
+ expect(line.endsWith("\n\n")).toBe(false);
+ const parsed = JSON.parse(line);
+ expect(parsed).toEqual(record);
+ });
+
+ it(`round-trips ${name} through JSON.parse`, () => {
+ const line = serialize(record);
+ const roundTripped: LogRecord = JSON.parse(line);
+ expect(roundTripped).toEqual(record);
+ expect(roundTripped.kind).toBe(record.kind);
+ });
+ }
+
+ it("preserves all LogRecord variant kinds", () => {
+ expect(JSON.parse(serialize(logRecord)).kind).toBe("log");
+ expect(JSON.parse(serialize(spanOpenRecord)).kind).toBe("span-open");
+ expect(JSON.parse(serialize(spanCloseRecord)).kind).toBe("span-close");
+ });
+
+ it("preserves optional fields when present", () => {
+ const parsed = JSON.parse(serialize(spanOpenRecord));
+ expect(parsed.body).toBe("verbatim request body");
+ expect(parsed.links).toEqual([{ spanId: "span-0", turnId: "turn-0", reason: "caused-by" }]);
+ expect(parsed.attributes).toEqual({ model: "gpt-4" });
+ });
+
+ it("omits optional fields when absent (no undefined in output)", () => {
+ const parsed = JSON.parse(serialize(logRecordMinimal));
+ expect(parsed.conversationId).toBeUndefined();
+ expect(parsed.turnId).toBeUndefined();
+ expect(parsed.spanId).toBeUndefined();
+ expect(parsed.attributes).toBeUndefined();
+ expect(parsed.body).toBeUndefined();
+ expect("conversationId" in parsed).toBe(false);
+ });
+});
+
+// --- Imperative shell: createJournalSink (fs integration) ---
+
+let tmpDir: string;
+
+beforeEach(async () => {
+ tmpDir = await mkdtemp(join(tmpdir(), "journal-sink-test-"));
+});
+
+afterEach(async () => {
+ await rm(tmpDir, { recursive: true, force: true });
+});
+
+describe("createJournalSink", () => {
+ it("emits records that can be read back as NDJSON", async () => {
+ const path = join(tmpDir, "journal.log");
+ const sink = createJournalSink({ path, fsync: "none" });
+
+ const records = [logRecord, spanOpenRecord, spanCloseRecord];
+ for (const r of records) {
+ sink.emit(r);
+ }
+
+ const content = await readFile(path, "utf8");
+ const lines = content.split("\n").filter((l) => l.length > 0);
+ expect(lines).toHaveLength(3);
+
+ for (let i = 0; i < lines.length; i++) {
+ const parsed: LogRecord = JSON.parse(lines[i] ?? "");
+ expect(parsed).toEqual(records[i]);
+ }
+ });
+
+ it("warns and does NOT throw when writing to a bad path", () => {
+ const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
+ const badPath = join(tmpDir, "nonexistent", "deep", "journal.log");
+ const sink = createJournalSink({ path: badPath, fsync: "none" });
+
+ expect(() => sink.emit(logRecord)).not.toThrow();
+ expect(warnSpy).toHaveBeenCalled();
+ expect(warnSpy.mock.calls[0]?.[0]).toContain("[journal-sink]");
+
+ warnSpy.mockRestore();
+ });
+
+ it("appends to existing file on creation", async () => {
+ const path = join(tmpDir, "journal.log");
+ const { writeFileSync } = await import("node:fs");
+ writeFileSync(path, serialize(logRecord));
+
+ const sink = createJournalSink({ path, fsync: "none" });
+ sink.emit(spanOpenRecord);
+
+ const content = await readFile(path, "utf8");
+ const lines = content.split("\n").filter((l) => l.length > 0);
+ expect(lines).toHaveLength(2);
+ expect(JSON.parse(lines[0] ?? "").kind).toBe("log");
+ expect(JSON.parse(lines[1] ?? "").kind).toBe("span-open");
+ });
+
+ it("warns and drops records when fs.write throws mid-emit", () => {
+ const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
+ const brokenFs: FsOps = {
+ open: () => 1,
+ write: () => {
+ throw new Error("disk full");
+ },
+ close: () => {},
+ rename: () => {},
+ statSize: () => 0,
+ fsync: () => {},
+ };
+ const sink = createJournalSink({ path: join(tmpDir, "j.log"), fs: brokenFs, fsync: "none" });
+
+ expect(() => sink.emit(logRecord)).not.toThrow();
+ expect(warnSpy).toHaveBeenCalled();
+ expect(warnSpy.mock.calls[0]?.[1]).toBeInstanceOf(Error);
+
+ warnSpy.mockRestore();
+ });
+});
+
+// --- Rotation ---
+
+describe("rotation", () => {
+ it("rotates when file exceeds maxBytes", async () => {
+ const path = join(tmpDir, "journal.log");
+ const sink = createJournalSink({ path, maxBytes: 100, fsync: "none" });
+
+ sink.emit(logRecord);
+ sink.emit(logRecordMinimal);
+ sink.emit(spanOpenRecord);
+
+ const content = await readFile(path, "utf8");
+ const lines = content.split("\n").filter((l) => l.length > 0);
+ expect(lines.length).toBeGreaterThan(0);
+
+ let rotatedExists = false;
+ try {
+ await stat(`${path}.1`);
+ rotatedExists = true;
+ } catch {
+ // May not exist if rotation hasn't triggered yet.
+ }
+ expect(rotatedExists).toBe(true);
+
+ const rotatedContent = await readFile(`${path}.1`, "utf8");
+ const allLines = [...rotatedContent.split("\n").filter((l) => l.length > 0), ...lines];
+ for (const line of allLines) {
+ const parsed = JSON.parse(line);
+ expect(["log", "span-open", "span-close"]).toContain(parsed.kind);
+ }
+ });
+});
+
+// --- Fsync ---
+
+describe("fsync", () => {
+ it("calls fsync periodically when mode is periodic", () => {
+ let fsyncCalls = 0;
+ let currentTime = 0;
+ const mockFs: FsOps = {
+ open: () => 1,
+ write: () => {},
+ close: () => {},
+ rename: () => {},
+ statSize: () => 0,
+ fsync: () => {
+ fsyncCalls++;
+ },
+ };
+ const mockClock: ClockOps = {
+ now: () => currentTime,
+ };
+ const sink = createJournalSink({
+ path: join(tmpDir, "j.log"),
+ fs: mockFs,
+ clock: mockClock,
+ fsync: "periodic",
+ });
+
+ sink.emit(logRecord);
+ expect(fsyncCalls).toBe(0);
+
+ currentTime = 6_000;
+ sink.emit(logRecordMinimal);
+ expect(fsyncCalls).toBe(1);
+
+ sink.emit(spanOpenRecord);
+ expect(fsyncCalls).toBe(1);
+
+ currentTime = 12_000;
+ sink.emit(spanCloseRecord);
+ expect(fsyncCalls).toBe(2);
+ });
+
+ it("never calls fsync when mode is none", () => {
+ let fsyncCalls = 0;
+ const mockFs: FsOps = {
+ open: () => 1,
+ write: () => {},
+ close: () => {},
+ rename: () => {},
+ statSize: () => 0,
+ fsync: () => {
+ fsyncCalls++;
+ },
+ };
+ const sink = createJournalSink({
+ path: join(tmpDir, "j.log"),
+ fs: mockFs,
+ fsync: "none",
+ });
+
+ sink.emit(logRecord);
+ sink.emit(logRecord);
+ sink.emit(logRecord);
+ expect(fsyncCalls).toBe(0);
+ });
+});
diff --git a/packages/journal-sink/src/journal-sink.ts b/packages/journal-sink/src/journal-sink.ts
new file mode 100644
index 0000000..8e36fd6
--- /dev/null
+++ b/packages/journal-sink/src/journal-sink.ts
@@ -0,0 +1,171 @@
+import { closeSync, fsyncSync, openSync, renameSync, statSync, writeSync } from "node:fs";
+import type { LogRecord, LogSink } from "@dispatch/kernel";
+
+// --- Pure core (no I/O) ---
+
+/**
+ * Serialize a LogRecord to exactly one NDJSON line (record JSON + newline).
+ * Pure function — no I/O, no side effects.
+ */
+export function serialize(record: LogRecord): string {
+ return `${JSON.stringify(record)}\n`;
+}
+
+// --- Imperative shell (fs edge) ---
+
+/** Injectable fs operations — confined to the edge. */
+export interface FsOps {
+ readonly open: (path: string) => number;
+ readonly write: (fd: number, data: string) => void;
+ readonly close: (fd: number) => void;
+ readonly rename: (oldPath: string, newPath: string) => void;
+ readonly statSize: (path: string) => number;
+ readonly fsync: (fd: number) => void;
+}
+
+/** Clock injection for fsync interval. */
+export interface ClockOps {
+ readonly now: () => number;
+}
+
+export interface JournalSinkOpts {
+ readonly path: string;
+ readonly maxBytes?: number;
+ readonly fsync?: "periodic" | "none";
+ readonly fs?: FsOps;
+ readonly clock?: ClockOps;
+}
+
+const DEFAULT_MAX_BYTES = 50 * 1024 * 1024; // 50 MB
+const FSYNC_INTERVAL_MS = 5_000; // 5 seconds
+
+/**
+ * Create a LogSink that durably appends each record to the journal file
+ * as one NDJSON line. Fail-safe: write errors drop + warn, never throw.
+ * Rotates when file exceeds maxBytes (rename → .1, reopen fresh).
+ */
+export function createJournalSink(opts: JournalSinkOpts): LogSink {
+ const filePath = opts.path;
+ const maxBytes = opts.maxBytes ?? DEFAULT_MAX_BYTES;
+ const syncMode = opts.fsync ?? "periodic";
+ const fs = opts.fs ?? createDefaultFsOps();
+ const clock = opts.clock ?? { now: () => Date.now() };
+
+ const NO_FD = -1;
+ let fd: number;
+ let bytesWritten: number;
+ try {
+ fd = fs.open(filePath);
+ bytesWritten = fs.statSize(filePath);
+ } catch {
+ fd = NO_FD;
+ bytesWritten = 0;
+ }
+ let lastFsyncAt = clock.now();
+
+ function tryOpen(): boolean {
+ if (fd !== NO_FD) return true;
+ try {
+ fd = fs.open(filePath);
+ bytesWritten = 0;
+ return true;
+ } catch {
+ return false;
+ }
+ }
+
+ function rotate(): void {
+ if (fd !== NO_FD) {
+ try {
+ fs.close(fd);
+ } catch {
+ // Ignore close errors during rotation.
+ }
+ }
+ const rotatedPath = `${filePath}.1`;
+ try {
+ fs.rename(filePath, rotatedPath);
+ } catch {
+ // If rename fails, just truncate by reopening.
+ }
+ try {
+ fd = fs.open(filePath);
+ } catch {
+ fd = NO_FD;
+ }
+ bytesWritten = 0;
+ }
+
+ function maybeFsync(): void {
+ if (syncMode !== "periodic" || fd === NO_FD) return;
+ const now = clock.now();
+ if (now - lastFsyncAt >= FSYNC_INTERVAL_MS) {
+ try {
+ fs.fsync(fd);
+ } catch {
+ // Swallow — fail-safe.
+ }
+ lastFsyncAt = now;
+ }
+ }
+
+ const sink: LogSink = {
+ emit(record: LogRecord): void {
+ try {
+ if (!tryOpen()) {
+ console.warn("[journal-sink] cannot open journal, dropping record");
+ return;
+ }
+
+ const line = serialize(record);
+ const lineBytes = Buffer.byteLength(line, "utf8");
+
+ if (bytesWritten + lineBytes > maxBytes) {
+ rotate();
+ if (fd === NO_FD) {
+ console.warn("[journal-sink] rotation failed, dropping record");
+ return;
+ }
+ }
+
+ fs.write(fd, line);
+ bytesWritten += lineBytes;
+ maybeFsync();
+ } catch (err) {
+ // Fail-safe: drop + warn, never throw to the caller (D3/D7).
+ console.warn("[journal-sink] write failed, dropping record:", err);
+ }
+ },
+ };
+
+ return sink;
+}
+
+// --- Default fs ops (real Bun/Node I/O) ---
+
+function createDefaultFsOps(): FsOps {
+ return {
+ open(path: string): number {
+ return openSync(path, "a");
+ },
+ write(fd: number, data: string): void {
+ writeSync(fd, data);
+ },
+ close(fd: number): void {
+ closeSync(fd);
+ },
+ rename(oldPath: string, newPath: string): void {
+ renameSync(oldPath, newPath);
+ },
+ statSize(path: string): number {
+ try {
+ return statSync(path).size;
+ } catch {
+ return 0;
+ }
+ },
+ fsync(fd: number): void {
+ fsyncSync(fd);
+ },
+ };
+}
diff --git a/packages/journal-sink/tsconfig.json b/packages/journal-sink/tsconfig.json
new file mode 100644
index 0000000..ff99a43
--- /dev/null
+++ b/packages/journal-sink/tsconfig.json
@@ -0,0 +1,6 @@
+{
+ "extends": "../../tsconfig.base.json",
+ "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true },
+ "include": ["src/**/*.ts"],
+ "references": [{ "path": "../kernel" }]
+}
diff --git a/packages/kernel/src/bus/bus.test.ts b/packages/kernel/src/bus/bus.test.ts
index 7366b50..05cf875 100644
--- a/packages/kernel/src/bus/bus.test.ts
+++ b/packages/kernel/src/bus/bus.test.ts
@@ -1,6 +1,6 @@
import { beforeEach, describe, expect, it } from "vitest";
-import type { Logger } from "../contracts/extension.js";
import { defineEventHook, defineFilter, defineService } from "../contracts/hooks.js";
+import type { Logger, Span } from "../contracts/logging.js";
import { type Bus, createBus } from "./bus.js";
import { applyFilterChain, dispatchEventSync, sortFilters } from "./pure.js";
@@ -10,15 +10,30 @@ interface FakeLogger extends Logger {
function createFakeLogger(): FakeLogger {
const errors: Array<{ message: string; args: unknown[] }> = [];
- return {
+ const logger: FakeLogger = {
errors,
debug: () => {},
info: () => {},
warn: () => {},
- error: (message: string, ...args: unknown[]) => {
- errors.push({ message, args });
+ error: (message, attrs) => {
+ errors.push({ message, args: attrs === undefined ? [] : [attrs] });
},
+ child: () => logger,
+ span: () => makeNoopSpan(logger),
};
+ return logger;
+}
+
+function makeNoopSpan(log: Logger): Span {
+ const span: Span = {
+ id: "noop",
+ log,
+ setAttributes: () => {},
+ addLink: () => {},
+ child: () => span,
+ end: () => {},
+ };
+ return span;
}
describe("event hooks", () => {
diff --git a/packages/kernel/src/bus/pure.ts b/packages/kernel/src/bus/pure.ts
index 7cd9143..4d90fc6 100644
--- a/packages/kernel/src/bus/pure.ts
+++ b/packages/kernel/src/bus/pure.ts
@@ -12,11 +12,11 @@ export function dispatchEventSync<T>(
const result = handler(payload);
if (result instanceof Promise) {
result.catch((err: unknown) => {
- logger.error(`Event hook "${hookId}" handler rejected`, err);
+ logger.error(`Event hook "${hookId}" handler rejected`, { err });
});
}
} catch (err) {
- logger.error(`Event hook "${hookId}" handler threw`, err);
+ logger.error(`Event hook "${hookId}" handler threw`, { err });
}
}
}
@@ -32,7 +32,7 @@ export async function dispatchEventAsync<T>(
try {
await handler(payload);
} catch (err) {
- logger.error(`Event hook "${hookId}" handler threw`, err);
+ logger.error(`Event hook "${hookId}" handler threw`, { err });
}
});
@@ -76,7 +76,7 @@ export async function applyFilterChain<T>(
current = await fn(current);
} catch (err) {
if (failClosed) throw err;
- logger.error(`Filter "${hookId}" handler threw (fail-open, passing through)`, err);
+ logger.error(`Filter "${hookId}" handler threw (fail-open, passing through)`, { err });
}
}
return current;
diff --git a/packages/kernel/src/contracts/extension.ts b/packages/kernel/src/contracts/extension.ts
index 424f714..00b41f1 100644
--- a/packages/kernel/src/contracts/extension.ts
+++ b/packages/kernel/src/contracts/extension.ts
@@ -15,6 +15,10 @@ import type {
FilterHandler,
ServiceHandle,
} from "./hooks.js";
+import type { Logger } from "./logging.js";
+
+export type { Logger } from "./logging.js";
+
import type { ProviderContract } from "./provider.js";
import type { ToolContract } from "./tool.js";
@@ -132,15 +136,7 @@ export interface ScheduledJob {
readonly execute: () => void | Promise<void>;
}
-// --- Logger ---
-
-/** Logger interface available to every extension via the Host API. */
-export interface Logger {
- readonly debug: (message: string, ...args: unknown[]) => void;
- readonly info: (message: string, ...args: unknown[]) => void;
- readonly warn: (message: string, ...args: unknown[]) => void;
- readonly error: (message: string, ...args: unknown[]) => void;
-}
+// --- Logger is re-exported from logging.ts (structured, correlated) ---
// --- Config ---
diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts
index 0b578cb..e4eba87 100644
--- a/packages/kernel/src/contracts/index.ts
+++ b/packages/kernel/src/contracts/index.ts
@@ -45,7 +45,6 @@ export type {
EventsEmitter,
Extension,
HostAPI,
- Logger,
Manifest,
ManifestCapabilities,
ManifestContributions,
@@ -57,7 +56,6 @@ export type {
StorageNamespace,
TrustLevel,
} from "./extension.js";
-
export type {
EventHandler,
EventHookDescriptor,
@@ -66,9 +64,25 @@ export type {
HookDescriptor,
ServiceHandle,
} from "./hooks.js";
-
export { defineEventHook, defineFilter, defineService } from "./hooks.js";
export type {
+ Attributes,
+ ErrorAttributes,
+ Level,
+ LogContext,
+ LogDeps,
+ Logger,
+ LogLineRecord,
+ LogRecord,
+ LogSink,
+ Span,
+ SpanCloseRecord,
+ SpanLink,
+ SpanOpenRecord,
+ SpanStatus,
+} from "./logging.js";
+export { createLogger } from "./logging.js";
+export type {
FinishEvent,
ProviderContract,
ProviderErrorEvent,
diff --git a/packages/kernel/src/contracts/logging.ts b/packages/kernel/src/contracts/logging.ts
new file mode 100644
index 0000000..8e1eef3
--- /dev/null
+++ b/packages/kernel/src/contracts/logging.ts
@@ -0,0 +1,461 @@
+/**
+ * Logging contract — structured, correlated, span-capable Logger/Span ABI.
+ *
+ * The kernel owns types + pure record-builders. NO I/O — the LogSink is
+ * injected by the host-bin. Logger/Span mint records and call sink.emit;
+ * sink errors are swallowed (D7 — the turn is sovereign).
+ *
+ * Key properties:
+ * - P3-safe: correlation flows via explicit child()/span() values, no ambient.
+ * - P2: record-builder is pure ({ now, newId } injected).
+ * - D3: spans emitted incrementally (open at span(), close at end()).
+ * - D6: extensionId auto-stamped by host, not caller-supplied.
+ * - Flat scalar attributes (serializable D3, queryable D9).
+ */
+
+// --- Levels ---
+
+export type Level = "debug" | "info" | "warn" | "error";
+
+// --- Attributes ---
+
+/** Flat, serializable key/value pairs. Caller stringifies nested objects. */
+export type Attributes = Readonly<Record<string, string | number | boolean | null>>;
+
+// --- LogContext (correlation) ---
+
+/** Correlation context carried on every log record and span. */
+export interface LogContext {
+ /** Auto-stamped by host from manifest.id (D6) — never caller-supplied. */
+ readonly extensionId: string;
+ readonly conversationId?: string;
+ readonly turnId?: string;
+ readonly spanId?: string;
+ readonly parentSpanId?: string;
+}
+
+// --- Span ---
+
+/**
+ * A timed unit of work within a trace. Opened via `logger.span(name)`,
+ * closed via `span.end()`. Emits incremental open/close records so a
+ * crashed turn is reconstructable from the journal (D3).
+ */
+export interface Span {
+ readonly id: string;
+ /** Pre-bound Logger scoped to this span's correlation. */
+ readonly log: Logger;
+ /** Add or overwrite attributes on this span. */
+ readonly setAttributes: (attrs: Attributes) => void;
+ /** Record a causal link to another span (D4 cross-feature causality). */
+ readonly addLink: (
+ target: { readonly spanId: string; readonly turnId?: string },
+ reason?: string,
+ ) => void;
+ /** Open a child span nested under this one. */
+ readonly child: (name: string, attrs?: Attributes) => Span;
+ /**
+ * Close this span. Records duration + status. Optionally records an
+ * error and/or additional attributes.
+ */
+ readonly end: (outcome?: { readonly err?: unknown; readonly attrs?: Attributes }) => void;
+}
+
+// --- Logger ---
+
+/**
+ * Structured, correlated logger. The host auto-scopes each extension's
+ * logger with its manifest.id as extensionId (D6).
+ *
+ * `info("msg")` must still compile — attrs is optional (backward compat).
+ */
+export interface Logger {
+ readonly debug: (msg: string, attrs?: Attributes) => void;
+ readonly info: (msg: string, attrs?: Attributes) => void;
+ readonly warn: (msg: string, attrs?: Attributes) => void;
+ readonly error: (msg: string, attrs?: ErrorAttributes) => void;
+ /**
+ * Create a child logger with additional correlation context.
+ * Explicit values passed down (P3 — no ambient state).
+ */
+ readonly child: (ctx: Partial<LogContext> & { readonly attrs?: Attributes }) => Logger;
+ /** Open a new span. Emits a `span-open` record immediately (D3). */
+ readonly span: (name: string, attrs?: Attributes) => Span;
+}
+
+/**
+ * Attributes for error log calls. The `err` field is `unknown` (not
+ * constrained to Attributes' scalar index signature) so callers can
+ * pass `error("msg", { err })` directly.
+ */
+export interface ErrorAttributes {
+ readonly err?: unknown;
+ readonly [key: string]: unknown;
+}
+
+// --- LogRecord (discriminated union) ---
+
+/**
+ * Status of a span. "ok" is the default when no error is recorded.
+ */
+export type SpanStatus = "ok" | "error";
+
+/**
+ * A link to another span, recorded at a handoff moment (D4).
+ */
+export interface SpanLink {
+ readonly spanId: string;
+ readonly turnId?: string;
+ readonly reason?: string;
+}
+
+/**
+ * Flat, JSON-serializable discriminated union. Spans are emitted
+ * incrementally (open at span(), close at end()) so a crashed turn is
+ * reconstructable from the journal (D3).
+ *
+ * Every variant carries correlation keys + timestamp. An optional `body`
+ * field (string) holds large verbatim payloads (store-fat-serve-thin).
+ */
+export type LogRecord = LogLineRecord | SpanOpenRecord | SpanCloseRecord;
+
+/** A structured log line (debug/info/warn/error). */
+export interface LogLineRecord {
+ readonly kind: "log";
+ readonly level: Level;
+ readonly msg: string;
+ readonly timestamp: number;
+ readonly extensionId: string;
+ readonly conversationId?: string;
+ readonly turnId?: string;
+ readonly spanId?: string;
+ readonly parentSpanId?: string;
+ readonly attributes?: Attributes;
+ /** Optional large verbatim payload (store-fat, serve-thin). */
+ readonly body?: string;
+}
+
+/** Emitted when a span is opened (at `logger.span(name)`). */
+export interface SpanOpenRecord {
+ readonly kind: "span-open";
+ readonly spanId: string;
+ readonly name: string;
+ readonly timestamp: number;
+ readonly extensionId: string;
+ readonly conversationId?: string;
+ readonly turnId?: string;
+ readonly parentSpanId?: string;
+ readonly attributes?: Attributes;
+ readonly links?: readonly SpanLink[];
+ readonly body?: string;
+}
+
+/** Emitted when a span is closed (at `span.end()`). Carries duration + status. */
+export interface SpanCloseRecord {
+ readonly kind: "span-close";
+ readonly spanId: string;
+ readonly name: string;
+ readonly timestamp: number;
+ readonly durationMs: number;
+ readonly status: SpanStatus;
+ readonly extensionId: string;
+ readonly conversationId?: string;
+ readonly turnId?: string;
+ readonly parentSpanId?: string;
+ readonly attributes?: Attributes;
+ readonly links?: readonly SpanLink[];
+ readonly body?: string;
+}
+
+// --- LogSink ---
+
+/**
+ * Fire-and-forget sink. The kernel calls emit(); the host-bin injects
+ * a concrete implementation. Kernel never lets sink errors escape (D7).
+ */
+export interface LogSink {
+ readonly emit: (record: LogRecord) => void;
+}
+
+// --- Deterministic helpers (injected for testability) ---
+
+/** Clock + id generator injected into the logger factory. */
+export interface LogDeps {
+ readonly now: () => number;
+ readonly newId: () => string;
+}
+
+// --- Pure record builder (no I/O) ---
+
+/**
+ * Internal state carried by a logger instance. Built by createLogger;
+ * never exposed outside this module.
+ */
+interface LoggerState {
+ readonly ctx: LogContext;
+ readonly attrs: Attributes | undefined;
+ readonly deps: LogDeps;
+ readonly sink: LogSink;
+}
+
+function mergeAttributes(
+ base: Attributes | undefined,
+ extra: Attributes | undefined,
+): Attributes | undefined {
+ if (base === undefined && extra === undefined) return undefined;
+ if (base === undefined) return extra;
+ if (extra === undefined) return base;
+ return { ...base, ...extra };
+}
+
+function isScalarAttr(value: unknown): value is string | number | boolean | null {
+ const t = typeof value;
+ return t === "string" || t === "number" || t === "boolean" || value === null;
+}
+
+function emitLog(state: LoggerState, level: Level, msg: string, attrs?: Attributes): void {
+ const merged = mergeAttributes(state.attrs, attrs);
+ const base = {
+ kind: "log" as const,
+ level,
+ msg,
+ timestamp: state.deps.now(),
+ extensionId: state.ctx.extensionId,
+ };
+ const record: LogLineRecord =
+ state.ctx.conversationId !== undefined ||
+ state.ctx.turnId !== undefined ||
+ state.ctx.spanId !== undefined ||
+ state.ctx.parentSpanId !== undefined ||
+ merged !== undefined
+ ? {
+ ...base,
+ ...(state.ctx.conversationId !== undefined
+ ? { conversationId: state.ctx.conversationId }
+ : {}),
+ ...(state.ctx.turnId !== undefined ? { turnId: state.ctx.turnId } : {}),
+ ...(state.ctx.spanId !== undefined ? { spanId: state.ctx.spanId } : {}),
+ ...(state.ctx.parentSpanId !== undefined ? { parentSpanId: state.ctx.parentSpanId } : {}),
+ ...(merged !== undefined ? { attributes: merged } : {}),
+ }
+ : base;
+ try {
+ state.sink.emit(record);
+ } catch {
+ // Swallow — D7: the turn is sovereign (never break the caller).
+ }
+}
+
+function buildSpanOpen(
+ state: LoggerState,
+ name: string,
+ spanId: string,
+ attrs?: Attributes,
+): SpanOpenRecord {
+ const base = {
+ kind: "span-open" as const,
+ spanId,
+ name,
+ timestamp: state.deps.now(),
+ extensionId: state.ctx.extensionId,
+ };
+ const merged = mergeAttributes(state.attrs, attrs);
+ return {
+ ...base,
+ ...(state.ctx.conversationId !== undefined ? { conversationId: state.ctx.conversationId } : {}),
+ ...(state.ctx.turnId !== undefined ? { turnId: state.ctx.turnId } : {}),
+ ...(state.ctx.parentSpanId !== undefined ? { parentSpanId: state.ctx.parentSpanId } : {}),
+ ...(merged !== undefined ? { attributes: merged } : {}),
+ };
+}
+
+function buildSpanLink(
+ target: { readonly spanId: string; readonly turnId?: string },
+ reason?: string,
+): SpanLink {
+ return {
+ spanId: target.spanId,
+ ...(target.turnId !== undefined ? { turnId: target.turnId } : {}),
+ ...(reason !== undefined ? { reason } : {}),
+ };
+}
+
+/**
+ * Create a structured Logger. Pure factory — all I/O goes through the
+ * injected sink. `{ now, newId }` are injected for deterministic tests.
+ *
+ * @param ctx Initial correlation context (extensionId + optional ids).
+ * @param sink Fire-and-forget record sink.
+ * @param deps Clock + id generator.
+ * @param attrs Optional default attributes (from child()).
+ */
+export function createLogger(
+ ctx: LogContext,
+ sink: LogSink,
+ deps: LogDeps,
+ attrs?: Attributes,
+): Logger {
+ const state: LoggerState = { ctx, attrs, deps, sink };
+
+ function makeSpan(name: string, spanAttrs?: Attributes, parentSpanId?: string): Span {
+ const spanId = deps.newId();
+ const mergedParent = parentSpanId ?? state.ctx.spanId;
+ const spanCtx: LogContext = {
+ extensionId: ctx.extensionId,
+ ...(ctx.conversationId !== undefined ? { conversationId: ctx.conversationId } : {}),
+ ...(ctx.turnId !== undefined ? { turnId: ctx.turnId } : {}),
+ spanId,
+ ...(mergedParent !== undefined ? { parentSpanId: mergedParent } : {}),
+ };
+
+ const openRecord = buildSpanOpen(state, name, spanId, spanAttrs);
+ const spanAttrsMutable: Record<string, string | number | boolean | null> =
+ spanAttrs !== undefined ? { ...spanAttrs } : {};
+ const links: SpanLink[] = [];
+ const openedAt = deps.now();
+
+ try {
+ sink.emit(openRecord);
+ } catch {
+ // Swallow — D7.
+ }
+
+ const spanLogger = createLogger(spanCtx, sink, deps, state.attrs);
+
+ const span: Span = {
+ id: spanId,
+ log: spanLogger,
+ setAttributes(newAttrs: Attributes): void {
+ for (const [key, value] of Object.entries(newAttrs)) {
+ spanAttrsMutable[key] = value;
+ }
+ },
+ addLink(target, reason): void {
+ links.push(buildSpanLink(target, reason));
+ },
+ child(childName: string, childAttrs?: Attributes): Span {
+ return makeSpan(childName, childAttrs, spanId);
+ },
+ end(outcome?): void {
+ const closedAt = deps.now();
+ const err = outcome?.err;
+ let status: SpanStatus = "ok";
+ if (err !== undefined && err !== null) {
+ status = "error";
+ const errMsg = err instanceof Error ? err.message : String(err);
+ spanAttrsMutable["error.message"] = errMsg;
+ if (err instanceof Error && err.stack !== undefined) {
+ spanAttrsMutable["error.stack"] = err.stack;
+ }
+ }
+ if (outcome?.attrs !== undefined) {
+ for (const [key, value] of Object.entries(outcome.attrs)) {
+ spanAttrsMutable[key] = value;
+ }
+ }
+
+ const hasAttrs = Object.keys(spanAttrsMutable).length > 0;
+ const hasLinks = links.length > 0;
+ const base = {
+ kind: "span-close" as const,
+ spanId,
+ name,
+ timestamp: closedAt,
+ durationMs: closedAt - openedAt,
+ status,
+ extensionId: ctx.extensionId,
+ };
+ const closeRecord: SpanCloseRecord = {
+ ...base,
+ ...(ctx.conversationId !== undefined ? { conversationId: ctx.conversationId } : {}),
+ ...(ctx.turnId !== undefined ? { turnId: ctx.turnId } : {}),
+ ...(mergedParent !== undefined ? { parentSpanId: mergedParent } : {}),
+ ...(hasAttrs ? { attributes: { ...spanAttrsMutable } } : {}),
+ ...(hasLinks ? { links: [...links] } : {}),
+ };
+ try {
+ sink.emit(closeRecord);
+ } catch {
+ // Swallow — D7.
+ }
+ },
+ };
+
+ return span;
+ }
+
+ const logger: Logger = {
+ debug(msg: string, attrs?: Attributes): void {
+ emitLog(state, "debug", msg, attrs);
+ },
+ info(msg: string, attrs?: Attributes): void {
+ emitLog(state, "info", msg, attrs);
+ },
+ warn(msg: string, attrs?: Attributes): void {
+ emitLog(state, "warn", msg, attrs);
+ },
+ error(msg: string, attrs?: ErrorAttributes): void {
+ const err = attrs?.err;
+ if (err !== undefined && err !== null) {
+ // Extract scalar attributes (everything except err).
+ const scalarAttrs: Record<string, string | number | boolean | null> = {};
+ if (attrs !== undefined) {
+ for (const [key, value] of Object.entries(attrs)) {
+ if (key !== "err" && isScalarAttr(value)) {
+ scalarAttrs[key] = value;
+ }
+ }
+ }
+ const merged = mergeAttributes(
+ state.attrs,
+ Object.keys(scalarAttrs).length > 0 ? scalarAttrs : undefined,
+ );
+ const errMsg = err instanceof Error ? err.message : String(err);
+ const errorAttrs: Record<string, string | number | boolean | null> = {
+ ...(merged ?? {}),
+ "error.message": errMsg,
+ };
+ if (err instanceof Error && err.stack !== undefined) {
+ errorAttrs["error.stack"] = err.stack;
+ }
+ emitLog(state, "error", msg, errorAttrs as Attributes);
+ } else {
+ // No err field — filter to scalar attributes only.
+ const scalarAttrs: Record<string, string | number | boolean | null> = {};
+ if (attrs !== undefined) {
+ for (const [key, value] of Object.entries(attrs)) {
+ if (isScalarAttr(value)) {
+ scalarAttrs[key] = value;
+ }
+ }
+ }
+ emitLog(
+ state,
+ "error",
+ msg,
+ Object.keys(scalarAttrs).length > 0 ? (scalarAttrs as Attributes) : undefined,
+ );
+ }
+ },
+ child(childCtx: Partial<LogContext> & { readonly attrs?: Attributes }): Logger {
+ const convId = childCtx.conversationId ?? ctx.conversationId;
+ const tId = childCtx.turnId ?? ctx.turnId;
+ const sId = childCtx.spanId ?? ctx.spanId;
+ const pId = childCtx.parentSpanId ?? ctx.parentSpanId;
+ const newCtx: LogContext = {
+ extensionId: ctx.extensionId,
+ ...(convId !== undefined ? { conversationId: convId } : {}),
+ ...(tId !== undefined ? { turnId: tId } : {}),
+ ...(sId !== undefined ? { spanId: sId } : {}),
+ ...(pId !== undefined ? { parentSpanId: pId } : {}),
+ };
+ const newAttrs = mergeAttributes(state.attrs, childCtx.attrs);
+ return createLogger(newCtx, sink, deps, newAttrs);
+ },
+ span(name: string, attrs?: Attributes): Span {
+ return makeSpan(name, attrs);
+ },
+ };
+
+ return logger;
+}
diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts
index 68c0444..1e8f14f 100644
--- a/packages/kernel/src/contracts/runtime.ts
+++ b/packages/kernel/src/contracts/runtime.ts
@@ -10,6 +10,7 @@
import type { ChatMessage } from "./conversation.js";
import type { ToolDispatchPolicy } from "./dispatch.js";
import type { AgentEvent } from "./events.js";
+import type { Logger } from "./logging.js";
import type { ProviderContract, ProviderStreamOptions, Usage } from "./provider.js";
import type { ToolContract } from "./tool.js";
@@ -73,6 +74,13 @@ export interface RunTurnInput {
/** Cancellation signal for the entire turn. */
readonly signal?: AbortSignal;
+
+ /**
+ * Optional logger for structured span instrumentation. The runtime opens
+ * turn/step/tool-call spans using this logger. If omitted, no spans are
+ * emitted (backward-compatible with callers that don't yet pass a logger).
+ */
+ readonly logger?: Logger;
}
/**
diff --git a/packages/kernel/src/contracts/tool.ts b/packages/kernel/src/contracts/tool.ts
index f74ce77..0699d05 100644
--- a/packages/kernel/src/contracts/tool.ts
+++ b/packages/kernel/src/contracts/tool.ts
@@ -7,6 +7,8 @@
* Extensions may use zod internally and convert to this shape.
*/
+import type { Logger } from "./logging.js";
+
/**
* Structural JSON Schema subset for tool parameter declarations.
* The kernel does not validate against this — the provider serializes it for
@@ -53,6 +55,13 @@ export interface ToolExecuteContext {
* can clean up rather than leak.
*/
readonly signal: AbortSignal;
+
+ /**
+ * Pre-bound Logger scoped to this tool-call span. Tools log correlated
+ * without a global (P3). The kernel stamps extensionId, conversationId,
+ * turnId, and spanId automatically.
+ */
+ readonly log: Logger;
}
/**
diff --git a/packages/kernel/src/host/host.test.ts b/packages/kernel/src/host/host.test.ts
index 11c2356..7688366 100644
--- a/packages/kernel/src/host/host.test.ts
+++ b/packages/kernel/src/host/host.test.ts
@@ -6,7 +6,6 @@ import type {
EventsEmitter,
Extension,
HostAPI,
- Logger,
Manifest,
ManifestContributions,
PermissionDecision,
@@ -17,33 +16,92 @@ import type {
StorageNamespace,
} from "../contracts/extension.js";
import { defineEventHook, defineService } from "../contracts/hooks.js";
+import type {
+ Attributes,
+ ErrorAttributes,
+ LogDeps,
+ Logger,
+ LogRecord,
+ LogSink,
+} from "../contracts/logging.js";
import type { ProviderContract } from "../contracts/provider.js";
import type { ToolContract } from "../contracts/tool.js";
import { createHost, type HostDeps, KERNEL_API_VERSION } from "./host.js";
interface FakeLogger extends Logger {
- readonly logs: Array<{ level: string; message: string; args: unknown[] }>;
+ readonly logs: Array<{ level: string; message: string; attrs?: Attributes | ErrorAttributes }>;
}
function createFakeLogger(): FakeLogger {
- const logs: Array<{ level: string; message: string; args: unknown[] }> = [];
+ const logs: Array<{ level: string; message: string; attrs?: Attributes | ErrorAttributes }> = [];
return {
logs,
- debug: (message: string, ...args: unknown[]) => {
- logs.push({ level: "debug", message, args });
+ debug: (message: string, attrs?: Attributes) => {
+ if (attrs !== undefined) {
+ logs.push({ level: "debug", message, attrs });
+ } else {
+ logs.push({ level: "debug", message });
+ }
+ },
+ info: (message: string, attrs?: Attributes) => {
+ if (attrs !== undefined) {
+ logs.push({ level: "info", message, attrs });
+ } else {
+ logs.push({ level: "info", message });
+ }
+ },
+ warn: (message: string, attrs?: Attributes) => {
+ if (attrs !== undefined) {
+ logs.push({ level: "warn", message, attrs });
+ } else {
+ logs.push({ level: "warn", message });
+ }
},
- info: (message: string, ...args: unknown[]) => {
- logs.push({ level: "info", message, args });
+ error: (message: string, attrs?: ErrorAttributes) => {
+ if (attrs !== undefined) {
+ logs.push({ level: "error", message, attrs });
+ } else {
+ logs.push({ level: "error", message });
+ }
},
- warn: (message: string, ...args: unknown[]) => {
- logs.push({ level: "warn", message, args });
+ child(
+ _ctx: Partial<import("../contracts/logging.js").LogContext> & { readonly attrs?: Attributes },
+ ): Logger {
+ return createFakeLogger();
},
- error: (message: string, ...args: unknown[]) => {
- logs.push({ level: "error", message, args });
+ span(_name: string, _attrs?: Attributes): import("../contracts/logging.js").Span {
+ return {
+ id: "fake-span",
+ log: createFakeLogger(),
+ setAttributes() {},
+ addLink() {},
+ child() {
+ return this;
+ },
+ end() {},
+ };
},
};
}
+function createFakeLogSink(): LogSink & { readonly records: LogRecord[] } {
+ const records: LogRecord[] = [];
+ return {
+ records,
+ emit: (record: LogRecord) => {
+ records.push(record);
+ },
+ };
+}
+
+function createFakeLogDeps(): LogDeps {
+ let idCounter = 0;
+ return {
+ now: () => 1000 + idCounter * 100,
+ newId: () => `span-${++idCounter}`,
+ };
+}
+
function createFakeConfig(): ConfigAccess {
return {
get: () => undefined,
@@ -176,12 +234,16 @@ function createFakeAuth(id: string): AuthContract {
describe("createHost", () => {
let logger: FakeLogger;
+ let logSink: ReturnType<typeof createFakeLogSink>;
+ let logDeps: LogDeps;
let deps: HostDeps;
let scheduler: ReturnType<typeof createFakeScheduler>;
let events: ReturnType<typeof createFakeEvents>;
beforeEach(() => {
logger = createFakeLogger();
+ logSink = createFakeLogSink();
+ logDeps = createFakeLogDeps();
scheduler = createFakeScheduler();
events = createFakeEvents();
deps = {
@@ -193,6 +255,8 @@ describe("createHost", () => {
scheduler,
bus: createBus(logger),
events,
+ logSink,
+ logDeps,
};
});
@@ -529,6 +593,7 @@ describe("createHost", () => {
expect(order).toEqual(["deactivate-c", "deactivate-a"]);
const errors = logger.logs.filter((l) => l.level === "error");
expect(errors.some((e) => e.message.includes("deactivate"))).toBe(true);
+ expect(errors.some((e) => (e.attrs as { err?: unknown })?.err instanceof Error)).toBe(true);
});
});
@@ -750,4 +815,146 @@ describe("createHost", () => {
);
});
});
+
+ describe("auto-scoped logger (D6)", () => {
+ it("each extension's logger stamps its own manifest.id as extensionId", async () => {
+ let extALogger: Logger | undefined;
+ let extBLogger: Logger | undefined;
+
+ const a = createExtension("ext-a", {
+ activate: (host) => {
+ extALogger = host.logger;
+ },
+ });
+ const b = createExtension("ext-b", {
+ activate: (host) => {
+ extBLogger = host.logger;
+ },
+ });
+
+ const host = createHost([a, b], deps);
+ await host.activate();
+
+ extALogger?.info("from-a");
+ extBLogger?.info("from-b");
+
+ const logRecords = logSink.records.filter((r) => r.kind === "log");
+ expect(logRecords).toHaveLength(2);
+ if (logRecords[0]?.kind === "log") {
+ expect(logRecords[0].extensionId).toBe("ext-a");
+ expect(logRecords[0].msg).toBe("from-a");
+ }
+ if (logRecords[1]?.kind === "log") {
+ expect(logRecords[1].extensionId).toBe("ext-b");
+ expect(logRecords[1].msg).toBe("from-b");
+ }
+ });
+
+ it("an extension cannot spoof extensionId — it is auto-stamped", async () => {
+ let extLogger: Logger | undefined;
+
+ const ext = createExtension("real-id", {
+ activate: (host) => {
+ extLogger = host.logger;
+ },
+ });
+
+ const host = createHost([ext], deps);
+ await host.activate();
+
+ // child() cannot override extensionId
+ const child = extLogger?.child({ extensionId: "spoofed" });
+ child?.info("msg");
+
+ const logRecords = logSink.records.filter((r) => r.kind === "log");
+ expect(logRecords).toHaveLength(1);
+ if (logRecords[0]?.kind === "log") {
+ expect(logRecords[0].extensionId).toBe("real-id");
+ }
+ });
+
+ it("host.logger.error uses structured { err } shape", async () => {
+ let extLogger: Logger | undefined;
+
+ const ext = createExtension("ext", {
+ activate: (host) => {
+ extLogger = host.logger;
+ },
+ });
+
+ const host = createHost([ext], deps);
+ await host.activate();
+
+ extLogger?.error("something broke", { err: new Error("boom") });
+
+ const logRecords = logSink.records.filter((r) => r.kind === "log");
+ expect(logRecords).toHaveLength(1);
+ if (logRecords[0]?.kind === "log") {
+ expect(logRecords[0].level).toBe("error");
+ expect(logRecords[0].msg).toBe("something broke");
+ expect(logRecords[0].attributes?.["error.message"]).toBe("boom");
+ }
+ });
+
+ it("a throwing sink does NOT break the caller", async () => {
+ const brokenSink: LogSink = {
+ emit() {
+ throw new Error("sink down");
+ },
+ };
+ const brokenDeps: HostDeps = {
+ ...deps,
+ logSink: brokenSink,
+ };
+
+ let extLogger: Logger | undefined;
+ const ext = createExtension("ext", {
+ activate: (host) => {
+ extLogger = host.logger;
+ },
+ });
+
+ const host = createHost([ext], brokenDeps);
+ await host.activate();
+
+ // Should not throw
+ expect(() => extLogger?.info("msg")).not.toThrow();
+ });
+
+ it("span() + end() emit incremental span-open and span-close records", async () => {
+ let extLogger: Logger | undefined;
+
+ const ext = createExtension("ext", {
+ activate: (host) => {
+ extLogger = host.logger;
+ },
+ });
+
+ const host = createHost([ext], deps);
+ await host.activate();
+
+ const span = extLogger?.span("my-span", { key: "value" });
+ span?.setAttributes({ extra: "attr" });
+ span?.end({ attrs: { result: "ok" } });
+
+ const spanOpens = logSink.records.filter((r) => r.kind === "span-open");
+ const spanCloses = logSink.records.filter((r) => r.kind === "span-close");
+
+ expect(spanOpens).toHaveLength(1);
+ expect(spanCloses).toHaveLength(1);
+
+ if (spanOpens[0]?.kind === "span-open") {
+ expect(spanOpens[0].name).toBe("my-span");
+ expect(spanOpens[0].extensionId).toBe("ext");
+ expect(spanOpens[0].attributes?.key).toBe("value");
+ }
+ if (spanCloses[0]?.kind === "span-close") {
+ expect(spanCloses[0].name).toBe("my-span");
+ expect(spanCloses[0].status).toBe("ok");
+ expect(spanCloses[0].durationMs).toBeGreaterThanOrEqual(0);
+ expect(spanCloses[0].attributes?.extra).toBe("attr");
+ expect(spanCloses[0].attributes?.result).toBe("ok");
+ }
+ });
+ });
});
diff --git a/packages/kernel/src/host/host.ts b/packages/kernel/src/host/host.ts
index dd61f9f..c7ec7a9 100644
--- a/packages/kernel/src/host/host.ts
+++ b/packages/kernel/src/host/host.ts
@@ -5,7 +5,6 @@ import type {
EventsEmitter,
Extension,
HostAPI,
- Logger,
Manifest,
PermissionGate,
ScheduledJob,
@@ -19,6 +18,8 @@ import type {
FilterHandler,
ServiceHandle,
} from "../contracts/hooks.js";
+import type { LogDeps, Logger, LogSink } from "../contracts/logging.js";
+import { createLogger } from "../contracts/logging.js";
import type { ProviderContract } from "../contracts/provider.js";
import type { ToolContract } from "../contracts/tool.js";
import { resolveActivationOrder } from "./dag.js";
@@ -40,6 +41,8 @@ export interface HostDeps {
readonly scheduler: { readonly register: (job: ScheduledJob) => void };
readonly bus: Bus;
readonly events: EventsEmitter;
+ readonly logSink: LogSink;
+ readonly logDeps: LogDeps;
}
export interface Host {
@@ -96,8 +99,12 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho
}
}
- function buildHostAPI(opts?: { readonly registrationClosed?: boolean }): HostAPI {
+ function buildHostAPI(
+ extensionId: string,
+ opts?: { readonly registrationClosed?: boolean },
+ ): HostAPI {
const closed = opts?.registrationClosed ?? false;
+ const extLogger = createLogger({ extensionId }, deps.logSink, deps.logDeps);
return {
defineTool(tool: ToolContract) {
if (closed) throw new Error("Registration not available after activation");
@@ -130,7 +137,7 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho
secrets: deps.secrets,
permissions: deps.permissions,
events: deps.events,
- logger: deps.logger,
+ logger: extLogger,
getProviders() {
return providers;
},
@@ -156,7 +163,7 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho
async activate() {
for (const ext of compatible) {
try {
- await ext.activate(buildHostAPI());
+ await ext.activate(buildHostAPI(ext.manifest.id));
activated.push(ext);
deps.logger.info(`Extension "${ext.manifest.id}" activated`);
} catch (err) {
@@ -164,7 +171,7 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho
manifest: ext.manifest,
reason: `Activation failed: ${err instanceof Error ? err.message : String(err)}`,
});
- deps.logger.error(`Extension "${ext.manifest.id}" failed to activate`, err);
+ deps.logger.error(`Extension "${ext.manifest.id}" failed to activate`, { err });
}
}
},
@@ -175,7 +182,7 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho
try {
await ext.deactivate();
} catch (err) {
- deps.logger.error(`Extension "${ext.manifest.id}" failed to deactivate`, err);
+ deps.logger.error(`Extension "${ext.manifest.id}" failed to deactivate`, { err });
}
}
},
@@ -207,7 +214,7 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho
return disabled;
},
getHostAPI() {
- return buildHostAPI({ registrationClosed: true });
+ return buildHostAPI("__host__", { registrationClosed: true });
},
};
}
diff --git a/packages/kernel/src/runtime/dispatch.ts b/packages/kernel/src/runtime/dispatch.ts
index 626b333..1ba0849 100644
--- a/packages/kernel/src/runtime/dispatch.ts
+++ b/packages/kernel/src/runtime/dispatch.ts
@@ -1,4 +1,5 @@
import type { ToolDispatchPolicy } from "../contracts/dispatch.js";
+import type { Logger, Span } from "../contracts/logging.js";
import type { EventEmitter } from "../contracts/runtime.js";
import type { ToolCall, ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js";
import { toolOutputEvent } from "./events.js";
@@ -15,6 +16,7 @@ export async function executeToolCall(
emit: EventEmitter,
conversationId: string,
turnId: string,
+ toolSpan?: Span,
): Promise<ToolResult> {
if (tool === undefined) {
return { content: `Unknown tool: ${call.name}`, isError: true };
@@ -28,6 +30,7 @@ export async function executeToolCall(
onOutput: (data, stream) => {
emit(toolOutputEvent(conversationId, turnId, call.id, data, stream));
},
+ log: toolSpan?.log ?? createNoopLogger(),
};
try {
return await tool.execute(call.input, ctx);
@@ -50,6 +53,7 @@ export function createStepDispatcher(
emit: EventEmitter,
conversationId: string,
turnId: string,
+ toolSpans: Map<string, Span>,
): StepDispatcher {
let activeCount = 0;
let unsafeRunning = false;
@@ -78,6 +82,7 @@ export function createStepDispatcher(
}
async function runAndResolve(entry: QueueEntry): Promise<void> {
+ const tcSpan = toolSpans.get(entry.call.id);
const result = await executeToolCall(
entry.call,
entry.tool,
@@ -85,6 +90,7 @@ export function createStepDispatcher(
emit,
conversationId,
turnId,
+ tcSpan,
);
activeCount--;
if (entry.tool?.concurrencySafe === false) unsafeRunning = false;
@@ -129,3 +135,27 @@ export function createStepDispatcher(
return { submit, drain };
}
+
+function createNoopLogger(): Logger {
+ return {
+ debug() {},
+ info() {},
+ warn() {},
+ error() {},
+ child() {
+ return createNoopLogger();
+ },
+ span() {
+ return {
+ id: "noop",
+ log: createNoopLogger(),
+ setAttributes() {},
+ addLink() {},
+ child() {
+ return this;
+ },
+ end() {},
+ };
+ },
+ };
+}
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index 696a385..48f0b5a 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -1,6 +1,8 @@
import { describe, expect, it } from "vitest";
import type { ChatMessage } from "../contracts/conversation.js";
import type { AgentEvent } from "../contracts/events.js";
+import type { LogDeps, Logger, LogRecord, LogSink } from "../contracts/logging.js";
+import { createLogger } from "../contracts/logging.js";
import type { ProviderContract, ProviderEvent } from "../contracts/provider.js";
import type { ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js";
import { runTurn } from "./run-turn.js";
@@ -814,4 +816,228 @@ describe("runTurn", () => {
expect(outputs[1]?.stream).toBe("stderr");
}
});
+
+ describe("span instrumentation", () => {
+ function createTestLogger(): {
+ logger: Logger;
+ sink: LogSink & { records: LogRecord[] };
+ deps: LogDeps;
+ } {
+ let idCounter = 0;
+ const deps: LogDeps = {
+ now: () => 1000 + idCounter * 100,
+ newId: () => `span-${++idCounter}`,
+ };
+ const records: LogRecord[] = [];
+ const sink: LogSink & { records: LogRecord[] } = {
+ records,
+ emit: (record) => records.push(record),
+ };
+ const logger = createLogger({ extensionId: "test" }, sink, deps);
+ return { logger, sink, deps };
+ }
+
+ it("emits turn + step span open/close in order", async () => {
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "hi" },
+ { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { logger, sink } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ const spanOpens = sink.records.filter((r) => r.kind === "span-open");
+ const spanCloses = sink.records.filter((r) => r.kind === "span-close");
+
+ expect(spanOpens.length).toBeGreaterThanOrEqual(2); // turn + step
+ expect(spanCloses.length).toBeGreaterThanOrEqual(2);
+
+ const turnOpen = spanOpens.find((r) => r.kind === "span-open" && r.name === "turn");
+ const stepOpen = spanOpens.find((r) => r.kind === "span-open" && r.name === "step");
+ expect(turnOpen).toBeDefined();
+ expect(stepOpen).toBeDefined();
+
+ if (turnOpen?.kind === "span-open") {
+ expect(turnOpen.extensionId).toBe("test");
+ expect(turnOpen.attributes?.conversationId).toBe("conv-1");
+ expect(turnOpen.attributes?.turnId).toBe("turn-1");
+ }
+
+ const turnClose = spanCloses.find((r) => r.kind === "span-close" && r.name === "turn");
+ expect(turnClose).toBeDefined();
+ if (turnClose?.kind === "span-close") {
+ expect(turnClose.status).toBe("ok");
+ expect(turnClose.durationMs).toBeGreaterThanOrEqual(0);
+ }
+ });
+
+ it("emits tool-call spans for dispatched tools", async () => {
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ const provider = createFakeProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { logger, sink } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ const toolCallSpans = sink.records.filter(
+ (r) => r.kind === "span-open" && r.name === "tool-call",
+ );
+ expect(toolCallSpans).toHaveLength(1);
+ if (toolCallSpans[0]?.kind === "span-open") {
+ expect(toolCallSpans[0].attributes?.name).toBe("echo");
+ expect(toolCallSpans[0].attributes?.toolCallId).toBe("tc1");
+ }
+
+ const toolCallCloses = sink.records.filter(
+ (r) => r.kind === "span-close" && r.name === "tool-call",
+ );
+ expect(toolCallCloses).toHaveLength(1);
+ if (toolCallCloses[0]?.kind === "span-close") {
+ expect(toolCallCloses[0].status).toBe("ok");
+ }
+ });
+
+ it("tools receive ctx.log (correlated logger)", async () => {
+ let capturedLog: Logger | undefined;
+
+ const tool = createFakeTool("logtest", async (_input, ctx) => {
+ capturedLog = ctx.log;
+ ctx.log.info("tool ran", { key: "value" });
+ return { content: "ok" };
+ });
+
+ const provider = createFakeProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "logtest", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { logger, sink } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ expect(capturedLog).toBeDefined();
+
+ const toolLogs = sink.records.filter(
+ (r) => r.kind === "log" && r.kind === "log" && (r as { msg: string }).msg === "tool ran",
+ );
+ expect(toolLogs).toHaveLength(1);
+ if (toolLogs[0]?.kind === "log") {
+ expect(toolLogs[0].attributes?.key).toBe("value");
+ expect(toolLogs[0].extensionId).toBe("test");
+ }
+ });
+
+ it("an aborted turn still closes its turn span", async () => {
+ const ac = new AbortController();
+ ac.abort();
+
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "should not appear" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { logger, sink } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ signal: ac.signal,
+ logger,
+ });
+
+ const turnCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "turn");
+ expect(turnCloses).toHaveLength(1);
+ if (turnCloses[0]?.kind === "span-close") {
+ expect(turnCloses[0].attributes?.finishReason).toBe("aborted");
+ }
+ });
+
+ it("a provider error closes the step span with error status", async () => {
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ return (async function* () {
+ yield { type: "text-delta", delta: "partial" } as ProviderEvent;
+ throw new Error("provider exploded");
+ })();
+ },
+ };
+
+ const { logger, sink } = createTestLogger();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ expect(result.finishReason).toBe("error");
+
+ const stepCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "step");
+ expect(stepCloses).toHaveLength(1);
+ if (stepCloses[0]?.kind === "span-close") {
+ expect(stepCloses[0].status).toBe("error");
+ expect(stepCloses[0].attributes?.["error.message"]).toContain("provider exploded");
+ }
+ });
+ });
});
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index 9421d86..0f42ef3 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -1,4 +1,5 @@
import type { ChatMessage, Chunk } from "../contracts/conversation.js";
+import type { Logger, Span } from "../contracts/logging.js";
import type { ProviderContract, ProviderEvent, Usage } from "../contracts/provider.js";
import type { EventEmitter, RunTurnInput, RunTurnResult } from "../contracts/runtime.js";
import type { ToolCall, ToolContract } from "../contracts/tool.js";
@@ -76,6 +77,8 @@ interface StepContext {
readonly signal: AbortSignal;
readonly conversationId: string;
readonly turnId: string;
+ readonly logger: Logger;
+ readonly toolSpans: Map<string, Span>;
}
interface StepResult {
@@ -124,6 +127,18 @@ function processEvent(
event.input,
),
);
+
+ // Open a tool-call span (attrs: name, toolCallId)
+ try {
+ const tcSpan = ctx.logger.span("tool-call", {
+ name: event.toolName,
+ toolCallId: event.toolCallId,
+ });
+ ctx.toolSpans.set(event.toolCallId, tcSpan);
+ } catch {
+ // Swallow — D7: logging never breaks the turn.
+ }
+
if (ctx.dispatch.eager) {
dispatcher.submit(call);
}
@@ -151,6 +166,26 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
let stepUsage = zeroUsage();
let finishReason = "stop";
+ // Open a step span with the verbatim pre-mutation prompt in its body (BEFORE capture).
+ let stepSpan: Span | undefined;
+ try {
+ stepSpan = ctx.logger.span("step");
+ // Emit the verbatim pre-mutation prompt as a log record on the step span's logger.
+ // This is the "BEFORE" capture — the messages + tools as handed to provider.stream.
+ stepSpan.log.info("prompt:before", {
+ "prompt.messages": JSON.stringify(ctx.messages),
+ "prompt.tools": JSON.stringify(
+ ctx.tools.map((t) => ({
+ name: t.name,
+ description: t.description,
+ parameters: t.parameters,
+ })),
+ ),
+ });
+ } catch {
+ // Swallow — D7.
+ }
+
const dispatcher = createStepDispatcher(
ctx.toolMap,
ctx.dispatch,
@@ -158,6 +193,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
ctx.emit,
ctx.conversationId,
ctx.turnId,
+ ctx.toolSpans,
);
try {
@@ -177,6 +213,13 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
chunks.push({ type: "error", message });
ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message));
finishReason = "error";
+ // Close step span with error
+ try {
+ stepSpan?.end({ err });
+ } catch {
+ // Swallow — D7.
+ }
+ stepSpan = undefined;
}
if (!ctx.dispatch.eager) {
@@ -187,6 +230,25 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
const results = await dispatcher.drain();
+ // Close remaining tool-call spans
+ for (const call of toolCalls) {
+ const tcSpan = ctx.toolSpans.get(call.id);
+ if (tcSpan !== undefined) {
+ const result = results.get(call.id);
+ try {
+ tcSpan.end({
+ attrs: {
+ isError: result?.isError ?? false,
+ contentLength: result?.content.length ?? 0,
+ },
+ });
+ } catch {
+ // Swallow — D7.
+ }
+ ctx.toolSpans.delete(call.id);
+ }
+ }
+
const toolMessages: ChatMessage[] = [];
for (const call of toolCalls) {
const result = results.get(call.id);
@@ -217,6 +279,21 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
}
}
+ // Close step span (if not already closed by error)
+ if (stepSpan !== undefined) {
+ try {
+ stepSpan.end({
+ attrs: {
+ finishReason,
+ usage_inputTokens: stepUsage.inputTokens,
+ usage_outputTokens: stepUsage.outputTokens,
+ },
+ });
+ } catch {
+ // Swallow — D7.
+ }
+ }
+
const assistantMessage: ChatMessage | undefined =
chunks.length > 0 ? { role: "assistant", chunks } : undefined;
@@ -237,51 +314,122 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
const conversationId = input.conversationId;
const turnId = input.turnId;
const signal = input.signal ?? new AbortController().signal;
+ const logger = input.logger;
- for (let step = 0; step < MAX_STEPS; step++) {
- if (signal.aborted) {
- finishReason = "aborted";
- break;
+ // Open a turn span (attrs: conversationId, turnId, model)
+ let turnSpan: Span | undefined;
+ if (logger !== undefined) {
+ try {
+ turnSpan = logger.span("turn", {
+ conversationId,
+ turnId,
+ model: input.providerOpts?.model ?? input.provider.id,
+ });
+ } catch {
+ // Swallow — D7.
}
+ }
- const stepResult = await executeStep({
- provider: input.provider,
- messages,
- tools: input.tools,
- toolMap,
- dispatch: input.dispatch,
- emit: input.emit,
- signal,
- conversationId,
- turnId,
- });
+ // Track open tool-call spans across steps so we can close them on abort
+ const toolSpans = new Map<string, Span>();
+
+ try {
+ for (let step = 0; step < MAX_STEPS; step++) {
+ if (signal.aborted) {
+ finishReason = "aborted";
+ break;
+ }
- totalUsage = addUsage(totalUsage, stepResult.usage);
+ const stepResult = await executeStep({
+ provider: input.provider,
+ messages,
+ tools: input.tools,
+ toolMap,
+ dispatch: input.dispatch,
+ emit: input.emit,
+ signal,
+ conversationId,
+ turnId,
+ logger: turnSpan?.log ?? logger ?? createNoopLogger(),
+ toolSpans,
+ });
- if (stepResult.assistantMessage !== undefined) {
- messages.push(stepResult.assistantMessage);
- resultMessages.push(stepResult.assistantMessage);
- }
+ totalUsage = addUsage(totalUsage, stepResult.usage);
- for (const msg of stepResult.toolMessages) {
- messages.push(msg);
- resultMessages.push(msg);
- }
+ if (stepResult.assistantMessage !== undefined) {
+ messages.push(stepResult.assistantMessage);
+ resultMessages.push(stepResult.assistantMessage);
+ }
- if (signal.aborted) {
- finishReason = "aborted";
- break;
- }
+ for (const msg of stepResult.toolMessages) {
+ messages.push(msg);
+ resultMessages.push(msg);
+ }
- if (stepResult.toolCalls.length === 0) {
- finishReason = stepResult.finishReason;
- break;
+ if (signal.aborted) {
+ finishReason = "aborted";
+ break;
+ }
+
+ if (stepResult.toolCalls.length === 0) {
+ finishReason = stepResult.finishReason;
+ break;
+ }
+
+ if (step === MAX_STEPS - 1) {
+ finishReason = "max-steps";
+ }
+ }
+ } finally {
+ // Close any orphaned tool-call spans (e.g. abort mid-tool)
+ for (const [id, tcSpan] of toolSpans) {
+ try {
+ tcSpan.end({ attrs: { orphaned: true } });
+ } catch {
+ // Swallow — D7.
+ }
+ toolSpans.delete(id);
}
- if (step === MAX_STEPS - 1) {
- finishReason = "max-steps";
+ // Close the turn span
+ if (turnSpan !== undefined) {
+ try {
+ turnSpan.end({
+ attrs: {
+ finishReason,
+ usage_inputTokens: totalUsage.inputTokens,
+ usage_outputTokens: totalUsage.outputTokens,
+ },
+ });
+ } catch {
+ // Swallow — D7.
+ }
}
}
return { messages: resultMessages, usage: totalUsage, finishReason };
}
+
+function createNoopLogger(): Logger {
+ return {
+ debug() {},
+ info() {},
+ warn() {},
+ error() {},
+ child() {
+ return createNoopLogger();
+ },
+ span() {
+ return {
+ id: "noop",
+ log: createNoopLogger(),
+ setAttributes() {},
+ addLink() {},
+ child() {
+ return this;
+ },
+ end() {},
+ };
+ },
+ };
+}
diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts
index 8cd4c44..bfbc7ca 100644
--- a/packages/session-orchestrator/src/extension.ts
+++ b/packages/session-orchestrator/src/extension.ts
@@ -29,6 +29,7 @@ export function activate(host: HostAPI): void {
resolveProvider: () => selectFirstProvider(host.getProviders()),
resolveTools: () => [...host.getTools().values()],
runTurn,
+ logger: host.logger,
});
host.provideService(sessionOrchestratorHandle, orchestrator);
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index 302404e..37fc512 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -2,6 +2,7 @@ import type { ConversationStore } from "@dispatch/conversation-store";
import type {
AgentEvent,
ChatMessage,
+ Logger,
ProviderContract,
RunTurnInput,
RunTurnResult,
@@ -30,6 +31,8 @@ export interface SessionOrchestratorDeps {
readonly resolveTools: () => readonly ToolContract[];
readonly resolveDispatch?: () => ToolDispatchPolicy;
readonly runTurn: (input: RunTurnInput) => Promise<RunTurnResult>;
+ /** Base logger (auto-scoped to this extension); childed per turn for span capture. */
+ readonly logger?: Logger;
}
export function createSessionOrchestrator(deps: SessionOrchestratorDeps): SessionOrchestrator {
@@ -41,6 +44,7 @@ export function createSessionOrchestrator(deps: SessionOrchestratorDeps): Sessio
const tools = deps.resolveTools();
const dispatch = deps.resolveDispatch?.() ?? defaultDispatchPolicy();
const turnId = generateTurnId();
+ const turnLogger = deps.logger?.child({ conversationId, turnId });
const result = await deps.runTurn({
provider,
@@ -50,6 +54,7 @@ export function createSessionOrchestrator(deps: SessionOrchestratorDeps): Sessio
emit: onEvent,
conversationId,
turnId,
+ ...(turnLogger !== undefined ? { logger: turnLogger } : {}),
...(signal !== undefined ? { signal } : {}),
});
diff --git a/packages/tool-read-file/src/read-file.test.ts b/packages/tool-read-file/src/read-file.test.ts
index 0745b0b..f995b09 100644
--- a/packages/tool-read-file/src/read-file.test.ts
+++ b/packages/tool-read-file/src/read-file.test.ts
@@ -1,7 +1,7 @@
import { mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
-import type { ToolExecuteContext } from "@dispatch/kernel";
+import { createLogger, type ToolExecuteContext } from "@dispatch/kernel";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import {
createReadFileTool,
@@ -16,6 +16,11 @@ function stubCtx(): ToolExecuteContext {
toolCallId: "test-call-1",
onOutput: () => {},
signal: AbortSignal.timeout(5000),
+ log: createLogger(
+ { extensionId: "test" },
+ { emit: () => {} },
+ { now: () => 0, newId: () => "id" },
+ ),
};
}
diff --git a/tasks.md b/tasks.md
index a212fe5..8da0bcc 100644
--- a/tasks.md
+++ b/tasks.md
@@ -154,3 +154,39 @@ extension (read_file), hygiene CRs (getHostAPI + manifest honesty), and the
tabId→conversationId vocab rename. typecheck + biome clean; 218 tests green.
Remaining work is the parked design decisions (persistent waking agents, etc.) —
non-blocking. See HANDOFF.md "Open design decisions still parked".
+
+---
+
+## Observability — Phase A (logging substrate) ✅ DONE + verified live
+Goal: structured logs + spans captured durably to a journal file — the substrate
+for the agent-first observability subsystem (design: notes/observability-design.md).
+
+- [x] **Unit 1 — kernel-logging** (mimo-v2.5-pro): Logger/Span ABI
+ (`contracts/logging.ts`) — leveled/attributed/auto-scoped (host stamps
+ `extensionId`), incremental span records (open/close, crash-reconstructable, D3),
+ injected `LogSink` (pure record-builder). `ctx.log` on ToolContract; runTurn
+ opens turn/step/tool-call spans + the verbatim **"before"** prompt on the step span.
+- [x] **Unit 2 — journal-sink** (`packages/journal-sink/`, mimo-v2.5-pro): bootstrap
+ `LogSink` → NDJSON append-only journal (pure `serialize` + thin fs edge, rotation,
+ fail-safe drop — never blocks a turn). NOT an extension (HostDeps bootstrap dep).
+- [x] **Orchestrator fan-out + wiring** (direct): bus error-attrs `{ err }`,
+ FakeLogger/`ctx.log` test conformance; host-bin injects `logSink`+`logDeps`
+ (journal at `.dispatch/journal/`); session-orchestrator threads `host.logger`
+ (childed per turn) into runTurn.
+
+**Result:** typecheck clean, **250 tests** (218 → +22 journal-sink, +10 kernel),
+biome clean. **Live boot verified:** a turn's journal contains host logs + turn/step
+spans (open+close) + the `prompt:before` record carrying the verbatim messages array
+— the pre-mutation prompt is fully reconstructable. 2-process model: app + in-process
+sink → journal file; the collector (process 2) is Phase B. Redaction is
+per-extension self-redaction (no shared helper — isolation over DRY).
+
+### Next (observability)
+- **"AFTER" capture** — `provider.request` verbatim post-transform in
+ provider-openai-compat → full round-trip rebuild + before↔after diff (§10).
+- Minor refinement: move the large `prompt:before` payload from `attributes` into the
+ record `body` field (store-fat-serve-thin) — currently a stringified attribute.
+- Phase B: out-of-process collector → SQLite store + query (§11).
+
+Summons: prompts/phase-a-{kernel-logging,journal-sink}.md;
+reports/phase-a-{kernel-logging,journal-sink}.md.
diff --git a/tsconfig.json b/tsconfig.json
index 505d883..0f5f80f 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -9,6 +9,7 @@
{ "path": "./packages/session-orchestrator" },
{ "path": "./packages/transport-http" },
{ "path": "./packages/tool-read-file" },
+ { "path": "./packages/journal-sink" },
{ "path": "./packages/host-bin" }
]
}