| author | Adam Malczewski <[email protected]> | 2026-06-05 13:07:23 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-05 13:07:23 +0900 |
| commit | c48d8ac7160c3cdcf32ed4e488807d3daeb8d457 | |
| tree | 1fccd7f35f051d8bae6bc8c6c5e3ffa22e816d0b | |
| parent | 94dd5334b0277f3cf3b0588150a6615af86a32b3 | |
feat(observability): Phase A logging substrate — Logger/Span ABI + journal sink (250 tests)
Structured, agent-first logging captured durably to an append-only journal file.
Kernel (contracts/logging.ts): leveled/attributed Logger + Span, auto-scoped per extension (host stamps manifest.id, unspoofable), incremental span records (open/close) for crash-reconstructable traces, injected LogSink (pure record-builder). ctx.log on ToolContract; runTurn opens turn/step/tool-call spans and captures the verbatim pre-mutation prompt (the 'before') on the step span.
journal-sink (new package, bootstrap dep — not an extension): LogSink appending NDJSON to a rotating journal; pure serialize + thin fs edge; fail-safe drop, never blocks a turn. host-bin injects it via HostDeps; session-orchestrator threads host.logger (childed per turn) into runTurn.
Redaction is per-extension self-redaction (no shared helper — isolation over DRY). The out-of-process collector + SQLite store + the verbatim 'after' provider.request capture are Phase B / next (notes/observability-design.md §10/§11).
Verified: tsc -b clean, 250 tests (218→+32), biome clean. Live boot: a turn's journal holds host logs + turn/step spans (open+close) + the prompt:before record with the verbatim messages array.
Harness: ORCHESTRATOR §3 rule-scoping map; .dispatch/rules/isolation-over-dry.md; notes/observability-design.md (design D1–D10 + Phase A/B plan).
29 files changed, 2412 insertions, 87 deletions
diff --git a/.dispatch/rules/isolation-over-dry.md b/.dispatch/rules/isolation-over-dry.md new file mode 100644 index 0000000..55ac13a --- /dev/null +++ b/.dispatch/rules/isolation-over-dry.md @@ -0,0 +1,12 @@ +# Rule: isolation over DRY + +Prefer self-contained, even DUPLICATED, code in each extension over a shared helper +that two+ extensions import. A shared module wired between sibling features is a +coupling smell — it breaks feature-as-a-library (P1) and ties their fates together. +Get consistency by sharing KNOWLEDGE in the harness (`.dispatch/rules`, skills, +`GLOSSARY.md`), never shared runtime code. +The ONLY sanctioned shared surfaces are the kernel ABI (host-provided, injected) and +typed contracts — NOT a new utility module between features. +When tempted to "extract a helper for consistency," stop: that is reputation-driven +DRY (P4). Duplicating a small function across the few features that need it is the +intended trade. @@ -16,3 +16,6 @@ reports/ # Local runtime DB .dispatch-data/ + +# Local observability journal (runtime artifact) +.dispatch/journal/ diff --git a/ORCHESTRATOR.md b/ORCHESTRATOR.md index afb5520..fdeb859 100644 --- a/ORCHESTRATOR.md +++ b/ORCHESTRATOR.md @@ -114,9 +114,10 @@ log into context as a hard failure. Write self-contained prompts. Structure: 1. **Role:** "You are the owner-agent for <unit>." -2. **Read first (ordered):** `AGENTS.md`, `.dispatch/rules/`, `GLOSSARY.md`, the - relevant `notes/restructure-plan.md` §-sections, and **the exact contract - files under `packages/kernel/src/contracts/` it builds against**. +2. **Read first (ordered):** `AGENTS.md`, the **scoped `.dispatch/rules/`** for this + unit's layer (the scoping map is below the recipe), `GLOSSARY.md`, the relevant + `notes/restructure-plan.md` §-sections, and **the exact contract files under + `packages/kernel/src/contracts/` it builds against**. 3. 
**Ownership (strict):** the EXACT files it may create/edit, and an explicit "do not touch anything else; if you need a change elsewhere, write a change- request in your report — do NOT edit it." @@ -138,6 +139,17 @@ Write self-contained prompts. Structure: Keep the prompt scoped (P6): don't restate what a frontier model knows; do state the project-specific, non-inferable rules. +**`.dispatch/rules/` scoping map** — include ONLY the rows matching the unit (per §0 +"scoped rules beat general rules"); do NOT dump every rule on every agent: +- **Every agent:** `one-owner.md`, `isolation-over-dry.md`. +- **Kernel unit:** `kernel-purity.md` + `pure-core.md` + `no-internal-mocks.md`. +- **Pure-core unit:** `pure-core.md` + `no-internal-mocks.md`. +- **Any extension coupling via hooks/services:** `typed-handles.md`. +- **Any extension that emits logs/spans (≈ all of them):** `extension-logging.md` + *(pending — authored with the observability substrate, see + `notes/observability-design.md` §9; keystone: each extension self-redacts its OWN + secrets in its OWN code — NO shared redaction helper).* + --- ## 4. 
Verification (the orchestrator's trust protocol) @@ -31,6 +31,7 @@ "dependencies": { "@dispatch/auth-apikey": "workspace:*", "@dispatch/conversation-store": "workspace:*", + "@dispatch/journal-sink": "workspace:*", "@dispatch/kernel": "workspace:*", "@dispatch/provider-openai-compat": "workspace:*", "@dispatch/session-orchestrator": "workspace:*", @@ -39,6 +40,13 @@ "@dispatch/transport-http": "workspace:*", }, }, + "packages/journal-sink": { + "name": "@dispatch/journal-sink", + "version": "0.0.0", + "dependencies": { + "@dispatch/kernel": "workspace:*", + }, + }, "packages/kernel": { "name": "@dispatch/kernel", "version": "0.0.0", @@ -107,6 +115,8 @@ "@dispatch/host-bin": ["@dispatch/host-bin@workspace:packages/host-bin"], + "@dispatch/journal-sink": ["@dispatch/journal-sink@workspace:packages/journal-sink"], + "@dispatch/kernel": ["@dispatch/kernel@workspace:packages/kernel"], "@dispatch/provider-openai-compat": ["@dispatch/provider-openai-compat@workspace:packages/provider-openai-compat"], diff --git a/notes/observability-design.md b/notes/observability-design.md new file mode 100644 index 0000000..6c09099 --- /dev/null +++ b/notes/observability-design.md @@ -0,0 +1,619 @@ +# Observability & Debugging — Design Scratch + +> **Status:** IDEATION / scratch. NOT the plan, NOT decided-to-build. Captures a +> live design discussion so decisions don't live only in chat (per ORCHESTRATOR +> "write up before pivoting"). Promote settled parts into +> `notes/restructure-plan.md` + `GLOSSARY.md` when we commit to building. +> **Scope:** backend only. **Driver:** old Dispatch had bugs that were near- +> impossible to diagnose because an agent had *no real data to read*. This fixes +> that at the root. +> +> **Read order (fresh agent picking this up):** `ORCHESTRATOR.md` → +> `notes/restructure-plan.md` (the §-refs below point into it) → `GLOSSARY.md` → +> this file. 
**Mode = IDEATION with the user** (discuss/design, do NOT build yet); +> the user owns the boundary + vocabulary calls (§5.2 / §5.6). +> **Where we left off:** **Logger ABI = A, LOCKED** (evolve in place, §8). **Now +> PLANNING Phase A** (logging substrate, §10): A1 kernel Logger/Span ABI → A2 +> `journal-sink` package (in-process, host-bin-injected) → A3 runtime span +> instrumentation. Phase A captures the **"BEFORE"** (verbatim pre-mutation prompt on +> the step span, via a new `LogRecord.body` blob); the **"AFTER"** (provider.request +> verbatim, D5) is the **next step** → completes full round-trip rebuild + the +> before↔after diff. **2 processes total, no more:** process 1 = main app *incl. the +> in-process journal sink* + the journal file; process 2 = the **collector** (Phase B). +> A package ≠ a process — the sink runs in-process (cheap non-blocking append); the +> file-seam + out-of-process collector is what delivers the crash-safety. +> **Redaction = each ext self-redacts IN ISOLATION** (no shared helper; P1/P7). +> **D10** = logs are one-way. **§9** extension-logging rule → `.dispatch/rules/` +> (placement decided; authored with the substrate). Crash-safety: journal file IS the +> durable queue (§6/D3). **Mode: PLANNING Phase A (not building yet).** Settled: +> D1–D10 (§2); open per-phase: §6. + +--- + +## 0. Goal in one paragraph + +First-class, **agent-first** debugging. When something goes wrong with a chat or +feature, you grab an **ID** and hand it to a **debugging agent**; it pinpoints the +*source* of the problem from a **complete, queryable trace** of what actually +happened, then hands its findings up to the orchestrator, which dispatches fixers. +The debugging agent is trained ONLY to find root cause — it does not fix. + +--- + +## 1. Principles specific to this subsystem + +- **Agent-first.** The primary trace *consumer is an LLM*, not a human dashboard. 
+ This single fact killed the OpenTelemetry path (see D1): its entire payoff is + human visualization, which is useless to the agent. Every field we capture is + justified by *"what query must the debugging agent run?"* +- **"Complete" = completely *captured & queryable*, never completely *loaded into + context*.** Noise reduction is done by deterministic **query/filter tools** + (plain code), so the agent always reads a small, targeted slice (a timeline + skeleton → drill into one span; a diff; a validator verdict). **Store fat, serve + thin.** This is the harness's progressive-disclosure idea (P5) applied to + runtime data, and it's what protects context + usage. +- **Subordinate & fail-safe.** Observability must NEVER break or slow a turn. A + tracing error, full disk, or downed collector → drop/sample + warn, never stall. + (The §3.7 fault-containment principle: the turn is sovereign.) +- **The debugger must survive the crash it's debugging** (see D3). Robustness of + the collector is paramount — it's the explicit reason it gets an exception to + the in-process architecture. + +--- + +## 2. Decisions settled so far (with the named problem each solves — P4) + +### D1 — Agent-first; OpenTelemetry dropped +No OTLP exporter, no Jaeger/Tempo/Grafana, no OTel SDK, no "stay compatible" +constraint. Its value is human dashboards + cross-service fleet tracing — we are +one process whose reader is an agent. **Kept** (not as "OTel" but because it's the +right abstraction for *"what caused X"*): the **causal-trace model** — spans with +**parent + links + attributes**. Designed for agent queryability, not rendering. +- *Rejected by a named tradeoff, not NIH:* the OTel SDK's ambient "current span" + (`AsyncLocalStorage`) is exactly the hidden global P3 bans; we pass correlation + explicitly. Dropping it removes that tax + a heavy dep tree. + +### D2 — Boundary: kernel owns types+mechanism; one `observability` extension owns the rest +(User's granularity call, §5.2.) 
+- **Kernel (ABI):** the structured-log + span **types**, the scoped logger/trace + **handle** shape, and the **`onAny`** firehose (reserved in §5.4 / ORCHESTRATOR + §6, **not yet built**). Kernel still touches no I/O and names no feature. +- **`observability` extension:** capture (onAny tap + AgentEvent tap + the scoped + handles), the store, the query/filter tools, redaction, and the debugging-agent + harness (P7). + +### D3 — The collector runs OUT OF PROCESS (the sanctioned exception) +Single-process app (§3.7): a buggy extension can crash/`exit`/OOM everything. An +in-process collector buffering in memory loses its evidence **at the exact moment +you need it** (the crash). So the **sink** behind the logging handle is a separate +OS process. Three agent-first wins only this enables: +1. **Query a chat that killed the main process** — the collector stays alive, so + the agent can debug a dead app. (Decisive.) +2. **Attribute the crash** — collector sees pipe EOF / heartbeat loss → stamps a + synthetic terminal span "process exited unexpectedly after span X." In-process + that final fact dies with the recorder. +3. **Isolate the query workload** from the live turn. + +**Robust shape (write-ahead logging for telemetry):** +``` +main process ──(cheap, non-blocking, append-only line)──▶ journal file (durable buffer) + │ tails + standalone collector process + normalize → SQLite trace store → query API ◀── debugging agent +``` +- Hot path only **appends a line to an OS-buffered fd** — no SQLite, no blocking; + buffer full → drop/sample (fail-safe). +- The **journal file is the queue**: if both processes die, flushed bytes are on + disk; collector resumes tailing on restart. Worst case loses the last partial + line (the §3.4 "last in-flight" bound). +- Collector is **deliberately dead-simple** (read→normalize→store→serve): the + less it does, the less it can crash. 
+- **Exception scoped tightly:** extensions still emit through a normal + `host.logger`/trace handle — *the contract is unchanged*; only the sink's + *implementation* is out-of-process. This is the first real use of §3.7's + pre-designed "move an extension to a worker without a rewrite" (serializable + payloads). It fits the architecture rather than fighting it. +- *Cost (honest):* a second process host-bin spawns first / drains last + a tiny + line protocol. Justified by the "must be robust" priority. + +### D4 — Correlation model (reuses existing keys — P8, no invented id) +- **turn = one trace** (`turnId` ≈ trace id). +- **conversationId = an indexed attribute** grouping a thread's turn-traces. +- **span = one occurrence** of work (its own span id), with `parentSpanId`. +- **extension.id = an auto-stamped attribute** — the host hands each extension a + handle pre-tagged with its `manifest.id`; nobody can spoof or invent an id + scheme. (This is the clean version of "each extension broadcasts its id.") +- **Cross-feature causality = span links**, recorded **at the handoff moment** + (enqueue→dequeue, summon→subagent-turn) — *causal edges, not co-occurrence.* +- Every span carries the correlation keys of what it acts on (e.g. a cross-chat + op records both `source.conversationId` and `target.conversationId`). + +### D5 — Verbatim provider I/O capture (the highest-value capture) +The cure for "the agent had no real data." Rule: +- **Capture the request verbatim at the fetch edge — AFTER all transforms** (OAuth + body rewrite, tool-name prefixing, normalization). Corrupted-history bugs are + often *introduced by* a transform; capture before the bytes hit the wire = ground + truth (URL, model, params, full messages array, tool schemas, cache markers). +- **Capture the raw response/error too** (status + error JSON — it usually *names* + the defect, e.g. `MissingToolResultsError`). +- **Redact only secrets** (auth headers, vault-injected fields) — never the body. 
+ **Technique = partial masking, NOT removal** (user-decided): reveal a few first + + last real chars with `…redacted…` between (e.g. `sk-…redacted…f9a`), **graduated by + length** so short secrets reveal less (tiers in §6). The field stays present and + **diffable** (tells you if a secret changed/rotated — serves the §3.1 cache-bust + diff) without storing the live value. + **Mechanism (user-decided): each extension self-redacts IN ISOLATION — no shared + helper.** Logs are self-generated, so they are self-redacted: the extension that + owns the data — it alone knows what is secret and *how* to censor it — masks the + value in **its own code, at the log call-site**, before the record is built. There + is **no shared `redact()` utility and nothing in the kernel** for this: a shared + helper would couple every secret-handling extension to one algorithm (violates + **P1** feature-as-a-library + kernel-minimalism) and contradicts "the code decides + *how* to censor." Consistency comes from the **harness, not shared code** (**P7**): + the §9 rule makes self-redaction mandatory and documents the default tiers; **each + extension reimplements them locally**. Duplicating a tiny mask across the few + secret-handling exts (providers, auth) is **intentional — isolation over DRY**. + Result: raw secrets never leave the producing extension's code — they reach neither + the sink nor the journal. + *(Supersedes the earlier "shared helper / mask at mint" sketches. Refines D2: the + observability ext owns redaction POLICY/config + store/query; the redaction ACT is + each source extension's own, self-contained.)* + **Caveat (honest):** no scheme can mask a secret nobody marked — so the real + enforcement is the §9 harness rule; an optional cheap sink-side pattern-scan + (`sk-…`) may backstop obvious leaks but is not relied upon. +- **Retain successful requests in-window** (not error-only) so "diff failing vs a + known-good chat" works. 
Cost: bodies are large → retention/rotation + compression. +- **Volume control (cache warming):** stamp the cheap `prefix.fingerprint` + + cache-token counts on *every* request, but persist the **full body only when the + fingerprint changed** (or on error) — keeps the bust findable without storing + every warming send verbatim. +- **Store fat, serve thin:** the blob lives in the store; the agent gets a **diff** + or a **validator verdict**, never the raw 200KB. +- *Future affordance (free from the data):* replay a stored request to reproduce. + +### D6 — Per-extension observability: free by default, rich by opt-in (P4) +- **Default = free:** the auto-scoped structured handle (D4) makes *every* + extension observable for nothing — no per-extension logging contract to write. +- **Opt-in = a typed debug surface** only where an extension has domain-specific + diagnostics worth standardizing (queue's enqueue/dequeue/deliver lifecycle). + Lives in that extension's own **P7 harness**, loaded by the debugging agent only + when investigating that extension. Mandating a hand-written contract per + extension would be the boilerplate P4 warns against. +- **Worked boundary (who computes an attribute):** the `prefix.fingerprint` (§3.1) + can't be computed by the generic collector — only the cache-warming/provider + extension knows where the cacheable prefix ends. So that extension stamps the + attribute; the collector just stores it. The template for every "who computes + this attribute?" question. + +### D7 — Performance posture +Negligible. An agent turn is **LLM-network-bound (seconds)**; span/attribute work +is microseconds — in the noise. The only real cost is **write I/O + volume**, +solved by: non-blocking append (D3) + out-of-process normalize + **don't trace +token deltas** (high-frequency *and* redundant — the chunk log already has final +text) + levels/sampling + fail-safe drop. 
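The non-blocking append plus fail-safe drop described in D3 and D7 can be sketched as a pure serializer with a thin, error-swallowing edge. This is a minimal illustration, not the code from this commit: `JournalRecord`, `serializeRecord`, and `createFailSafeSink` are hypothetical names, and the `append` callback stands in for whatever actually writes to the journal file.

```typescript
// Hypothetical names throughout; none of these are taken from the commit.
type Scalar = string | number | boolean | null;

interface JournalRecord {
  readonly ts: number;
  readonly level: "debug" | "info" | "warn" | "error";
  readonly extensionId: string;
  readonly msg: string;
  readonly attrs?: Readonly<Record<string, Scalar>>;
}

// Pure step: one record becomes exactly one newline-terminated JSON line.
// JSON.stringify escapes embedded newlines, so the line framing stays intact.
function serializeRecord(record: JournalRecord): string {
  return JSON.stringify(record) + "\n";
}

interface FailSafeSink {
  readonly emit: (record: JournalRecord) => void;
  readonly dropped: () => number;
}

// Thin edge: `append` is assumed to write one line to the journal.
// Any throw from serialization or the writer is swallowed and counted,
// so a logging failure cannot propagate into the turn.
function createFailSafeSink(append: (line: string) => void): FailSafeSink {
  let droppedCount = 0;
  return {
    emit(record) {
      try {
        append(serializeRecord(record));
      } catch {
        droppedCount += 1;
      }
    },
    dropped: () => droppedCount,
  };
}

// Usage: an in-memory array plays the role of the journal file.
const demoLines: string[] = [];
const demoSink = createFailSafeSink((line) => {
  demoLines.push(line);
});
demoSink.emit({ ts: 0, level: "info", extensionId: "demo", msg: "turn started" });
```

Keeping the serializer pure means it can be unit-tested without touching the filesystem, while the drop counter gives the host something to warn about without ever stalling a turn.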
+ +### D8 — The "easy view": one compact projection, served to agent AND human +A single-line-per-event **transcript skeleton** of how a chat was built — prompt +assembly, thinking, tool calls, timings, sizes — each line collapsing to a summary +(`ai thought 1.8s`, `tool read_file ran 0.5s → ok (2.0k)`), expandable to the +verbatim span. Key realizations: +- **It's a *projection*, not new infra** — a pure `spans → text` formatter in the + query layer (P2: zero I/O, exhaustively unit-testable). NOT a frontend, NOT a + graphical timeline (that's out-of-scope frontend later, which can consume the + same projection). +- **Same artifact for both consumers** — this compact skeleton IS the + "store-fat-serve-thin" *thin overview* the agent reads first before drilling + into a span. The human "easy view" is that same text rendered to a terminal/ + markdown. So it *reinforces* agent-first; it does NOT reopen the human-viz + question (D1) — no heavy infra, no design deformation. +- **One genuinely new capture it forces: prompt-assembly provenance** (see §4) — + the context-filter chain (§3.2) records each contribution as a *segment* so we + can show `persona(1180) · tool:read_file(300) · skill:web(400) · user(412)`. + Cheap: *segmentation metadata over the verbatim body we already store* (D5), + not a copy. +- **Interlocks with cache debugging (§3.1):** a prefix-fingerprint bust attributes + to the exact contributor whose segment changed ("skill:web 400→420 between warm + and real send"). +- **Doubles as a completeness test:** if the easy view reconstructs how the chat + was built end-to-end, the §4 taxonomy captured enough — every skeleton line must + be backed by a real span/segment. 
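The "pure `spans → text` formatter" in D8 might look roughly like the sketch below. The `SkeletonSpan` shape and the function names are assumptions made for illustration (the real span records carry more fields), and mapping a `step` span to an "ai thought" line is a guess at the intended rendering based on the two example lines quoted above.

```typescript
// Hypothetical shape and names; the commit's span records are richer than this.
interface SkeletonSpan {
  readonly kind: "step" | "tool-call";
  readonly name: string;
  readonly durationMs: number;
  readonly isError?: boolean;
  readonly resultChars?: number;
}

// Milliseconds rendered as seconds with one decimal place.
function seconds(ms: number): string {
  return (ms / 1000).toFixed(1) + "s";
}

// Character counts rendered in thousands with one decimal place.
function kilo(chars: number): string {
  return (chars / 1000).toFixed(1) + "k";
}

// One span collapses to one summary line.
function renderLine(span: SkeletonSpan): string {
  if (span.kind === "step") {
    return `ai thought ${seconds(span.durationMs)}`;
  }
  const status = span.isError ? "error" : "ok";
  const size = span.resultChars === undefined ? "" : ` (${kilo(span.resultChars)})`;
  return `tool ${span.name} ran ${seconds(span.durationMs)} → ${status}${size}`;
}

// Zero I/O: the whole view is a function of its input spans.
function renderSkeleton(spans: readonly SkeletonSpan[]): string {
  return spans.map(renderLine).join("\n");
}
```

Because the projection takes spans in and returns a string, the same output can be handed to the debugging agent or printed to a terminal without any extra infrastructure.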
+ +### D9 — Optimization analytics = derived aggregates, NOT a separate metrics pillar +A third consumption pattern over the SAME spans (after incident-debugging and the +D8 easy view): longitudinal roll-ups to tune Dispatch itself — token size per turn, +model generation time, tool-call durations, tool/skill usage frequency, error +rates. **P4 call: these are `GROUP BY` queries over the span store, not a separate +counters/histograms pipeline.** Everything needed is already a span attribute (turn +tokens, step/tool durations, tool-call counts, `isError`, D8 segment sizes) — no +new capture. (The "metrics pillar" machinery earns nothing here: no dashboards, no +alerting — agent-first.) +- **Cost/benefit interlock (the high-value one):** cross D8 `prompt.assembly` + segment sizes — the *standing cost* a tool/skill/persona imposes on EVERY turn + (context budget + cache pressure) — against usage frequency + success — the + *realized benefit*. Ranks contributors by cost/benefit → directly answers + "should we keep this tool in the definitions?" A rarely-used tool with a fat + definition is pure overhead and a cache-bust risk. +- **Objective vs. interpreted:** counts/durations/error-rates are objective SQL. + "Is the tool's description good enough for the model to know when to use it?" is + a *hypothesis* an analysis agent forms from those signals + samples (defined but + never called; called then result ignored; high input-schema error rate) — not a + pure metric. Agent-interpreted, consistent with agent-first. +- **Retention asymmetry (the one real new requirement):** fat verbatim data (D5) + rotates out fast; cheap aggregates are rolled up and kept long for trend signal. + A periodic rollup `scheduledJob` (§2.3 scheduler) writes compact daily/weekly + summaries that survive raw-trace pruning. +- **Three consumers, one capture:** incident-debug (one trace) · D8 easy view (one + chat) · D9 analytics (many turns) — all the same spans. 
The capture is the asset; + every consumer is a cheap projection/aggregation. Validates the D2 boundary. + +### D10 — Logs/spans are a one-way emission, not a feature channel (clarifies D2/D3/§5.4) +Answering "do logs pass through other extensions?": **no.** Two distinct flows that +never mix: +- **Regular (feature) data** — extension→extension via typed contracts / hooks / + services (§3.5, §5.4): in-band where it matters (filters awaited, services return + values, the provider stream consumed by the kernel), type-anchored, **affects the + turn outcome**. +- **Logged (observability) data** — ANY extension → its auto-scoped `host.logger`/ + span → kernel mints the record (stamps `extensionId`/ids/timing; secrets are + already self-redacted by the source, D5) → `sink.emit` → **journal** → + out-of-process **collector** → store → + debugging agent. **One-way, fire-and-forget** (D3/D7): never awaited, never changes + a turn, swallowed on error. + +**Each extension authors its OWN logs** (handle auto-stamped with its id, D4). A +sibling feature extension **never receives** another's logs — that would be feature +coupling *through logs* (an anti-pattern; cross-feature reaction is what hooks are +for). The **sole reader** is the `observability` extension, off the (out-of-process) +collector/store + the in-process **`onAny`** firehose + the AgentEvent tap — the +§5.4 "observability only, never feature code" exception. **Many independent one-way +producers, exactly one privileged consumer** — so logging can never become a +backdoor feature bus. + +--- + +## 3. Validation — the bug catalog (P4: earn it against real failures) + +Each row is a real (old-Dispatch) failure → the query/diff that finds it. This is +the acid test that the design pays for itself. + +| Failure | How the trace finds it | +|---|---| +| **Chat crashed** | Spans written incrementally → trace ends at the last span before death; collector's EOF stamp shows "process exited after span X". Read the tail. 
| +| **API rejected corrupted history** | `provider.request` span holds the exact post-transform body + raw rejection; an incomplete tool call is literally a tool-call span with no matching tool-result span; a `reconcile.repair` span shows any auto-fix. | +| **Queued message shown twice** | Two `deliver` spans for one message id (a count query). | +| **Queued message in wrong chat** | The enqueue span's `target.conversationId` ≠ where it delivered; filter by message id across conversations → the misroute is explicit. | +| **Prompt cache returns 0% hit** *(new)* | See §3.1 — diff consecutive verbatim requests' cacheable prefixes; a `prefix.fingerprint` attribute flags the bust; span timestamps reveal a cache-warming gap > TTL. | + +### 3.1 Worked example — the prompt-cache 0%-hit bug (cache warming) +**The system (cache warming):** provider prompt caches expire ~5 min. To keep a +chat's cache warm without a user message, periodically **resend the (rewound) +conversation** to refresh the cached prefix — staying warm until the real next +message lands a hit. +**The bug:** occasionally the API reported **0% cache read** with no way to debug +why. Cache hits depend on **byte-exact prefix identity**, so a miss has many silent +causes, all invisible before now: +- the cacheable prefix changed (tool schemas reserialized in different key order; a + volatile value crept into the system prompt — timestamp/date/nonce; a + skill/agent injection changed; attachment re-encoded); +- the `cache_control` breakpoint moved or wasn't set; +- the **cache-warming request's prefix wasn't byte-identical** to the prefix the + next real message extends → you warmed the *wrong* cache; +- provider-side: the warming send fired late (gap > TTL), eviction, model change. 
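The first cause in the list (key order drifting between serializations) is easy to reproduce in a few lines. The sketch below is illustrative only: `canonicalize` and `prefixFingerprint` are hypothetical helpers, and the FNV-1a hash is a small non-cryptographic stand-in chosen to keep the example dependency-free. It also hashes UTF-16 code units rather than the wire bytes, which a real fingerprint would presumably use.

```typescript
// Hypothetical helpers; not taken from the commit.
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

// Serialize with object keys sorted, so insertion order cannot change the output.
function canonicalize(value: Json): string {
  if (value === null || typeof value !== "object") {
    return JSON.stringify(value);
  }
  if (Array.isArray(value)) {
    return "[" + value.map((item) => canonicalize(item)).join(",") + "]";
  }
  const keys = Object.keys(value).sort();
  const body = keys.map((key) => JSON.stringify(key) + ":" + canonicalize(value[key] as Json));
  return "{" + body.join(",") + "}";
}

// 32-bit FNV-1a over the serialized prefix, returned as 8 hex characters.
// The fingerprint is taken over the text that would actually be sent,
// because the provider cache keys on byte-exact identity.
function prefixFingerprint(serializedPrefix: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < serializedPrefix.length; i++) {
    hash ^= serializedPrefix.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return ("0000000" + (hash >>> 0).toString(16)).slice(-8);
}

// The same tool schema built with two different key orders.
const schemaA: Json = { name: "read_file", strict: true };
const schemaB: Json = { strict: true, name: "read_file" };

// Plain JSON.stringify follows insertion order, so the two prefixes differ
// and their fingerprints would normally differ too.
const rawA = JSON.stringify(schemaA);
const rawB = JSON.stringify(schemaB);

// Sorted-key serialization yields identical text, hence identical fingerprints.
const stableA = prefixFingerprint(canonicalize(schemaA));
const stableB = prefixFingerprint(canonicalize(schemaB));
```

Hashing the sent text rather than a canonical form matters here: a fingerprint computed after canonicalization would hide exactly the key-order drift it is meant to expose. Canonical serialization belongs in the request builder, as a fix for that one cause.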
+ +**Why this design nails it:** +- Verbatim capture (D5) of **every** provider request — *including cache-warming + sends, flagged `warm` vs `real`* — plus **cache-token counts from the response** + (`cache_read`, `cache_creation`). +- A **`prefix.fingerprint`** attribute (hash of the cacheable prefix up to the + `cache_control` breakpoint): a cache bust = the fingerprint changed unexpectedly + between a warming send and the next real send. One grouped query flags it; then + diff the two bodies to see *which bytes* diverged. +- **Bonus insight this surfaces:** it makes **non-deterministic serialization** (a + Map's key order, unstable tool-schema generation) — an otherwise invisible + cache-killer — show up as a visible diff. Capture turns "0% and no idea why" into + "these 14 bytes of the prefix changed, introduced by transform Y." + +--- + +## 4. Capture taxonomy (draft — the §-next thread) + +Span kinds, driven by the bug list (attributes are illustrative, not final): +- **turn** (root) — `conversationId`, `turnId`, model, status, token usage. +- **step** — one LLM round-trip within a turn. +- **provider.request** *(the star, D5)* — verbatim body (post-transform), headers + (redacted), raw response/error, `cache_control` presence, `prefix.fingerprint`, + `warm|real`, cache-read/creation tokens, latency. +- **prompt.assembly** *(D8)* — ordered composition segments of the request: + `{contributor extension.id, kind (persona|tool-def|skill|agent-profile|history| + user-msg), role, length, contentRef→verbatim body}`. Powers the easy-view + prompt-assembly render + per-contributor cache-bust attribution. +- **tool-call** — `toolCallId`, name, input, result, isError, duration. +- **reconcile.repair** *(§3.4)* — what the load-time repair changed. +- **queue.enqueue / dequeue / deliver** — message id, source/target conversation, + links to the turn it caused. 
*(queue/session-features ext not built yet.)* +- **process.lifecycle** — boot / shutdown / **crash** (collector-synthesized). +- **structured log** — leveled, attributed, correlated line (the evolved Logger). + +--- + +## 5. Vocabulary (user-approved; promote to GLOSSARY.md when this lands) + +Approved this session: **trace**, **span**, **attribute**, **structured log**, +**observability**, **redaction**, **debugging agent**. Standard/training-baked — +they cost zero glossary justification (P6). +- **trace** — the full correlated record of one operation (≠ *history*/chunk log). +- **span** — one timed unit of work in a trace (parent + links + attributes). +- **attribute** — typed key/value on a span/log. *(aliases to avoid: tag, field, metadata)* +- **structured log** — a leveled, attributed, correlated record. *(avoid: debug message)* +- **span link** — a causal edge to another span/trace (cross-feature). +- **collector** — the out-of-process sink that normalizes + stores + serves traces. +- **journal** — the append-only durable buffer file between app and collector. +- **cache warming** — periodically resending a (rewound) conversation to keep its + provider prompt cache warm within the ~5-min TTL. *(aliases to avoid: reheat, + cache reheating)* — **user-decided.** Distinct from **wake** (the Claude + wake-probe scheduler in old code) — keep the two concepts separate. +- **redaction** — replacing a secret's middle with `…redacted…`, keeping the first + 3 + last 3 real chars (partial masking — keeps the field present + diffable; never + dropped). *(aliases to avoid: censoring, scrubbing, masking-as-removal)* — + technique **user-decided.** + +--- + +## 6. Open threads (not yet decided) + +- **Span/attribute vocabulary** — finalize §4 (next up). +- **Structured-`Logger` ABI change** — today it's unstructured `(message, + ...args)`; evolve to leveled + attributed + auto-scoped + correlated. 
Kernel + owns `Logger` → this is an ABI change with `lsp references` fan-out. Decide: + evolve in place vs. add a parallel structured channel. **Proposed shape: §8** + (awaiting the A/B call). +- **Retention/rotation sizing** — verbatim bodies are large; TTL + size cap + + compression; keep successes long enough for diffing. +- **Journal / IPC line protocol** — NDJSON to a pipe vs. unix socket vs. append + file; framing; backpressure (drop-oldest/sample). **The journal file IS the durable + queue** — there is **no in-memory log queue** in either process (that's the + crash-safety win): a normal app crash/OOM does NOT lose already-`write()`-en lines + (bytes live in the OS buffer, not the process heap); the collector resumes tailing + from its last offset on restart. *Open:* **fsync cadence** — per-line (durable, + slow) vs. periodic vs. none (fast); only a kernel-panic/power-loss risks the last + unflushed line (§3.4 bound). +- **Redaction policy** — *DECIDED:* (b) **short-secret guard** = graduated tiers by + length: **≥13 → reveal 3** each side · **11–12 → 2** · **8–10 → 1** · **≤7 → full + mask** (the `10` overlap resolved conservatively to reveal-1; `…redacted…` token is + fixed-width so it never leaks the hidden length). (c) **who redacts** = **each + extension self-redacts in isolation — NO shared helper, nothing in the kernel** + (D5/§9): the producing ext masks in its own code at the call-site. *Still open:* + (a) the exact secret-field/header list — declared **per-extension** (it knows its + own; e.g. openai-compat: `authorization` header + any vault-injected body field). +- **Cache-token fields** — *RESOLVED by inspection:* `Usage` already carries + `cacheReadTokens?` / `cacheWriteTokens?` (`packages/kernel/src/contracts/provider.ts`) + — **no provider CR needed**; only confirm the openai-compat provider *populates* + them (instrumentation detail, not a contract gap). 
+- **Collector supervision** — host-bin spawns first / drains last; restart + + resume-tail; what if the collector dies. +- **Levels & default capture set** — what's on by default (deltas off). +- **Easy-view rendering (D8)** — the projection format + delivery surface (CLI + command vs. transport route returning markdown; frontend later), and char-counts + (free) vs token-counts (needs a tokenizer) in the skeleton. +- **Analytics roll-ups (D9)** — rollup table shape + retention asymmetry (raw + traces short, aggregates long); `GROUP BY` indexes (tool_name, model, kind, time). + *(The periodic-job mechanism already exists: `host.scheduler.register` — + `packages/kernel/src/contracts/extension.ts`; only the table shape + retention + + indexes remain to design.)* +- **`onAny` firehose (kernel)** — reserved (§5.4) but unbuilt: confirm it's the + capture tap and define its shape (one listener; payload = hook id + payload + + correlation). +- **Debugging-agent delivery + the "grab an ID" entry point** — how an id is handed + to the agent; rides on the (unbuilt) `agents` extension + its P7 harness. +- **Sequencing / dependencies** — build the substrate now (Logger ABI, onAny, + collector, store, core span kinds); instrument the rest as their features land + (queue spans ← session-features; `prompt.assembly` ← context-filter chain; + debugging agent ← agents ext). +- **Extension-logging rule placement** — *DECIDED:* `.dispatch/rules/extension-logging.md` + (user's call); wired into the ORCHESTRATOR.md §3 scoping map for extension agents. + Content drafted (§9, tribal-knowledge only); authored when the substrate is built. + +## 7. Deferred / out of scope +- Replay-to-reproduce (the data supports it; build later). +- Adversary/multi-tenant isolation (we defend faults, not adversaries — §3.7). +- Human dashboards / metrics viz (agent-first; revisit only if a human need + appears). + +--- + +## 8. 
Logger ABI — proposed shape (resolving open-question #1; awaiting A/B) + +> **STATUS: proposal, NOT yet decided.** Forced by: structured records · +> auto-scoped to extension (D4/D6) · explicit correlation / no ambient (P3) · +> spans (D8/D9) · fire-and-forget non-blocking emit (D3) · sink-injected (purity). + +### Proposed types (kernel ABI) +```ts +type Level = "debug" | "info" | "warn" | "error"; // keep 4 (P6) +type Attributes = Readonly<Record<string, string | number | boolean | null>>; // flat: serializable (D3) + queryable (D9) + +interface LogContext { + readonly extensionId: string; // auto-stamped by host (D6) — not caller-supplied + readonly conversationId?: string; + readonly turnId?: string; + readonly spanId?: string; +} + +interface Logger { + readonly debug: (msg: string, attrs?: Attributes) => void; + readonly info: (msg: string, attrs?: Attributes) => void; + readonly warn: (msg: string, attrs?: Attributes) => void; + readonly error: (msg: string, attrs?: Attributes & { err?: unknown }) => void; + readonly child: (ctx: Partial<LogContext> & { attrs?: Attributes }) => Logger; // explicit value, passed down (P3) + readonly span: (name: string, attrs?: Attributes) => Span; +} + +interface Span { + readonly id: string; + readonly log: Logger; // pre-bound to this span + readonly setAttributes: (attrs: Attributes) => void; + readonly addLink: (target: { spanId: string; turnId?: string }, reason?: string) => void; // D4 causal edges + readonly child: (name: string, attrs?: Attributes) => Span; // step → tool-call nesting + readonly end: (outcome?: { err?: unknown; attrs?: Attributes }) => void; // records duration + status +} +``` + +### Why this is P3-safe (the OTel contrast) +`child()` / `span()` return **explicit values you pass down** (orchestrator → +`runTurn` → `ctx.log` into tools). No hidden "current span"; correlation travels +as an argument — exactly why we could drop OTel's `AsyncLocalStorage`. 
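
The explicit-correlation point above can be illustrated with a minimal sketch. It assumes a simplified in-memory logger: `makeLogger`, `MiniLogger`, `Rec`, and the array sink are hypothetical stand-ins for illustration, not the kernel's real `createLogger` or `LogSink`. The logger is handed down as an argument, so the tool's record carries the turn and span ids without any ambient "current span", and a caller-supplied `extensionId` is ignored.

```ts
// Hypothetical, simplified shapes for illustration only; the proposed ABI is the one sketched in §8.
type Attrs = Readonly<Record<string, string | number | boolean | null>>;
interface Ctx {
  readonly extensionId: string;
  readonly turnId?: string;
  readonly spanId?: string;
}
interface Rec extends Ctx {
  readonly msg: string;
  readonly attrs?: Attrs;
}
interface MiniLogger {
  readonly info: (msg: string, attrs?: Attrs) => void;
  readonly child: (ctx: Partial<Ctx>) => MiniLogger;
}

// Pure record-builder over an injected sink (here: a plain array).
function makeLogger(ctx: Ctx, sink: Rec[]): MiniLogger {
  return {
    info: (msg, attrs) => {
      sink.push(attrs === undefined ? { ...ctx, msg } : { ...ctx, msg, attrs });
    },
    // extensionId is re-stamped last, so a caller cannot override it.
    child: (extra) => makeLogger({ ...ctx, ...extra, extensionId: ctx.extensionId }, sink),
  };
}

// A tool receives its logger as an argument (the ctx.log idea); there is no global lookup.
function runTool(log: MiniLogger): void {
  log.info("tool ran", { bytes: 42 });
}

const records: Rec[] = [];
const hostLogger = makeLogger({ extensionId: "demo-ext" }, records);
const turnLogger = hostLogger.child({ turnId: "turn-1" });
const spanLogger = turnLogger.child({ spanId: "span-1", extensionId: "spoofed" });
runTool(spanLogger);
```

The design choice this shows is that correlation is ordinary data: whoever holds `spanLogger` logs into that span, and nothing else can.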
+ +### Purity story (a "kernel logger" that touches no I/O) +`Logger`/`Span` are **pure record-builders over an injected `LogSink { +emit(record): void }`**. The kernel mints records (auto-stamps `extensionId`, +ids/timing) and calls `sink.emit`; the **sink is a host-bin bootstrap dependency** +(like the storage factory — available before any extension activates, per the +"Logger always available" contract). The sink writes the **journal** (D3); the +observability **collector owns the other end**. Kernel testable with a fake sink, +zero mocks. + +### The fork — DECIDE THIS (open-question #1) +- **A — evolve `Logger` in place (recommended).** Replace the string logger; the + simple call `info("msg")` still compiles (attrs optional) but flows through the + same correlated logger → **A subsumes B's ergonomics**. Con: ABI fan-out — but + tiny now (a few `logger.error("...", err)` sites → `{ err }`); only grows later. +- **B — parallel structured channel, keep the string logger.** Zero churn, but two + paths → drift, and casual `logger.info` stays uncorrelated / invisible to the + store — defeats "everything queryable" and grates against P8. + +### Sub-decisions baked into the sketch +- Inject `{ sink, now, newId }` — deterministic ids/timing in tests (§3.6). +- Flat scalar attributes (nested → stringify) — keeps D9 `GROUP BY`/indexes clean. +- `error(msg, { err })` normalizes today's positional `Error` arg. +- **Downstream contract change:** tools get **`ctx.log`** (span-bound) so they log + correlated without a global (P3) — a `ToolContract` ctx addition (§3.3 ctx + already carries `signal`/`onOutput`). + +--- + +## 9. Harness artifact — the extension-logging rule (tribal-knowledge ONLY, P6/P7) + +> **Status:** planned deliverable (user-requested). 
Extension-scoped (P7): the +> knowledge lives in the **harness**, and **each extension implements logging + +> redaction in its OWN isolated code** — there is NO shared logging/redaction helper +> to couple them (P1; isolation over DRY). Loaded by every extension-author agent. +> **Placement DECIDED** (user): `.dispatch/rules/extension-logging.md`, wired into the +> ORCHESTRATOR.md §3 scoping map for extension agents. **Write it when we build the +> substrate, not before.** P6 governs hard: state ONLY what a frontier model would +> get wrong about THIS system; omit anything inferable. + +**MUST state (non-inferable, project-specific):** +- **Self-redact your own secrets before logging — in your own code.** There is no + shared `redact()` and nothing in the kernel does it for you. YOU decide what is + secret and how to censor it. Default censoring = the §6 tiered partial mask + (reimplement it locally; deviate if your secret type warrants). +- Use the injected `host.logger` / the span / `ctx.log` you're handed — never + `console.*`, never a hand-rolled logger or your own correlation ids. +- **Don't set `extensionId` / invent an id scheme** — it is auto-stamped (D4). +- **Attributes are flat scalars** (nested → stringify) — D9 GROUP BY + journal + serialization need it. +- **Don't log token deltas / per-chunk streaming** (D7) — redundant + noisy. +- **Logs are one-way (D10)** — never read another extension's logs; cross-feature + reaction is a hook, not a log. +- Edge I/O (providers): capture the request **verbatim, post-transform, at the fetch + edge** (D5), self-redacting secret headers/fields in your own code. + +**Must NOT state (inferable — omit, P6):** +- What logging/levels are for; info/warn/error semantics; "log your errors." +- How to call `logger.info("msg", {attrs})` (obvious from the type). +- Generic "don't log sensitive data" platitudes (replaced by the concrete + self-redaction rule) or any SQL/GROUP BY explanation. + +--- + +## 10. 
Build plan — Phase A (logging substrate) + +> **Status:** PLANNED, prompts drafted (`prompts/phase-a-{kernel-logging,journal-sink}.md`), +> awaiting final user review before summon. **Goal:** every extension + every turn +> emits structured records durably into the **journal file**. Phase A = **1 process** +> (main app + in-process sink); the collector is process 2 (Phase B). Logger ABI = **A**. + +### Record contract — frozen FIRST (both units depend on it) +`packages/kernel/src/contracts/logging.ts` (kernel owns it). Agent finalizes the +exact shape; load-bearing constraints: +- **`LogRecord`** = flat, JSON-serializable **discriminated union**: `log` line | + `span-open` | `span-close`. Spans emitted **incrementally** (open at `span()`, + close at `end()`) so a crashed turn is reconstructable from the journal (D3). + Every variant carries correlation (`extensionId` auto-stamped + optional + `conversationId`/`turnId`/`spanId`/`parentSpanId`), `timestamp`, flat `attributes` + (queryable scalars), **and an optional `body` blob** (string) for large verbatim + payloads — the pre-mutation prompt now, the verbatim provider request later + (store-fat-serve-thin: query tools serve diffs/slices, not the raw blob). +- **`LogSink { emit(record): void }`** — fire-and-forget; the kernel never lets a + sink error escape into a turn (D7). +- **`Logger` / `Span`** — the §8 shapes (leveled/attributed/auto-scoped/correlated; + `child()`/`span()` return explicit values — P3, no ambient). + +### Unit 1 — `kernel-logging` (ONE coordinated kernel owner; single-writer over kernel) +Interlocked ABI change (cf. the `tabId→conversationId` rename). Stage commits +(**contract checkpoint → host → runtime**) so the contract freezes early for Unit 2. 
+Owns: `contracts/logging.ts` (new), `contracts/extension.ts` (evolve `Logger`), +`contracts/tool.ts` (+ `ctx.log`), `contracts/index.ts` (re-export), `host/host.ts` +(+ `logSink` on `HostDeps`, mirroring `storageFactory` @ host.ts:34/60; build each +extension's auto-scoped logger that stamps `manifest.id`) + `host.test.ts`, +`runtime/{run-turn,dispatch}.ts` (open/close turn/step/tool-call spans; thread +`ctx.log`) + tests. **The `step` span records the verbatim pre-mutation prompt** +(messages + tools + opts) in its `body` — the **"BEFORE"** capture. Pure +record-builder injected with `{ now, newId }`. + +### Unit 2 — `journal-sink` (bootstrap library, NOT an extension) +`packages/journal-sink/`. Implements kernel `LogSink`: pure `record → one NDJSON +line` core + thin fs append edge + rotation/backpressure (drop-oldest + warn — D3 +fail-safe). Imports the frozen `LogRecord`/`LogSink` TYPES; never redefines them. + +### host-bin wiring (orchestrator CR) +Construct the sink, add `logSink` to `deps: HostDeps` (main.ts:77) so `host.logger` +works before any extension activates. Root tsconfig ref + host-bin dep on +`@dispatch/journal-sink` + `bun install` + biome import-sort (same wiring as +tool-read-file, Step 2). + +### Order & parallelism +Sequential for safety (record type is a hard dep): **Unit 1 (freeze contract → +finish) → Unit 2 → host-bin wiring**. Optional overlap: start Unit 2 once Unit 1's +`logging.ts` checkpoint is committed. + +### Open sub-decisions (lock while finalizing prompts) +- Package name `journal-sink` (vs `log-journal`). · fsync cadence (default periodic). +- Journal file path + rotation policy (size cap; on-disk location). +**Verify:** `typecheck`/`test`/`check` clean; live boot → `host.logger` lines land in +the journal file. 
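
Because spans are written incrementally, a crashed turn should appear in the journal as a `span-open` with no matching `span-close`. A rough sketch of that check follows, assuming only the `kind` and `spanId` fields named in the record contract. `findUnclosedSpans` and the sample journal text are hypothetical and are not part of either planned unit.

```ts
// Hypothetical helper for illustration; it reads journal text that is already in memory.
function findUnclosedSpans(journalText: string): string[] {
  const open = new Set<string>();
  for (const line of journalText.split("\n")) {
    if (line.length === 0) continue;
    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      continue; // a torn final line after a crash is skipped, not fatal
    }
    if (typeof parsed !== "object" || parsed === null) continue;
    const rec = parsed as { kind?: unknown; spanId?: unknown };
    if (typeof rec.spanId !== "string") continue;
    if (rec.kind === "span-open") open.add(rec.spanId);
    if (rec.kind === "span-close") open.delete(rec.spanId);
  }
  // A Set iterates in insertion order, so spans come back in the order they were opened.
  return Array.from(open);
}

const sampleJournal = [
  '{"kind":"span-open","spanId":"turn-1","name":"turn"}',
  '{"kind":"span-open","spanId":"step-1","name":"step"}',
  '{"kind":"span-close","spanId":"step-1","name":"step"}',
  '{"kind":"span-open","spanId":"step-2","name":"step"}',
  '{"kind":"span-op', // torn final line, as if the process died mid-write
].join("\n");

const unclosed = findUnclosedSpans(sampleJournal);
```

In this sample the turn span and the second step span are the ones left open, which is the shape a post-mortem query would look for.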
+ +### Next step (after Phase A) — the "AFTER" capture +`provider.request` verbatim post-transform (D5) inside `provider-openai-compat`: the +exact serialized request bytes + raw response/error + cache tokens, auth header +self-redacted. This **completes full round-trip rebuild** and unlocks the +**before↔after diff** (kernel "before" vs wire "after" → pinpoints transform bugs). +Depends only on the Phase A Logger ABI; lives entirely in the provider extension. +*(Full per-extension prompt-segment provenance — D8 — comes later, with the +context-filter chain.)* + +--- + +## 11. Phase B preview — collector flush into SQLite (NOT built; model only) + +> Captured so it's not lost; the exact knobs are open Phase B sub-decisions. + +The flush into the SQLite **store** is the **collector's** job (process 2), async + +continuous, **decoupled from turns** — the app only appends to the journal and never +waits. Per tick the collector: +1. **Tails** the journal — reads new complete lines since its last committed + byte-offset (short poll loop or fs-watch). +2. **Batches** them into **one SQLite transaction**, commits. +3. **Advances a persisted consume-offset.** + +- **Cadence:** per batch/tick, bounded by *interval OR batch-size, whichever first* + (e.g. ~250 ms or N records) — never one-txn-per-record (avoids per-record fsync), + never per-turn. Sub-second / near-real-time. +- **Crash-safety = at-least-once + idempotent:** resume from the persisted offset on + restart → may re-read a few lines → writes are idempotent (`INSERT OR IGNORE` on a + record/span id). No loss; harmless reprocess. This is what lets you query a chat + **after the app that produced it crashed** (D3). +- **Queryability lag** ≈ one batch interval; post-mortem still works (journal is + durable; the collector consumes whenever it's up). +- **Open (Phase B):** poll vs fs-watch; interval/batch-size; offset storage (store + metadata vs sidecar); dedup key; + the store schema/indexes (§6). 
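
The tail / batch / advance-offset loop described in §11 can be sketched roughly as below. This is only a model under stated assumptions: an in-memory `Map` stands in for the SQLite store, and the dedup key is the line's starting byte offset. That key is a hypothetical choice made for the demo (the real dedup key is still an open Phase B decision, and a byte offset would not survive journal rotation). `tailCompleteLines` and `applyBatch` are illustrative names, not existing functions.

```ts
// Hypothetical sketch of one collector tick; nothing here is built yet.
interface TailedLine {
  readonly offset: number; // byte offset where the line starts in the journal
  readonly text: string;
}
interface TailResult {
  readonly lines: readonly TailedLine[];
  readonly nextOffset: number;
}

// Read only COMPLETE lines (terminated by "\n"), starting at a byte offset.
function tailCompleteLines(journal: Uint8Array, offset: number): TailResult {
  const decoder = new TextDecoder();
  const lines: TailedLine[] = [];
  let lineStart = offset;
  for (let i = offset; i < journal.length; i++) {
    if (journal[i] === 0x0a) {
      lines.push({ offset: lineStart, text: decoder.decode(journal.subarray(lineStart, i)) });
      lineStart = i + 1;
    }
  }
  // A partial trailing line is not consumed; it is picked up on a later tick.
  return { lines, nextOffset: lineStart };
}

// Idempotent write: a keyed insert that ignores duplicates, in the spirit of INSERT OR IGNORE.
function applyBatch(store: Map<number, string>, batch: TailResult): void {
  for (const { offset, text } of batch.lines) {
    if (!store.has(offset)) store.set(offset, text);
  }
}

const journalBytes = new TextEncoder().encode(
  '{"kind":"log","msg":"a"}\n{"kind":"log","msg":"b"}\n{"kind":"lo',
);
const store = new Map<number, string>();

const firstTick = tailCompleteLines(journalBytes, 0);
applyBatch(store, firstTick);
// Simulate a crash before the offset was persisted: the same range is read again.
applyBatch(store, tailCompleteLines(journalBytes, 0));
```

Re-reading the same range leaves the store unchanged, which is the at-least-once plus idempotent property the section relies on; the incomplete final line is simply not consumed until its newline arrives.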
diff --git a/packages/host-bin/package.json b/packages/host-bin/package.json index 12ae633..a3e24c8 100644 --- a/packages/host-bin/package.json +++ b/packages/host-bin/package.json @@ -11,6 +11,7 @@ "@dispatch/provider-openai-compat": "workspace:*", "@dispatch/session-orchestrator": "workspace:*", "@dispatch/transport-http": "workspace:*", - "@dispatch/tool-read-file": "workspace:*" + "@dispatch/tool-read-file": "workspace:*", + "@dispatch/journal-sink": "workspace:*" } } diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts index 0cb0d37..0c795b8 100644 --- a/packages/host-bin/src/main.ts +++ b/packages/host-bin/src/main.ts @@ -2,14 +2,16 @@ import { mkdirSync } from "node:fs"; import { dirname } from "node:path"; import { extension as authApikeyExt } from "@dispatch/auth-apikey"; import { extension as conversationStoreExt } from "@dispatch/conversation-store"; +import { createJournalSink } from "@dispatch/journal-sink"; import { type ConfigAccess, createBus, createHost, + createLogger, type EventsEmitter, type Extension, type HostDeps, - type Logger, + type LogDeps, type PermissionGate, type ScheduledJob, type SecretsAccess, @@ -22,15 +24,6 @@ import { extension as toolReadFileExt } from "@dispatch/tool-read-file"; import { createServer, extension as transportHttpExt } from "@dispatch/transport-http"; import { configMapToAccess, envToConfigMap } from "./config.js"; -function createConsoleLogger(): Logger { - return { - debug: (message: string, ...args: unknown[]) => console.debug(`[debug] ${message}`, ...args), - info: (message: string, ...args: unknown[]) => console.info(`[info] ${message}`, ...args), - warn: (message: string, ...args: unknown[]) => console.warn(`[warn] ${message}`, ...args), - error: (message: string, ...args: unknown[]) => console.error(`[error] ${message}`, ...args), - }; -} - function createEmptySecrets(): SecretsAccess { return { get: async () => null, @@ -64,7 +57,11 @@ const CORE_EXTENSIONS: readonly Extension[] = [ 
]; async function boot(): Promise<void> { - const logger = createConsoleLogger(); + const journalPath = process.env.DISPATCH_JOURNAL ?? "./.dispatch/journal/app.ndjson"; + mkdirSync(dirname(journalPath), { recursive: true }); + const logSink = createJournalSink({ path: journalPath }); + const logDeps: LogDeps = { now: () => Date.now(), newId: () => crypto.randomUUID() }; + const logger = createLogger({ extensionId: "host-bin" }, logSink, logDeps); const dbPath = process.env.DISPATCH_DB ?? "./.dispatch-data/dispatch.db"; mkdirSync(dirname(dbPath), { recursive: true }); @@ -83,6 +80,8 @@ async function boot(): Promise<void> { scheduler: createNoopScheduler(), bus: createBus(logger), events: createNoopEvents(), + logSink, + logDeps, }; const host = createHost(CORE_EXTENSIONS, deps); @@ -102,6 +101,7 @@ async function boot(): Promise<void> { const port = Number(process.env.BACKEND_PORT) || Number(process.env.PORT) || 24203; const server = Bun.serve({ fetch: app.fetch, port }); logger.info(`Dispatch listening on http://localhost:${server.port}`); + console.info(`Dispatch listening on http://localhost:${server.port}`); } boot().catch((err) => { diff --git a/packages/journal-sink/package.json b/packages/journal-sink/package.json new file mode 100644 index 0000000..f17d476 --- /dev/null +++ b/packages/journal-sink/package.json @@ -0,0 +1,11 @@ +{ + "name": "@dispatch/journal-sink", + "version": "0.0.0", + "type": "module", + "private": true, + "main": "dist/index.js", + "types": "dist/index.d.ts", + "dependencies": { + "@dispatch/kernel": "workspace:*" + } +} diff --git a/packages/journal-sink/src/index.ts b/packages/journal-sink/src/index.ts new file mode 100644 index 0000000..02e3e2c --- /dev/null +++ b/packages/journal-sink/src/index.ts @@ -0,0 +1,2 @@ +export type { ClockOps, FsOps, JournalSinkOpts } from "./journal-sink.js"; +export { createJournalSink, serialize } from "./journal-sink.js"; diff --git a/packages/journal-sink/src/journal-sink.test.ts 
b/packages/journal-sink/src/journal-sink.test.ts new file mode 100644 index 0000000..a6531c7 --- /dev/null +++ b/packages/journal-sink/src/journal-sink.test.ts @@ -0,0 +1,309 @@ +import { mkdtemp, readFile, rm, stat } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import type { LogRecord } from "@dispatch/kernel"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { ClockOps, FsOps } from "./journal-sink.js"; +import { createJournalSink, serialize } from "./journal-sink.js"; + +// --- Fixtures: one per LogRecord variant --- + +const logRecord: LogRecord = { + kind: "log", + level: "info", + msg: "hello world", + timestamp: 1700000000000, + extensionId: "test-ext", + conversationId: "conv-1", + turnId: "turn-1", + spanId: "span-1", + attributes: { key: "value" }, +}; + +const logRecordMinimal: LogRecord = { + kind: "log", + level: "debug", + msg: "minimal", + timestamp: 1700000000001, + extensionId: "ext-min", +}; + +const spanOpenRecord: LogRecord = { + kind: "span-open", + spanId: "span-2", + name: "provider.request", + timestamp: 1700000000100, + extensionId: "test-ext", + conversationId: "conv-1", + turnId: "turn-1", + parentSpanId: "span-1", + attributes: { model: "gpt-4" }, + links: [{ spanId: "span-0", turnId: "turn-0", reason: "caused-by" }], + body: "verbatim request body", +}; + +const spanOpenRecordMinimal: LogRecord = { + kind: "span-open", + spanId: "span-3", + name: "tool.call", + timestamp: 1700000000200, + extensionId: "ext-tool", +}; + +const spanCloseRecord: LogRecord = { + kind: "span-close", + spanId: "span-2", + name: "provider.request", + timestamp: 1700000000500, + durationMs: 400, + status: "ok", + extensionId: "test-ext", + conversationId: "conv-1", + turnId: "turn-1", + parentSpanId: "span-1", + attributes: { cacheHit: true }, + links: [{ spanId: "span-0" }], +}; + +const spanCloseRecordError: LogRecord = { + kind: "span-close", + spanId: "span-4", + name: 
"failing-step", + timestamp: 1700000000600, + durationMs: 100, + status: "error", + extensionId: "ext-err", +}; + +// --- Pure core: serialize --- + +describe("serialize", () => { + const allRecords = [ + { name: "log (full)", record: logRecord }, + { name: "log (minimal)", record: logRecordMinimal }, + { name: "span-open (full)", record: spanOpenRecord }, + { name: "span-open (minimal)", record: spanOpenRecordMinimal }, + { name: "span-close (full)", record: spanCloseRecord }, + { name: "span-close (error)", record: spanCloseRecordError }, + ]; + + for (const { name, record } of allRecords) { + it(`produces exactly one NDJSON line for ${name}`, () => { + const line = serialize(record); + expect(line.endsWith("\n")).toBe(true); + expect(line.endsWith("\n\n")).toBe(false); + const parsed = JSON.parse(line); + expect(parsed).toEqual(record); + }); + + it(`round-trips ${name} through JSON.parse`, () => { + const line = serialize(record); + const roundTripped: LogRecord = JSON.parse(line); + expect(roundTripped).toEqual(record); + expect(roundTripped.kind).toBe(record.kind); + }); + } + + it("preserves all LogRecord variant kinds", () => { + expect(JSON.parse(serialize(logRecord)).kind).toBe("log"); + expect(JSON.parse(serialize(spanOpenRecord)).kind).toBe("span-open"); + expect(JSON.parse(serialize(spanCloseRecord)).kind).toBe("span-close"); + }); + + it("preserves optional fields when present", () => { + const parsed = JSON.parse(serialize(spanOpenRecord)); + expect(parsed.body).toBe("verbatim request body"); + expect(parsed.links).toEqual([{ spanId: "span-0", turnId: "turn-0", reason: "caused-by" }]); + expect(parsed.attributes).toEqual({ model: "gpt-4" }); + }); + + it("omits optional fields when absent (no undefined in output)", () => { + const parsed = JSON.parse(serialize(logRecordMinimal)); + expect(parsed.conversationId).toBeUndefined(); + expect(parsed.turnId).toBeUndefined(); + expect(parsed.spanId).toBeUndefined(); + 
expect(parsed.attributes).toBeUndefined(); + expect(parsed.body).toBeUndefined(); + expect("conversationId" in parsed).toBe(false); + }); +}); + +// --- Imperative shell: createJournalSink (fs integration) --- + +let tmpDir: string; + +beforeEach(async () => { + tmpDir = await mkdtemp(join(tmpdir(), "journal-sink-test-")); +}); + +afterEach(async () => { + await rm(tmpDir, { recursive: true, force: true }); +}); + +describe("createJournalSink", () => { + it("emits records that can be read back as NDJSON", async () => { + const path = join(tmpDir, "journal.log"); + const sink = createJournalSink({ path, fsync: "none" }); + + const records = [logRecord, spanOpenRecord, spanCloseRecord]; + for (const r of records) { + sink.emit(r); + } + + const content = await readFile(path, "utf8"); + const lines = content.split("\n").filter((l) => l.length > 0); + expect(lines).toHaveLength(3); + + for (let i = 0; i < lines.length; i++) { + const parsed: LogRecord = JSON.parse(lines[i] ?? ""); + expect(parsed).toEqual(records[i]); + } + }); + + it("warns and does NOT throw when writing to a bad path", () => { + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + const badPath = join(tmpDir, "nonexistent", "deep", "journal.log"); + const sink = createJournalSink({ path: badPath, fsync: "none" }); + + expect(() => sink.emit(logRecord)).not.toThrow(); + expect(warnSpy).toHaveBeenCalled(); + expect(warnSpy.mock.calls[0]?.[0]).toContain("[journal-sink]"); + + warnSpy.mockRestore(); + }); + + it("appends to existing file on creation", async () => { + const path = join(tmpDir, "journal.log"); + const { writeFileSync } = await import("node:fs"); + writeFileSync(path, serialize(logRecord)); + + const sink = createJournalSink({ path, fsync: "none" }); + sink.emit(spanOpenRecord); + + const content = await readFile(path, "utf8"); + const lines = content.split("\n").filter((l) => l.length > 0); + expect(lines).toHaveLength(2); + expect(JSON.parse(lines[0] ?? 
"").kind).toBe("log"); + expect(JSON.parse(lines[1] ?? "").kind).toBe("span-open"); + }); + + it("warns and drops records when fs.write throws mid-emit", () => { + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + const brokenFs: FsOps = { + open: () => 1, + write: () => { + throw new Error("disk full"); + }, + close: () => {}, + rename: () => {}, + statSize: () => 0, + fsync: () => {}, + }; + const sink = createJournalSink({ path: join(tmpDir, "j.log"), fs: brokenFs, fsync: "none" }); + + expect(() => sink.emit(logRecord)).not.toThrow(); + expect(warnSpy).toHaveBeenCalled(); + expect(warnSpy.mock.calls[0]?.[1]).toBeInstanceOf(Error); + + warnSpy.mockRestore(); + }); +}); + +// --- Rotation --- + +describe("rotation", () => { + it("rotates when file exceeds maxBytes", async () => { + const path = join(tmpDir, "journal.log"); + const sink = createJournalSink({ path, maxBytes: 100, fsync: "none" }); + + sink.emit(logRecord); + sink.emit(logRecordMinimal); + sink.emit(spanOpenRecord); + + const content = await readFile(path, "utf8"); + const lines = content.split("\n").filter((l) => l.length > 0); + expect(lines.length).toBeGreaterThan(0); + + let rotatedExists = false; + try { + await stat(`${path}.1`); + rotatedExists = true; + } catch { + // May not exist if rotation hasn't triggered yet. 
+ } + expect(rotatedExists).toBe(true); + + const rotatedContent = await readFile(`${path}.1`, "utf8"); + const allLines = [...rotatedContent.split("\n").filter((l) => l.length > 0), ...lines]; + for (const line of allLines) { + const parsed = JSON.parse(line); + expect(["log", "span-open", "span-close"]).toContain(parsed.kind); + } + }); +}); + +// --- Fsync --- + +describe("fsync", () => { + it("calls fsync periodically when mode is periodic", () => { + let fsyncCalls = 0; + let currentTime = 0; + const mockFs: FsOps = { + open: () => 1, + write: () => {}, + close: () => {}, + rename: () => {}, + statSize: () => 0, + fsync: () => { + fsyncCalls++; + }, + }; + const mockClock: ClockOps = { + now: () => currentTime, + }; + const sink = createJournalSink({ + path: join(tmpDir, "j.log"), + fs: mockFs, + clock: mockClock, + fsync: "periodic", + }); + + sink.emit(logRecord); + expect(fsyncCalls).toBe(0); + + currentTime = 6_000; + sink.emit(logRecordMinimal); + expect(fsyncCalls).toBe(1); + + sink.emit(spanOpenRecord); + expect(fsyncCalls).toBe(1); + + currentTime = 12_000; + sink.emit(spanCloseRecord); + expect(fsyncCalls).toBe(2); + }); + + it("never calls fsync when mode is none", () => { + let fsyncCalls = 0; + const mockFs: FsOps = { + open: () => 1, + write: () => {}, + close: () => {}, + rename: () => {}, + statSize: () => 0, + fsync: () => { + fsyncCalls++; + }, + }; + const sink = createJournalSink({ + path: join(tmpDir, "j.log"), + fs: mockFs, + fsync: "none", + }); + + sink.emit(logRecord); + sink.emit(logRecord); + sink.emit(logRecord); + expect(fsyncCalls).toBe(0); + }); +}); diff --git a/packages/journal-sink/src/journal-sink.ts b/packages/journal-sink/src/journal-sink.ts new file mode 100644 index 0000000..8e36fd6 --- /dev/null +++ b/packages/journal-sink/src/journal-sink.ts @@ -0,0 +1,171 @@ +import { closeSync, fsyncSync, openSync, renameSync, statSync, writeSync } from "node:fs"; +import type { LogRecord, LogSink } from "@dispatch/kernel"; + +// --- 
Pure core (no I/O) --- + +/** + * Serialize a LogRecord to exactly one NDJSON line (record JSON + newline). + * Pure function — no I/O, no side effects. + */ +export function serialize(record: LogRecord): string { + return `${JSON.stringify(record)}\n`; +} + +// --- Imperative shell (fs edge) --- + +/** Injectable fs operations — confined to the edge. */ +export interface FsOps { + readonly open: (path: string) => number; + readonly write: (fd: number, data: string) => void; + readonly close: (fd: number) => void; + readonly rename: (oldPath: string, newPath: string) => void; + readonly statSize: (path: string) => number; + readonly fsync: (fd: number) => void; +} + +/** Clock injection for fsync interval. */ +export interface ClockOps { + readonly now: () => number; +} + +export interface JournalSinkOpts { + readonly path: string; + readonly maxBytes?: number; + readonly fsync?: "periodic" | "none"; + readonly fs?: FsOps; + readonly clock?: ClockOps; +} + +const DEFAULT_MAX_BYTES = 50 * 1024 * 1024; // 50 MB +const FSYNC_INTERVAL_MS = 5_000; // 5 seconds + +/** + * Create a LogSink that durably appends each record to the journal file + * as one NDJSON line. Fail-safe: write errors drop + warn, never throw. + * Rotates when file exceeds maxBytes (rename → .1, reopen fresh). + */ +export function createJournalSink(opts: JournalSinkOpts): LogSink { + const filePath = opts.path; + const maxBytes = opts.maxBytes ?? DEFAULT_MAX_BYTES; + const syncMode = opts.fsync ?? "periodic"; + const fs = opts.fs ?? createDefaultFsOps(); + const clock = opts.clock ?? 
{ now: () => Date.now() }; + + const NO_FD = -1; + let fd: number; + let bytesWritten: number; + try { + fd = fs.open(filePath); + bytesWritten = fs.statSize(filePath); + } catch { + fd = NO_FD; + bytesWritten = 0; + } + let lastFsyncAt = clock.now(); + + function tryOpen(): boolean { + if (fd !== NO_FD) return true; + try { + fd = fs.open(filePath); + bytesWritten = 0; + return true; + } catch { + return false; + } + } + + function rotate(): void { + if (fd !== NO_FD) { + try { + fs.close(fd); + } catch { + // Ignore close errors during rotation. + } + } + const rotatedPath = `${filePath}.1`; + try { + fs.rename(filePath, rotatedPath); + } catch { + // If rename fails, just truncate by reopening. + } + try { + fd = fs.open(filePath); + } catch { + fd = NO_FD; + } + bytesWritten = 0; + } + + function maybeFsync(): void { + if (syncMode !== "periodic" || fd === NO_FD) return; + const now = clock.now(); + if (now - lastFsyncAt >= FSYNC_INTERVAL_MS) { + try { + fs.fsync(fd); + } catch { + // Swallow — fail-safe. + } + lastFsyncAt = now; + } + } + + const sink: LogSink = { + emit(record: LogRecord): void { + try { + if (!tryOpen()) { + console.warn("[journal-sink] cannot open journal, dropping record"); + return; + } + + const line = serialize(record); + const lineBytes = Buffer.byteLength(line, "utf8"); + + if (bytesWritten + lineBytes > maxBytes) { + rotate(); + if (fd === NO_FD) { + console.warn("[journal-sink] rotation failed, dropping record"); + return; + } + } + + fs.write(fd, line); + bytesWritten += lineBytes; + maybeFsync(); + } catch (err) { + // Fail-safe: drop + warn, never throw to the caller (D3/D7). 
+ console.warn("[journal-sink] write failed, dropping record:", err); + } + }, + }; + + return sink; +} + +// --- Default fs ops (real Bun/Node I/O) --- + +function createDefaultFsOps(): FsOps { + return { + open(path: string): number { + return openSync(path, "a"); + }, + write(fd: number, data: string): void { + writeSync(fd, data); + }, + close(fd: number): void { + closeSync(fd); + }, + rename(oldPath: string, newPath: string): void { + renameSync(oldPath, newPath); + }, + statSize(path: string): number { + try { + return statSync(path).size; + } catch { + return 0; + } + }, + fsync(fd: number): void { + fsyncSync(fd); + }, + }; +} diff --git a/packages/journal-sink/tsconfig.json b/packages/journal-sink/tsconfig.json new file mode 100644 index 0000000..ff99a43 --- /dev/null +++ b/packages/journal-sink/tsconfig.json @@ -0,0 +1,6 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true }, + "include": ["src/**/*.ts"], + "references": [{ "path": "../kernel" }] +} diff --git a/packages/kernel/src/bus/bus.test.ts b/packages/kernel/src/bus/bus.test.ts index 7366b50..05cf875 100644 --- a/packages/kernel/src/bus/bus.test.ts +++ b/packages/kernel/src/bus/bus.test.ts @@ -1,6 +1,6 @@ import { beforeEach, describe, expect, it } from "vitest"; -import type { Logger } from "../contracts/extension.js"; import { defineEventHook, defineFilter, defineService } from "../contracts/hooks.js"; +import type { Logger, Span } from "../contracts/logging.js"; import { type Bus, createBus } from "./bus.js"; import { applyFilterChain, dispatchEventSync, sortFilters } from "./pure.js"; @@ -10,15 +10,30 @@ interface FakeLogger extends Logger { function createFakeLogger(): FakeLogger { const errors: Array<{ message: string; args: unknown[] }> = []; - return { + const logger: FakeLogger = { errors, debug: () => {}, info: () => {}, warn: () => {}, - error: (message: string, ...args: unknown[]) => { - errors.push({ message, args 
}); + error: (message, attrs) => { + errors.push({ message, args: attrs === undefined ? [] : [attrs] }); }, + child: () => logger, + span: () => makeNoopSpan(logger), }; + return logger; +} + +function makeNoopSpan(log: Logger): Span { + const span: Span = { + id: "noop", + log, + setAttributes: () => {}, + addLink: () => {}, + child: () => span, + end: () => {}, + }; + return span; } describe("event hooks", () => { diff --git a/packages/kernel/src/bus/pure.ts b/packages/kernel/src/bus/pure.ts index 7cd9143..4d90fc6 100644 --- a/packages/kernel/src/bus/pure.ts +++ b/packages/kernel/src/bus/pure.ts @@ -12,11 +12,11 @@ export function dispatchEventSync<T>( const result = handler(payload); if (result instanceof Promise) { result.catch((err: unknown) => { - logger.error(`Event hook "${hookId}" handler rejected`, err); + logger.error(`Event hook "${hookId}" handler rejected`, { err }); }); } } catch (err) { - logger.error(`Event hook "${hookId}" handler threw`, err); + logger.error(`Event hook "${hookId}" handler threw`, { err }); } } } @@ -32,7 +32,7 @@ export async function dispatchEventAsync<T>( try { await handler(payload); } catch (err) { - logger.error(`Event hook "${hookId}" handler threw`, err); + logger.error(`Event hook "${hookId}" handler threw`, { err }); } }); @@ -76,7 +76,7 @@ export async function applyFilterChain<T>( current = await fn(current); } catch (err) { if (failClosed) throw err; - logger.error(`Filter "${hookId}" handler threw (fail-open, passing through)`, err); + logger.error(`Filter "${hookId}" handler threw (fail-open, passing through)`, { err }); } } return current; diff --git a/packages/kernel/src/contracts/extension.ts b/packages/kernel/src/contracts/extension.ts index 424f714..00b41f1 100644 --- a/packages/kernel/src/contracts/extension.ts +++ b/packages/kernel/src/contracts/extension.ts @@ -15,6 +15,10 @@ import type { FilterHandler, ServiceHandle, } from "./hooks.js"; +import type { Logger } from "./logging.js"; + +export type { Logger 
} from "./logging.js"; + import type { ProviderContract } from "./provider.js"; import type { ToolContract } from "./tool.js"; @@ -132,15 +136,7 @@ export interface ScheduledJob { readonly execute: () => void | Promise<void>; } -// --- Logger --- - -/** Logger interface available to every extension via the Host API. */ -export interface Logger { - readonly debug: (message: string, ...args: unknown[]) => void; - readonly info: (message: string, ...args: unknown[]) => void; - readonly warn: (message: string, ...args: unknown[]) => void; - readonly error: (message: string, ...args: unknown[]) => void; -} +// --- Logger is re-exported from logging.ts (structured, correlated) --- // --- Config --- diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts index 0b578cb..e4eba87 100644 --- a/packages/kernel/src/contracts/index.ts +++ b/packages/kernel/src/contracts/index.ts @@ -45,7 +45,6 @@ export type { EventsEmitter, Extension, HostAPI, - Logger, Manifest, ManifestCapabilities, ManifestContributions, @@ -57,7 +56,6 @@ export type { StorageNamespace, TrustLevel, } from "./extension.js"; - export type { EventHandler, EventHookDescriptor, @@ -66,9 +64,25 @@ export type { HookDescriptor, ServiceHandle, } from "./hooks.js"; - export { defineEventHook, defineFilter, defineService } from "./hooks.js"; export type { + Attributes, + ErrorAttributes, + Level, + LogContext, + LogDeps, + Logger, + LogLineRecord, + LogRecord, + LogSink, + Span, + SpanCloseRecord, + SpanLink, + SpanOpenRecord, + SpanStatus, +} from "./logging.js"; +export { createLogger } from "./logging.js"; +export type { FinishEvent, ProviderContract, ProviderErrorEvent, diff --git a/packages/kernel/src/contracts/logging.ts b/packages/kernel/src/contracts/logging.ts new file mode 100644 index 0000000..8e1eef3 --- /dev/null +++ b/packages/kernel/src/contracts/logging.ts @@ -0,0 +1,461 @@ +/** + * Logging contract — structured, correlated, span-capable Logger/Span ABI. 
+ * + * The kernel owns types + pure record-builders. NO I/O — the LogSink is + * injected by the host-bin. Logger/Span mint records and call sink.emit; + * sink errors are swallowed (D7 — the turn is sovereign). + * + * Key properties: + * - P3-safe: correlation flows via explicit child()/span() values, no ambient. + * - P2: record-builder is pure ({ now, newId } injected). + * - D3: spans emitted incrementally (open at span(), close at end()). + * - D6: extensionId auto-stamped by host, not caller-supplied. + * - Flat scalar attributes (serializable D3, queryable D9). + */ + +// --- Levels --- + +export type Level = "debug" | "info" | "warn" | "error"; + +// --- Attributes --- + +/** Flat, serializable key/value pairs. Caller stringifies nested objects. */ +export type Attributes = Readonly<Record<string, string | number | boolean | null>>; + +// --- LogContext (correlation) --- + +/** Correlation context carried on every log record and span. */ +export interface LogContext { + /** Auto-stamped by host from manifest.id (D6) — never caller-supplied. */ + readonly extensionId: string; + readonly conversationId?: string; + readonly turnId?: string; + readonly spanId?: string; + readonly parentSpanId?: string; +} + +// --- Span --- + +/** + * A timed unit of work within a trace. Opened via `logger.span(name)`, + * closed via `span.end()`. Emits incremental open/close records so a + * crashed turn is reconstructable from the journal (D3). + */ +export interface Span { + readonly id: string; + /** Pre-bound Logger scoped to this span's correlation. */ + readonly log: Logger; + /** Add or overwrite attributes on this span. */ + readonly setAttributes: (attrs: Attributes) => void; + /** Record a causal link to another span (D4 cross-feature causality). */ + readonly addLink: ( + target: { readonly spanId: string; readonly turnId?: string }, + reason?: string, + ) => void; + /** Open a child span nested under this one. 
*/ + readonly child: (name: string, attrs?: Attributes) => Span; + /** + * Close this span. Records duration + status. Optionally records an + * error and/or additional attributes. + */ + readonly end: (outcome?: { readonly err?: unknown; readonly attrs?: Attributes }) => void; +} + +// --- Logger --- + +/** + * Structured, correlated logger. The host auto-scopes each extension's + * logger with its manifest.id as extensionId (D6). + * + * `info("msg")` must still compile — attrs is optional (backward compat). + */ +export interface Logger { + readonly debug: (msg: string, attrs?: Attributes) => void; + readonly info: (msg: string, attrs?: Attributes) => void; + readonly warn: (msg: string, attrs?: Attributes) => void; + readonly error: (msg: string, attrs?: ErrorAttributes) => void; + /** + * Create a child logger with additional correlation context. + * Explicit values passed down (P3 — no ambient state). + */ + readonly child: (ctx: Partial<LogContext> & { readonly attrs?: Attributes }) => Logger; + /** Open a new span. Emits a `span-open` record immediately (D3). */ + readonly span: (name: string, attrs?: Attributes) => Span; +} + +/** + * Attributes for error log calls. The `err` field is `unknown` (not + * constrained to Attributes' scalar index signature) so callers can + * pass `error("msg", { err })` directly. + */ +export interface ErrorAttributes { + readonly err?: unknown; + readonly [key: string]: unknown; +} + +// --- LogRecord (discriminated union) --- + +/** + * Status of a span. "ok" is the default when no error is recorded. + */ +export type SpanStatus = "ok" | "error"; + +/** + * A link to another span, recorded at a handoff moment (D4). + */ +export interface SpanLink { + readonly spanId: string; + readonly turnId?: string; + readonly reason?: string; +} + +/** + * Flat, JSON-serializable discriminated union. Spans are emitted + * incrementally (open at span(), close at end()) so a crashed turn is + * reconstructable from the journal (D3). 
+ * + * Every variant carries correlation keys + timestamp. An optional `body` + * field (string) holds large verbatim payloads (store-fat-serve-thin). + */ +export type LogRecord = LogLineRecord | SpanOpenRecord | SpanCloseRecord; + +/** A structured log line (debug/info/warn/error). */ +export interface LogLineRecord { + readonly kind: "log"; + readonly level: Level; + readonly msg: string; + readonly timestamp: number; + readonly extensionId: string; + readonly conversationId?: string; + readonly turnId?: string; + readonly spanId?: string; + readonly parentSpanId?: string; + readonly attributes?: Attributes; + /** Optional large verbatim payload (store-fat, serve-thin). */ + readonly body?: string; +} + +/** Emitted when a span is opened (at `logger.span(name)`). */ +export interface SpanOpenRecord { + readonly kind: "span-open"; + readonly spanId: string; + readonly name: string; + readonly timestamp: number; + readonly extensionId: string; + readonly conversationId?: string; + readonly turnId?: string; + readonly parentSpanId?: string; + readonly attributes?: Attributes; + readonly links?: readonly SpanLink[]; + readonly body?: string; +} + +/** Emitted when a span is closed (at `span.end()`). Carries duration + status. */ +export interface SpanCloseRecord { + readonly kind: "span-close"; + readonly spanId: string; + readonly name: string; + readonly timestamp: number; + readonly durationMs: number; + readonly status: SpanStatus; + readonly extensionId: string; + readonly conversationId?: string; + readonly turnId?: string; + readonly parentSpanId?: string; + readonly attributes?: Attributes; + readonly links?: readonly SpanLink[]; + readonly body?: string; +} + +// --- LogSink --- + +/** + * Fire-and-forget sink. The kernel calls emit(); the host-bin injects + * a concrete implementation. Kernel never lets sink errors escape (D7). 
+ */ +export interface LogSink { + readonly emit: (record: LogRecord) => void; +} + +// --- Deterministic helpers (injected for testability) --- + +/** Clock + id generator injected into the logger factory. */ +export interface LogDeps { + readonly now: () => number; + readonly newId: () => string; +} + +// --- Pure record builder (no I/O) --- + +/** + * Internal state carried by a logger instance. Built by createLogger; + * never exposed outside this module. + */ +interface LoggerState { + readonly ctx: LogContext; + readonly attrs: Attributes | undefined; + readonly deps: LogDeps; + readonly sink: LogSink; +} + +function mergeAttributes( + base: Attributes | undefined, + extra: Attributes | undefined, +): Attributes | undefined { + if (base === undefined && extra === undefined) return undefined; + if (base === undefined) return extra; + if (extra === undefined) return base; + return { ...base, ...extra }; +} + +function isScalarAttr(value: unknown): value is string | number | boolean | null { + const t = typeof value; + return t === "string" || t === "number" || t === "boolean" || value === null; +} + +function emitLog(state: LoggerState, level: Level, msg: string, attrs?: Attributes): void { + const merged = mergeAttributes(state.attrs, attrs); + const base = { + kind: "log" as const, + level, + msg, + timestamp: state.deps.now(), + extensionId: state.ctx.extensionId, + }; + const record: LogLineRecord = + state.ctx.conversationId !== undefined || + state.ctx.turnId !== undefined || + state.ctx.spanId !== undefined || + state.ctx.parentSpanId !== undefined || + merged !== undefined + ? { + ...base, + ...(state.ctx.conversationId !== undefined + ? { conversationId: state.ctx.conversationId } + : {}), + ...(state.ctx.turnId !== undefined ? { turnId: state.ctx.turnId } : {}), + ...(state.ctx.spanId !== undefined ? { spanId: state.ctx.spanId } : {}), + ...(state.ctx.parentSpanId !== undefined ? 
{ parentSpanId: state.ctx.parentSpanId } : {}), + ...(merged !== undefined ? { attributes: merged } : {}), + } + : base; + try { + state.sink.emit(record); + } catch { + // Swallow — D7: the turn is sovereign (never break the caller). + } +} + +function buildSpanOpen( + state: LoggerState, + name: string, + spanId: string, + attrs?: Attributes, +): SpanOpenRecord { + const base = { + kind: "span-open" as const, + spanId, + name, + timestamp: state.deps.now(), + extensionId: state.ctx.extensionId, + }; + const merged = mergeAttributes(state.attrs, attrs); + return { + ...base, + ...(state.ctx.conversationId !== undefined ? { conversationId: state.ctx.conversationId } : {}), + ...(state.ctx.turnId !== undefined ? { turnId: state.ctx.turnId } : {}), + ...(state.ctx.parentSpanId !== undefined ? { parentSpanId: state.ctx.parentSpanId } : {}), + ...(merged !== undefined ? { attributes: merged } : {}), + }; +} + +function buildSpanLink( + target: { readonly spanId: string; readonly turnId?: string }, + reason?: string, +): SpanLink { + return { + spanId: target.spanId, + ...(target.turnId !== undefined ? { turnId: target.turnId } : {}), + ...(reason !== undefined ? { reason } : {}), + }; +} + +/** + * Create a structured Logger. Pure factory — all I/O goes through the + * injected sink. `{ now, newId }` are injected for deterministic tests. + * + * @param ctx Initial correlation context (extensionId + optional ids). + * @param sink Fire-and-forget record sink. + * @param deps Clock + id generator. + * @param attrs Optional default attributes (from child()). + */ +export function createLogger( + ctx: LogContext, + sink: LogSink, + deps: LogDeps, + attrs?: Attributes, +): Logger { + const state: LoggerState = { ctx, attrs, deps, sink }; + + function makeSpan(name: string, spanAttrs?: Attributes, parentSpanId?: string): Span { + const spanId = deps.newId(); + const mergedParent = parentSpanId ?? 
state.ctx.spanId; + const spanCtx: LogContext = { + extensionId: ctx.extensionId, + ...(ctx.conversationId !== undefined ? { conversationId: ctx.conversationId } : {}), + ...(ctx.turnId !== undefined ? { turnId: ctx.turnId } : {}), + spanId, + ...(mergedParent !== undefined ? { parentSpanId: mergedParent } : {}), + }; + + const openRecord = buildSpanOpen(state, name, spanId, spanAttrs); + const spanAttrsMutable: Record<string, string | number | boolean | null> = + spanAttrs !== undefined ? { ...spanAttrs } : {}; + const links: SpanLink[] = []; + const openedAt = deps.now(); + + try { + sink.emit(openRecord); + } catch { + // Swallow — D7. + } + + const spanLogger = createLogger(spanCtx, sink, deps, state.attrs); + + const span: Span = { + id: spanId, + log: spanLogger, + setAttributes(newAttrs: Attributes): void { + for (const [key, value] of Object.entries(newAttrs)) { + spanAttrsMutable[key] = value; + } + }, + addLink(target, reason): void { + links.push(buildSpanLink(target, reason)); + }, + child(childName: string, childAttrs?: Attributes): Span { + return makeSpan(childName, childAttrs, spanId); + }, + end(outcome?): void { + const closedAt = deps.now(); + const err = outcome?.err; + let status: SpanStatus = "ok"; + if (err !== undefined && err !== null) { + status = "error"; + const errMsg = err instanceof Error ? 
err.message : String(err); + spanAttrsMutable["error.message"] = errMsg; + if (err instanceof Error && err.stack !== undefined) { + spanAttrsMutable["error.stack"] = err.stack; + } + } + if (outcome?.attrs !== undefined) { + for (const [key, value] of Object.entries(outcome.attrs)) { + spanAttrsMutable[key] = value; + } + } + + const hasAttrs = Object.keys(spanAttrsMutable).length > 0; + const hasLinks = links.length > 0; + const base = { + kind: "span-close" as const, + spanId, + name, + timestamp: closedAt, + durationMs: closedAt - openedAt, + status, + extensionId: ctx.extensionId, + }; + const closeRecord: SpanCloseRecord = { + ...base, + ...(ctx.conversationId !== undefined ? { conversationId: ctx.conversationId } : {}), + ...(ctx.turnId !== undefined ? { turnId: ctx.turnId } : {}), + ...(mergedParent !== undefined ? { parentSpanId: mergedParent } : {}), + ...(hasAttrs ? { attributes: { ...spanAttrsMutable } } : {}), + ...(hasLinks ? { links: [...links] } : {}), + }; + try { + sink.emit(closeRecord); + } catch { + // Swallow — D7. + } + }, + }; + + return span; + } + + const logger: Logger = { + debug(msg: string, attrs?: Attributes): void { + emitLog(state, "debug", msg, attrs); + }, + info(msg: string, attrs?: Attributes): void { + emitLog(state, "info", msg, attrs); + }, + warn(msg: string, attrs?: Attributes): void { + emitLog(state, "warn", msg, attrs); + }, + error(msg: string, attrs?: ErrorAttributes): void { + const err = attrs?.err; + if (err !== undefined && err !== null) { + // Extract scalar attributes (everything except err). + const scalarAttrs: Record<string, string | number | boolean | null> = {}; + if (attrs !== undefined) { + for (const [key, value] of Object.entries(attrs)) { + if (key !== "err" && isScalarAttr(value)) { + scalarAttrs[key] = value; + } + } + } + const merged = mergeAttributes( + state.attrs, + Object.keys(scalarAttrs).length > 0 ? scalarAttrs : undefined, + ); + const errMsg = err instanceof Error ? 
err.message : String(err); + const errorAttrs: Record<string, string | number | boolean | null> = { + ...(merged ?? {}), + "error.message": errMsg, + }; + if (err instanceof Error && err.stack !== undefined) { + errorAttrs["error.stack"] = err.stack; + } + emitLog(state, "error", msg, errorAttrs as Attributes); + } else { + // No err field — filter to scalar attributes only. + const scalarAttrs: Record<string, string | number | boolean | null> = {}; + if (attrs !== undefined) { + for (const [key, value] of Object.entries(attrs)) { + if (isScalarAttr(value)) { + scalarAttrs[key] = value; + } + } + } + emitLog( + state, + "error", + msg, + Object.keys(scalarAttrs).length > 0 ? (scalarAttrs as Attributes) : undefined, + ); + } + }, + child(childCtx: Partial<LogContext> & { readonly attrs?: Attributes }): Logger { + const convId = childCtx.conversationId ?? ctx.conversationId; + const tId = childCtx.turnId ?? ctx.turnId; + const sId = childCtx.spanId ?? ctx.spanId; + const pId = childCtx.parentSpanId ?? ctx.parentSpanId; + const newCtx: LogContext = { + extensionId: ctx.extensionId, + ...(convId !== undefined ? { conversationId: convId } : {}), + ...(tId !== undefined ? { turnId: tId } : {}), + ...(sId !== undefined ? { spanId: sId } : {}), + ...(pId !== undefined ? 
{ parentSpanId: pId } : {}), + }; + const newAttrs = mergeAttributes(state.attrs, childCtx.attrs); + return createLogger(newCtx, sink, deps, newAttrs); + }, + span(name: string, attrs?: Attributes): Span { + return makeSpan(name, attrs); + }, + }; + + return logger; +} diff --git a/packages/kernel/src/contracts/runtime.ts b/packages/kernel/src/contracts/runtime.ts index 68c0444..1e8f14f 100644 --- a/packages/kernel/src/contracts/runtime.ts +++ b/packages/kernel/src/contracts/runtime.ts @@ -10,6 +10,7 @@ import type { ChatMessage } from "./conversation.js"; import type { ToolDispatchPolicy } from "./dispatch.js"; import type { AgentEvent } from "./events.js"; +import type { Logger } from "./logging.js"; import type { ProviderContract, ProviderStreamOptions, Usage } from "./provider.js"; import type { ToolContract } from "./tool.js"; @@ -73,6 +74,13 @@ export interface RunTurnInput { /** Cancellation signal for the entire turn. */ readonly signal?: AbortSignal; + + /** + * Optional logger for structured span instrumentation. The runtime opens + * turn/step/tool-call spans using this logger. If omitted, no spans are + * emitted (backward-compatible with callers that don't yet pass a logger). + */ + readonly logger?: Logger; } /** diff --git a/packages/kernel/src/contracts/tool.ts b/packages/kernel/src/contracts/tool.ts index f74ce77..0699d05 100644 --- a/packages/kernel/src/contracts/tool.ts +++ b/packages/kernel/src/contracts/tool.ts @@ -7,6 +7,8 @@ * Extensions may use zod internally and convert to this shape. */ +import type { Logger } from "./logging.js"; + /** * Structural JSON Schema subset for tool parameter declarations. * The kernel does not validate against this — the provider serializes it for @@ -53,6 +55,13 @@ export interface ToolExecuteContext { * can clean up rather than leak. */ readonly signal: AbortSignal; + + /** + * Pre-bound Logger scoped to this tool-call span. Tools log correlated + * without a global (P3). 
The kernel stamps extensionId, conversationId, + * turnId, and spanId automatically. + */ + readonly log: Logger; } /** diff --git a/packages/kernel/src/host/host.test.ts b/packages/kernel/src/host/host.test.ts index 11c2356..7688366 100644 --- a/packages/kernel/src/host/host.test.ts +++ b/packages/kernel/src/host/host.test.ts @@ -6,7 +6,6 @@ import type { EventsEmitter, Extension, HostAPI, - Logger, Manifest, ManifestContributions, PermissionDecision, @@ -17,33 +16,92 @@ import type { StorageNamespace, } from "../contracts/extension.js"; import { defineEventHook, defineService } from "../contracts/hooks.js"; +import type { + Attributes, + ErrorAttributes, + LogDeps, + Logger, + LogRecord, + LogSink, +} from "../contracts/logging.js"; import type { ProviderContract } from "../contracts/provider.js"; import type { ToolContract } from "../contracts/tool.js"; import { createHost, type HostDeps, KERNEL_API_VERSION } from "./host.js"; interface FakeLogger extends Logger { - readonly logs: Array<{ level: string; message: string; args: unknown[] }>; + readonly logs: Array<{ level: string; message: string; attrs?: Attributes | ErrorAttributes }>; } function createFakeLogger(): FakeLogger { - const logs: Array<{ level: string; message: string; args: unknown[] }> = []; + const logs: Array<{ level: string; message: string; attrs?: Attributes | ErrorAttributes }> = []; return { logs, - debug: (message: string, ...args: unknown[]) => { - logs.push({ level: "debug", message, args }); + debug: (message: string, attrs?: Attributes) => { + if (attrs !== undefined) { + logs.push({ level: "debug", message, attrs }); + } else { + logs.push({ level: "debug", message }); + } + }, + info: (message: string, attrs?: Attributes) => { + if (attrs !== undefined) { + logs.push({ level: "info", message, attrs }); + } else { + logs.push({ level: "info", message }); + } + }, + warn: (message: string, attrs?: Attributes) => { + if (attrs !== undefined) { + logs.push({ level: "warn", message, attrs 
}); + } else { + logs.push({ level: "warn", message }); + } }, - info: (message: string, ...args: unknown[]) => { - logs.push({ level: "info", message, args }); + error: (message: string, attrs?: ErrorAttributes) => { + if (attrs !== undefined) { + logs.push({ level: "error", message, attrs }); + } else { + logs.push({ level: "error", message }); + } }, - warn: (message: string, ...args: unknown[]) => { - logs.push({ level: "warn", message, args }); + child( + _ctx: Partial<import("../contracts/logging.js").LogContext> & { readonly attrs?: Attributes }, + ): Logger { + return createFakeLogger(); }, - error: (message: string, ...args: unknown[]) => { - logs.push({ level: "error", message, args }); + span(_name: string, _attrs?: Attributes): import("../contracts/logging.js").Span { + return { + id: "fake-span", + log: createFakeLogger(), + setAttributes() {}, + addLink() {}, + child() { + return this; + }, + end() {}, + }; }, }; } +function createFakeLogSink(): LogSink & { readonly records: LogRecord[] } { + const records: LogRecord[] = []; + return { + records, + emit: (record: LogRecord) => { + records.push(record); + }, + }; +} + +function createFakeLogDeps(): LogDeps { + let idCounter = 0; + return { + now: () => 1000 + idCounter * 100, + newId: () => `span-${++idCounter}`, + }; +} + function createFakeConfig(): ConfigAccess { return { get: () => undefined, @@ -176,12 +234,16 @@ function createFakeAuth(id: string): AuthContract { describe("createHost", () => { let logger: FakeLogger; + let logSink: ReturnType<typeof createFakeLogSink>; + let logDeps: LogDeps; let deps: HostDeps; let scheduler: ReturnType<typeof createFakeScheduler>; let events: ReturnType<typeof createFakeEvents>; beforeEach(() => { logger = createFakeLogger(); + logSink = createFakeLogSink(); + logDeps = createFakeLogDeps(); scheduler = createFakeScheduler(); events = createFakeEvents(); deps = { @@ -193,6 +255,8 @@ describe("createHost", () => { scheduler, bus: createBus(logger), events, + 
logSink, + logDeps, }; }); @@ -529,6 +593,7 @@ describe("createHost", () => { expect(order).toEqual(["deactivate-c", "deactivate-a"]); const errors = logger.logs.filter((l) => l.level === "error"); expect(errors.some((e) => e.message.includes("deactivate"))).toBe(true); + expect(errors.some((e) => (e.attrs as { err?: unknown })?.err instanceof Error)).toBe(true); }); }); @@ -750,4 +815,146 @@ describe("createHost", () => { ); }); }); + + describe("auto-scoped logger (D6)", () => { + it("each extension's logger stamps its own manifest.id as extensionId", async () => { + let extALogger: Logger | undefined; + let extBLogger: Logger | undefined; + + const a = createExtension("ext-a", { + activate: (host) => { + extALogger = host.logger; + }, + }); + const b = createExtension("ext-b", { + activate: (host) => { + extBLogger = host.logger; + }, + }); + + const host = createHost([a, b], deps); + await host.activate(); + + extALogger?.info("from-a"); + extBLogger?.info("from-b"); + + const logRecords = logSink.records.filter((r) => r.kind === "log"); + expect(logRecords).toHaveLength(2); + if (logRecords[0]?.kind === "log") { + expect(logRecords[0].extensionId).toBe("ext-a"); + expect(logRecords[0].msg).toBe("from-a"); + } + if (logRecords[1]?.kind === "log") { + expect(logRecords[1].extensionId).toBe("ext-b"); + expect(logRecords[1].msg).toBe("from-b"); + } + }); + + it("an extension cannot spoof extensionId — it is auto-stamped", async () => { + let extLogger: Logger | undefined; + + const ext = createExtension("real-id", { + activate: (host) => { + extLogger = host.logger; + }, + }); + + const host = createHost([ext], deps); + await host.activate(); + + // child() cannot override extensionId + const child = extLogger?.child({ extensionId: "spoofed" }); + child?.info("msg"); + + const logRecords = logSink.records.filter((r) => r.kind === "log"); + expect(logRecords).toHaveLength(1); + if (logRecords[0]?.kind === "log") { + 
expect(logRecords[0].extensionId).toBe("real-id"); + } + }); + + it("host.logger.error uses structured { err } shape", async () => { + let extLogger: Logger | undefined; + + const ext = createExtension("ext", { + activate: (host) => { + extLogger = host.logger; + }, + }); + + const host = createHost([ext], deps); + await host.activate(); + + extLogger?.error("something broke", { err: new Error("boom") }); + + const logRecords = logSink.records.filter((r) => r.kind === "log"); + expect(logRecords).toHaveLength(1); + if (logRecords[0]?.kind === "log") { + expect(logRecords[0].level).toBe("error"); + expect(logRecords[0].msg).toBe("something broke"); + expect(logRecords[0].attributes?.["error.message"]).toBe("boom"); + } + }); + + it("a throwing sink does NOT break the caller", async () => { + const brokenSink: LogSink = { + emit() { + throw new Error("sink down"); + }, + }; + const brokenDeps: HostDeps = { + ...deps, + logSink: brokenSink, + }; + + let extLogger: Logger | undefined; + const ext = createExtension("ext", { + activate: (host) => { + extLogger = host.logger; + }, + }); + + const host = createHost([ext], brokenDeps); + await host.activate(); + + // Should not throw + expect(() => extLogger?.info("msg")).not.toThrow(); + }); + + it("span() + end() emit incremental span-open and span-close records", async () => { + let extLogger: Logger | undefined; + + const ext = createExtension("ext", { + activate: (host) => { + extLogger = host.logger; + }, + }); + + const host = createHost([ext], deps); + await host.activate(); + + const span = extLogger?.span("my-span", { key: "value" }); + span?.setAttributes({ extra: "attr" }); + span?.end({ attrs: { result: "ok" } }); + + const spanOpens = logSink.records.filter((r) => r.kind === "span-open"); + const spanCloses = logSink.records.filter((r) => r.kind === "span-close"); + + expect(spanOpens).toHaveLength(1); + expect(spanCloses).toHaveLength(1); + + if (spanOpens[0]?.kind === "span-open") { + 
expect(spanOpens[0].name).toBe("my-span"); + expect(spanOpens[0].extensionId).toBe("ext"); + expect(spanOpens[0].attributes?.key).toBe("value"); + } + if (spanCloses[0]?.kind === "span-close") { + expect(spanCloses[0].name).toBe("my-span"); + expect(spanCloses[0].status).toBe("ok"); + expect(spanCloses[0].durationMs).toBeGreaterThanOrEqual(0); + expect(spanCloses[0].attributes?.extra).toBe("attr"); + expect(spanCloses[0].attributes?.result).toBe("ok"); + } + }); + }); }); diff --git a/packages/kernel/src/host/host.ts b/packages/kernel/src/host/host.ts index dd61f9f..c7ec7a9 100644 --- a/packages/kernel/src/host/host.ts +++ b/packages/kernel/src/host/host.ts @@ -5,7 +5,6 @@ import type { EventsEmitter, Extension, HostAPI, - Logger, Manifest, PermissionGate, ScheduledJob, @@ -19,6 +18,8 @@ import type { FilterHandler, ServiceHandle, } from "../contracts/hooks.js"; +import type { LogDeps, Logger, LogSink } from "../contracts/logging.js"; +import { createLogger } from "../contracts/logging.js"; import type { ProviderContract } from "../contracts/provider.js"; import type { ToolContract } from "../contracts/tool.js"; import { resolveActivationOrder } from "./dag.js"; @@ -40,6 +41,8 @@ export interface HostDeps { readonly scheduler: { readonly register: (job: ScheduledJob) => void }; readonly bus: Bus; readonly events: EventsEmitter; + readonly logSink: LogSink; + readonly logDeps: LogDeps; } export interface Host { @@ -96,8 +99,12 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho } } - function buildHostAPI(opts?: { readonly registrationClosed?: boolean }): HostAPI { + function buildHostAPI( + extensionId: string, + opts?: { readonly registrationClosed?: boolean }, + ): HostAPI { const closed = opts?.registrationClosed ?? 
false; + const extLogger = createLogger({ extensionId }, deps.logSink, deps.logDeps); return { defineTool(tool: ToolContract) { if (closed) throw new Error("Registration not available after activation"); @@ -130,7 +137,7 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho secrets: deps.secrets, permissions: deps.permissions, events: deps.events, - logger: deps.logger, + logger: extLogger, getProviders() { return providers; }, @@ -156,7 +163,7 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho async activate() { for (const ext of compatible) { try { - await ext.activate(buildHostAPI()); + await ext.activate(buildHostAPI(ext.manifest.id)); activated.push(ext); deps.logger.info(`Extension "${ext.manifest.id}" activated`); } catch (err) { @@ -164,7 +171,7 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho manifest: ext.manifest, reason: `Activation failed: ${err instanceof Error ? err.message : String(err)}`, }); - deps.logger.error(`Extension "${ext.manifest.id}" failed to activate`, err); + deps.logger.error(`Extension "${ext.manifest.id}" failed to activate`, { err }); } } }, @@ -175,7 +182,7 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho try { await ext.deactivate(); } catch (err) { - deps.logger.error(`Extension "${ext.manifest.id}" failed to deactivate`, err); + deps.logger.error(`Extension "${ext.manifest.id}" failed to deactivate`, { err }); } } }, @@ -207,7 +214,7 @@ export function createHost(extensions: readonly Extension[], deps: HostDeps): Ho return disabled; }, getHostAPI() { - return buildHostAPI({ registrationClosed: true }); + return buildHostAPI("__host__", { registrationClosed: true }); }, }; } diff --git a/packages/kernel/src/runtime/dispatch.ts b/packages/kernel/src/runtime/dispatch.ts index 626b333..1ba0849 100644 --- a/packages/kernel/src/runtime/dispatch.ts +++ b/packages/kernel/src/runtime/dispatch.ts @@ -1,4 
+1,5 @@ import type { ToolDispatchPolicy } from "../contracts/dispatch.js"; +import type { Logger, Span } from "../contracts/logging.js"; import type { EventEmitter } from "../contracts/runtime.js"; import type { ToolCall, ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js"; import { toolOutputEvent } from "./events.js"; @@ -15,6 +16,7 @@ export async function executeToolCall( emit: EventEmitter, conversationId: string, turnId: string, + toolSpan?: Span, ): Promise<ToolResult> { if (tool === undefined) { return { content: `Unknown tool: ${call.name}`, isError: true }; @@ -28,6 +30,7 @@ export async function executeToolCall( onOutput: (data, stream) => { emit(toolOutputEvent(conversationId, turnId, call.id, data, stream)); }, + log: toolSpan?.log ?? createNoopLogger(), }; try { return await tool.execute(call.input, ctx); @@ -50,6 +53,7 @@ export function createStepDispatcher( emit: EventEmitter, conversationId: string, turnId: string, + toolSpans: Map<string, Span>, ): StepDispatcher { let activeCount = 0; let unsafeRunning = false; @@ -78,6 +82,7 @@ export function createStepDispatcher( } async function runAndResolve(entry: QueueEntry): Promise<void> { + const tcSpan = toolSpans.get(entry.call.id); const result = await executeToolCall( entry.call, entry.tool, @@ -85,6 +90,7 @@ export function createStepDispatcher( emit, conversationId, turnId, + tcSpan, ); activeCount--; if (entry.tool?.concurrencySafe === false) unsafeRunning = false; @@ -129,3 +135,27 @@ export function createStepDispatcher( return { submit, drain }; } + +function createNoopLogger(): Logger { + return { + debug() {}, + info() {}, + warn() {}, + error() {}, + child() { + return createNoopLogger(); + }, + span() { + return { + id: "noop", + log: createNoopLogger(), + setAttributes() {}, + addLink() {}, + child() { + return this; + }, + end() {}, + }; + }, + }; +} diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts index 
696a385..48f0b5a 100644 --- a/packages/kernel/src/runtime/run-turn.test.ts +++ b/packages/kernel/src/runtime/run-turn.test.ts @@ -1,6 +1,8 @@ import { describe, expect, it } from "vitest"; import type { ChatMessage } from "../contracts/conversation.js"; import type { AgentEvent } from "../contracts/events.js"; +import type { LogDeps, Logger, LogRecord, LogSink } from "../contracts/logging.js"; +import { createLogger } from "../contracts/logging.js"; import type { ProviderContract, ProviderEvent } from "../contracts/provider.js"; import type { ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js"; import { runTurn } from "./run-turn.js"; @@ -814,4 +816,228 @@ describe("runTurn", () => { expect(outputs[1]?.stream).toBe("stderr"); } }); + + describe("span instrumentation", () => { + function createTestLogger(): { + logger: Logger; + sink: LogSink & { records: LogRecord[] }; + deps: LogDeps; + } { + let idCounter = 0; + const deps: LogDeps = { + now: () => 1000 + idCounter * 100, + newId: () => `span-${++idCounter}`, + }; + const records: LogRecord[] = []; + const sink: LogSink & { records: LogRecord[] } = { + records, + emit: (record) => records.push(record), + }; + const logger = createLogger({ extensionId: "test" }, sink, deps); + return { logger, sink, deps }; + } + + it("emits turn + step span open/close in order", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const spanOpens = sink.records.filter((r) => r.kind === "span-open"); + const spanCloses = sink.records.filter((r) => r.kind === "span-close"); + + 
expect(spanOpens.length).toBeGreaterThanOrEqual(2); // turn + step + expect(spanCloses.length).toBeGreaterThanOrEqual(2); + + const turnOpen = spanOpens.find((r) => r.kind === "span-open" && r.name === "turn"); + const stepOpen = spanOpens.find((r) => r.kind === "span-open" && r.name === "step"); + expect(turnOpen).toBeDefined(); + expect(stepOpen).toBeDefined(); + + if (turnOpen?.kind === "span-open") { + expect(turnOpen.extensionId).toBe("test"); + expect(turnOpen.attributes?.conversationId).toBe("conv-1"); + expect(turnOpen.attributes?.turnId).toBe("turn-1"); + } + + const turnClose = spanCloses.find((r) => r.kind === "span-close" && r.name === "turn"); + expect(turnClose).toBeDefined(); + if (turnClose?.kind === "span-close") { + expect(turnClose.status).toBe("ok"); + expect(turnClose.durationMs).toBeGreaterThanOrEqual(0); + } + }); + + it("emits tool-call spans for dispatched tools", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const toolCallSpans = sink.records.filter( + (r) => r.kind === "span-open" && r.name === "tool-call", + ); + expect(toolCallSpans).toHaveLength(1); + if (toolCallSpans[0]?.kind === "span-open") { + expect(toolCallSpans[0].attributes?.name).toBe("echo"); + expect(toolCallSpans[0].attributes?.toolCallId).toBe("tc1"); + } + + const toolCallCloses = sink.records.filter( + (r) => r.kind === "span-close" && r.name === "tool-call", + ); + expect(toolCallCloses).toHaveLength(1); + if 
(toolCallCloses[0]?.kind === "span-close") { + expect(toolCallCloses[0].status).toBe("ok"); + } + }); + + it("tools receive ctx.log (correlated logger)", async () => { + let capturedLog: Logger | undefined; + + const tool = createFakeTool("logtest", async (_input, ctx) => { + capturedLog = ctx.log; + ctx.log.info("tool ran", { key: "value" }); + return { content: "ok" }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "logtest", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + expect(capturedLog).toBeDefined(); + + const toolLogs = sink.records.filter( + (r) => r.kind === "log" && (r as { msg: string }).msg === "tool ran", + ); + expect(toolLogs).toHaveLength(1); + if (toolLogs[0]?.kind === "log") { + expect(toolLogs[0].attributes?.key).toBe("value"); + expect(toolLogs[0].extensionId).toBe("test"); + } + }); + + it("an aborted turn still closes its turn span", async () => { + const ac = new AbortController(); + ac.abort(); + + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "should not appear" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + signal: ac.signal, + logger, + }); + + const turnCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "turn"); + expect(turnCloses).toHaveLength(1); + if (turnCloses[0]?.kind === "span-close") { + 
expect(turnCloses[0].attributes?.finishReason).toBe("aborted"); + } + }); + + it("a provider error closes the step span with error status", async () => { + const provider: ProviderContract = { + id: "fake", + stream() { + return (async function* () { + yield { type: "text-delta", delta: "partial" } as ProviderEvent; + throw new Error("provider exploded"); + })(); + }, + }; + + const { logger, sink } = createTestLogger(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + expect(result.finishReason).toBe("error"); + + const stepCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "step"); + expect(stepCloses).toHaveLength(1); + if (stepCloses[0]?.kind === "span-close") { + expect(stepCloses[0].status).toBe("error"); + expect(stepCloses[0].attributes?.["error.message"]).toContain("provider exploded"); + } + }); + }); }); diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index 9421d86..0f42ef3 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -1,4 +1,5 @@ import type { ChatMessage, Chunk } from "../contracts/conversation.js"; +import type { Logger, Span } from "../contracts/logging.js"; import type { ProviderContract, ProviderEvent, Usage } from "../contracts/provider.js"; import type { EventEmitter, RunTurnInput, RunTurnResult } from "../contracts/runtime.js"; import type { ToolCall, ToolContract } from "../contracts/tool.js"; @@ -76,6 +77,8 @@ interface StepContext { readonly signal: AbortSignal; readonly conversationId: string; readonly turnId: string; + readonly logger: Logger; + readonly toolSpans: Map<string, Span>; } interface StepResult { @@ -124,6 +127,18 @@ function processEvent( event.input, ), ); + + // Open a tool-call span (attrs: name, toolCallId) + try { + const tcSpan 
= ctx.logger.span("tool-call", { + name: event.toolName, + toolCallId: event.toolCallId, + }); + ctx.toolSpans.set(event.toolCallId, tcSpan); + } catch { + // Swallow — D7: logging never breaks the turn. + } + if (ctx.dispatch.eager) { dispatcher.submit(call); } @@ -151,6 +166,26 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { let stepUsage = zeroUsage(); let finishReason = "stop"; + // Open a step span with the verbatim pre-mutation prompt in its body (BEFORE capture). + let stepSpan: Span | undefined; + try { + stepSpan = ctx.logger.span("step"); + // Emit the verbatim pre-mutation prompt as a log record on the step span's logger. + // This is the "BEFORE" capture — the messages + tools as handed to provider.stream. + stepSpan.log.info("prompt:before", { + "prompt.messages": JSON.stringify(ctx.messages), + "prompt.tools": JSON.stringify( + ctx.tools.map((t) => ({ + name: t.name, + description: t.description, + parameters: t.parameters, + })), + ), + }); + } catch { + // Swallow — D7. + } + const dispatcher = createStepDispatcher( ctx.toolMap, ctx.dispatch, @@ -158,6 +193,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { ctx.emit, ctx.conversationId, ctx.turnId, + ctx.toolSpans, ); try { @@ -177,6 +213,13 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { chunks.push({ type: "error", message }); ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message)); finishReason = "error"; + // Close step span with error + try { + stepSpan?.end({ err }); + } catch { + // Swallow — D7. 
+ } + stepSpan = undefined; } if (!ctx.dispatch.eager) { @@ -187,6 +230,25 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { const results = await dispatcher.drain(); + // Close remaining tool-call spans + for (const call of toolCalls) { + const tcSpan = ctx.toolSpans.get(call.id); + if (tcSpan !== undefined) { + const result = results.get(call.id); + try { + tcSpan.end({ + attrs: { + isError: result?.isError ?? false, + contentLength: result?.content.length ?? 0, + }, + }); + } catch { + // Swallow — D7. + } + ctx.toolSpans.delete(call.id); + } + } + const toolMessages: ChatMessage[] = []; for (const call of toolCalls) { const result = results.get(call.id); @@ -217,6 +279,21 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { } } + // Close step span (if not already closed by error) + if (stepSpan !== undefined) { + try { + stepSpan.end({ + attrs: { + finishReason, + usage_inputTokens: stepUsage.inputTokens, + usage_outputTokens: stepUsage.outputTokens, + }, + }); + } catch { + // Swallow — D7. + } + } + const assistantMessage: ChatMessage | undefined = chunks.length > 0 ? { role: "assistant", chunks } : undefined; @@ -237,51 +314,122 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { const conversationId = input.conversationId; const turnId = input.turnId; const signal = input.signal ?? new AbortController().signal; + const logger = input.logger; - for (let step = 0; step < MAX_STEPS; step++) { - if (signal.aborted) { - finishReason = "aborted"; - break; + // Open a turn span (attrs: conversationId, turnId, model) + let turnSpan: Span | undefined; + if (logger !== undefined) { + try { + turnSpan = logger.span("turn", { + conversationId, + turnId, + model: input.providerOpts?.model ?? input.provider.id, + }); + } catch { + // Swallow — D7. 
} + } - const stepResult = await executeStep({ - provider: input.provider, - messages, - tools: input.tools, - toolMap, - dispatch: input.dispatch, - emit: input.emit, - signal, - conversationId, - turnId, - }); + // Track open tool-call spans across steps so we can close them on abort + const toolSpans = new Map<string, Span>(); + + try { + for (let step = 0; step < MAX_STEPS; step++) { + if (signal.aborted) { + finishReason = "aborted"; + break; + } - totalUsage = addUsage(totalUsage, stepResult.usage); + const stepResult = await executeStep({ + provider: input.provider, + messages, + tools: input.tools, + toolMap, + dispatch: input.dispatch, + emit: input.emit, + signal, + conversationId, + turnId, + logger: turnSpan?.log ?? logger ?? createNoopLogger(), + toolSpans, + }); - if (stepResult.assistantMessage !== undefined) { - messages.push(stepResult.assistantMessage); - resultMessages.push(stepResult.assistantMessage); - } + totalUsage = addUsage(totalUsage, stepResult.usage); - for (const msg of stepResult.toolMessages) { - messages.push(msg); - resultMessages.push(msg); - } + if (stepResult.assistantMessage !== undefined) { + messages.push(stepResult.assistantMessage); + resultMessages.push(stepResult.assistantMessage); + } - if (signal.aborted) { - finishReason = "aborted"; - break; - } + for (const msg of stepResult.toolMessages) { + messages.push(msg); + resultMessages.push(msg); + } - if (stepResult.toolCalls.length === 0) { - finishReason = stepResult.finishReason; - break; + if (signal.aborted) { + finishReason = "aborted"; + break; + } + + if (stepResult.toolCalls.length === 0) { + finishReason = stepResult.finishReason; + break; + } + + if (step === MAX_STEPS - 1) { + finishReason = "max-steps"; + } + } + } finally { + // Close any orphaned tool-call spans (e.g. abort mid-tool) + for (const [id, tcSpan] of toolSpans) { + try { + tcSpan.end({ attrs: { orphaned: true } }); + } catch { + // Swallow — D7. 
+ } + toolSpans.delete(id); } - if (step === MAX_STEPS - 1) { - finishReason = "max-steps"; + // Close the turn span + if (turnSpan !== undefined) { + try { + turnSpan.end({ + attrs: { + finishReason, + usage_inputTokens: totalUsage.inputTokens, + usage_outputTokens: totalUsage.outputTokens, + }, + }); + } catch { + // Swallow — D7. + } } } return { messages: resultMessages, usage: totalUsage, finishReason }; } + +function createNoopLogger(): Logger { + return { + debug() {}, + info() {}, + warn() {}, + error() {}, + child() { + return createNoopLogger(); + }, + span() { + return { + id: "noop", + log: createNoopLogger(), + setAttributes() {}, + addLink() {}, + child() { + return this; + }, + end() {}, + }; + }, + }; +} diff --git a/packages/session-orchestrator/src/extension.ts b/packages/session-orchestrator/src/extension.ts index 8cd4c44..bfbc7ca 100644 --- a/packages/session-orchestrator/src/extension.ts +++ b/packages/session-orchestrator/src/extension.ts @@ -29,6 +29,7 @@ export function activate(host: HostAPI): void { resolveProvider: () => selectFirstProvider(host.getProviders()), resolveTools: () => [...host.getTools().values()], runTurn, + logger: host.logger, }); host.provideService(sessionOrchestratorHandle, orchestrator); diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index 302404e..37fc512 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -2,6 +2,7 @@ import type { ConversationStore } from "@dispatch/conversation-store"; import type { AgentEvent, ChatMessage, + Logger, ProviderContract, RunTurnInput, RunTurnResult, @@ -30,6 +31,8 @@ export interface SessionOrchestratorDeps { readonly resolveTools: () => readonly ToolContract[]; readonly resolveDispatch?: () => ToolDispatchPolicy; readonly runTurn: (input: RunTurnInput) => Promise<RunTurnResult>; + /** Base logger (auto-scoped to this extension); childed per turn 
for span capture. */ + readonly logger?: Logger; } export function createSessionOrchestrator(deps: SessionOrchestratorDeps): SessionOrchestrator { @@ -41,6 +44,7 @@ export function createSessionOrchestrator(deps: SessionOrchestratorDeps): Sessio const tools = deps.resolveTools(); const dispatch = deps.resolveDispatch?.() ?? defaultDispatchPolicy(); const turnId = generateTurnId(); + const turnLogger = deps.logger?.child({ conversationId, turnId }); const result = await deps.runTurn({ provider, @@ -50,6 +54,7 @@ export function createSessionOrchestrator(deps: SessionOrchestratorDeps): Sessio emit: onEvent, conversationId, turnId, + ...(turnLogger !== undefined ? { logger: turnLogger } : {}), ...(signal !== undefined ? { signal } : {}), }); diff --git a/packages/tool-read-file/src/read-file.test.ts b/packages/tool-read-file/src/read-file.test.ts index 0745b0b..f995b09 100644 --- a/packages/tool-read-file/src/read-file.test.ts +++ b/packages/tool-read-file/src/read-file.test.ts @@ -1,7 +1,7 @@ import { mkdtemp, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; -import type { ToolExecuteContext } from "@dispatch/kernel"; +import { createLogger, type ToolExecuteContext } from "@dispatch/kernel"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { createReadFileTool, @@ -16,6 +16,11 @@ function stubCtx(): ToolExecuteContext { toolCallId: "test-call-1", onOutput: () => {}, signal: AbortSignal.timeout(5000), + log: createLogger( + { extensionId: "test" }, + { emit: () => {} }, + { now: () => 0, newId: () => "id" }, + ), }; } @@ -154,3 +154,39 @@ extension (read_file), hygiene CRs (getHostAPI + manifest honesty), and the tabId→conversationId vocab rename. typecheck + biome clean; 218 tests green. Remaining work is the parked design decisions (persistent waking agents, etc.) — non-blocking. See HANDOFF.md "Open design decisions still parked". 
+ +--- + +## Observability — Phase A (logging substrate) ✅ DONE + verified live +Goal: structured logs + spans captured durably to a journal file — the substrate +for the agent-first observability subsystem (design: notes/observability-design.md). + +- [x] **Unit 1 — kernel-logging** (mimo-v2.5-pro): Logger/Span ABI + (`contracts/logging.ts`) — leveled/attributed/auto-scoped (host stamps + `extensionId`), incremental span records (open/close, crash-reconstructable, D3), + injected `LogSink` (pure record-builder). `ctx.log` on ToolContract; runTurn + opens turn/step/tool-call spans + the verbatim **"before"** prompt on the step span. +- [x] **Unit 2 — journal-sink** (`packages/journal-sink/`, mimo-v2.5-pro): bootstrap + `LogSink` → NDJSON append-only journal (pure `serialize` + thin fs edge, rotation, + fail-safe drop — never blocks a turn). NOT an extension (HostDeps bootstrap dep). +- [x] **Orchestrator fan-out + wiring** (direct): bus error-attrs `{ err }`, + FakeLogger/`ctx.log` test conformance; host-bin injects `logSink`+`logDeps` + (journal at `.dispatch/journal/`); session-orchestrator threads `host.logger` + (childed per turn) into runTurn. + +**Result:** typecheck clean, **250 tests** (218 → +22 journal-sink, +10 kernel), +biome clean. **Live boot verified:** a turn's journal contains host logs + turn/step +spans (open+close) + the `prompt:before` record carrying the verbatim messages array +— the pre-mutation prompt is fully reconstructable. 2-process model: app + in-process +sink → journal file; the collector (process 2) is Phase B. Redaction is +per-extension self-redaction (no shared helper — isolation over DRY). + +### Next (observability) +- **"AFTER" capture** — `provider.request` verbatim post-transform in + provider-openai-compat → full round-trip rebuild + before↔after diff (§10). +- Minor refinement: move the large `prompt:before` payload from `attributes` into the + record `body` field (store-fat-serve-thin) — currently a stringified attribute. 
+- Phase B: out-of-process collector → SQLite store + query (§11). + +Summons: prompts/phase-a-{kernel-logging,journal-sink}.md; +reports/phase-a-{kernel-logging,journal-sink}.md. diff --git a/tsconfig.json b/tsconfig.json index 505d883..0f5f80f 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -9,6 +9,7 @@ { "path": "./packages/session-orchestrator" }, { "path": "./packages/transport-http" }, { "path": "./packages/tool-read-file" }, + { "path": "./packages/journal-sink" }, { "path": "./packages/host-bin" } ] } |
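The span lifecycle exercised by the tests in this diff (an incremental `span-open` record, then a `span-close` record carrying `status`, `durationMs`, and an `error.message` attribute on failure) can be sketched in isolation. This is a minimal, hypothetical reduction and not the kernel's `createLogger`: the names `SketchRecord`, `SketchSink`, `SketchDeps`, `EndOptions`, `openSpan`, and `safeEmit` are invented for illustration, the `id` and `ts` fields are assumed, and swallowing inside `safeEmit` is a simplification (the commit swallows at the call sites in `run-turn.ts`).

```typescript
// Hypothetical, simplified record shapes; the real ones live in contracts/logging.ts.
type SketchRecord =
  | { kind: "span-open"; id: string; name: string; ts: number; attributes: Record<string, unknown> }
  | {
      kind: "span-close";
      id: string;
      name: string;
      ts: number;
      status: "ok" | "error";
      durationMs: number;
      attributes: Record<string, unknown>;
    };

interface SketchSink {
  emit(record: SketchRecord): void;
}

// Injected clock and id source, mirroring the `now` / `newId` deps used in the tests.
interface SketchDeps {
  now(): number;
  newId(): string;
}

interface EndOptions {
  attrs?: Record<string, unknown>;
  err?: unknown;
}

interface SketchSpan {
  end(opts?: EndOptions): void;
}

// Logging must never break the caller, so a failing sink is dropped silently.
function safeEmit(sink: SketchSink, record: SketchRecord): void {
  try {
    sink.emit(record);
  } catch {
    // Fail-safe drop (assumed placement; see the lead-in).
  }
}

// Emits the open record immediately, so a crash before `end` still leaves a trace.
function openSpan(
  name: string,
  attrs: Record<string, unknown>,
  sink: SketchSink,
  deps: SketchDeps,
): SketchSpan {
  const id = deps.newId();
  const startedAt = deps.now();
  safeEmit(sink, { kind: "span-open", id, name, ts: startedAt, attributes: attrs });
  return {
    end(opts: EndOptions = {}): void {
      const endedAt = deps.now();
      const attributes: Record<string, unknown> = { ...(opts.attrs ?? {}) };
      if (opts.err !== undefined) {
        attributes["error.message"] =
          opts.err instanceof Error ? opts.err.message : String(opts.err);
      }
      safeEmit(sink, {
        kind: "span-close",
        id,
        name,
        ts: endedAt,
        status: opts.err !== undefined ? "error" : "ok",
        durationMs: endedAt - startedAt,
        attributes,
      });
    },
  };
}

// Usage: a deterministic clock (advances 50 ms per call) and an in-memory sink.
const records: SketchRecord[] = [];
let clock = 0;
let ids = 0;
const deps: SketchDeps = {
  now: () => {
    clock += 50;
    return clock;
  },
  newId: () => `span-${++ids}`,
};
const sink: SketchSink = {
  emit: (record) => {
    records.push(record);
  },
};

const step = openSpan("step", {}, sink, deps);
step.end({ err: new Error("provider exploded") });
```

Injecting the clock and id source keeps the record builder pure, which is what lets a test pin `durationMs` and span ids without touching real time.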
