summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-05 14:06:39 +0900
committerAdam Malczewski <[email protected]>2026-06-05 14:06:39 +0900
commit9ae09aad5d8d6232c55932af0d496b888166065f (patch)
treee62b59314ba065a531f9924151742c5adc7426ef
parent6733bbd47e6df8681fcf4b6815f82bd6b6922334 (diff)
downloaddispatch-9ae09aad5d8d6232c55932af0d496b888166065f.tar.gz
dispatch-9ae09aad5d8d6232c55932af0d496b888166065f.zip
feat(observability): Phase A.2 — verbatim provider.request "after" capture + self-redaction (267 tests)
Threads the step span's correlated logger into provider.stream (new optional ProviderStreamOptions.logger) so provider-openai-compat opens a child provider.request span at the fetch edge, capturing the verbatim post-transform request + response status/cache-tokens/raw-error. Auth header self-redacted in the provider's OWN code (graduated mask tiers; no shared helper). Capture is fail-safe (never throws into the turn). Adds the first hermetic provider HTTP test (stream.test.ts: fetch mocked, 15 cases). Large payloads use attributes for now; the LogRecord.body channel is a deferred ABI design (notes §10). Verified: tsc -b clean, 267 tests (250->+17), biome 0 warnings/0 infos. Live boot: provider.request shares turnId with prompt:before (before<->after diffable); auth-key leak count = 0 (self-redaction proven on a real request).
-rw-r--r--notes/observability-design.md32
-rw-r--r--packages/kernel/src/contracts/provider.ts9
-rw-r--r--packages/kernel/src/runtime/run-turn.test.ts103
-rw-r--r--packages/kernel/src/runtime/run-turn.ts7
-rw-r--r--packages/provider-openai-compat/src/stream.test.ts511
-rw-r--r--packages/provider-openai-compat/src/stream.ts160
-rw-r--r--tasks.md25
7 files changed, 812 insertions, 35 deletions
diff --git a/notes/observability-design.md b/notes/observability-design.md
index 6c09099..5e9c3a7 100644
--- a/notes/observability-design.md
+++ b/notes/observability-design.md
@@ -583,12 +583,32 @@ finish) → Unit 2 → host-bin wiring**. Optional overlap: start Unit 2 once Un
**Verify:** `typecheck`/`test`/`check` clean; live boot → `host.logger` lines land in
the journal file.
-### Next step (after Phase A) — the "AFTER" capture
-`provider.request` verbatim post-transform (D5) inside `provider-openai-compat`: the
-exact serialized request bytes + raw response/error + cache tokens, auth header
-self-redacted. This **completes full round-trip rebuild** and unlocks the
-**before↔after diff** (kernel "before" vs wire "after" → pinpoints transform bugs).
-Depends only on the Phase A Logger ABI; lives entirely in the provider extension.
+### Phase A.2 — the "AFTER" capture (build plan)
+`provider.request` verbatim post-transform (D5) inside `provider-openai-compat`:
+exact serialized request + response status/cache-tokens/raw-error, auth self-redacted.
+Completes full round-trip rebuild + the **before↔after diff**.
+- **Contract (DONE, orchestrator):** `ProviderStreamOptions.logger?: Logger`
+ (`contracts/provider.ts`) — threads the step's correlated logger into `stream()` so
+ the `provider.request` span is a child of the step span (before↔after share
+ turnId/parentSpanId). Optional = non-breaking.
+- **Unit K — kernel run-turn** (owner: kernel): pass the step span's logger into
+ `provider.stream(msgs, tools, { ...opts, logger })`. One file (`runtime/run-turn.ts`).
+- **Unit P — provider-openai-compat** (owner): at the fetch edge, if `opts.logger`,
+ open a `provider.request` child span; capture the verbatim post-transform request
+ (URL, model, params, serialized body) + `cache_control` presence; on response,
+ status + cache-read/creation tokens (Usage) + (on error) raw error; **self-redact
+ the auth header in its own code** (graduated tiers, §6). First hermetic provider
+ HTTP test (`stream.test.ts`, mock `fetch` + real-capture fixtures).
+- **Order:** contract frozen (done) → Unit K ∥ Unit P (disjoint: kernel vs provider).
+
+**DEFERRED — body-channel ABI (design decision, surface to user):** `LogRecord` has a
+`body` field but the Logger/Span API exposes no way to set it — that's why
+`prompt:before` (and now the request/response) use stringified `attributes`. Storing
+large verbatim payloads in `body` (store-fat-serve-thin; both before & after) needs a
+small ABI addition (e.g. `Span.setBody(body)`), worth doing before Phase B's query
+layer. Until then captures use attributes — functional + reconstructable, just not
+ideal for D9 `GROUP BY`.
+
*(Full per-extension prompt-segment provenance — D8 — comes later, with the
context-filter chain.)*
diff --git a/packages/kernel/src/contracts/provider.ts b/packages/kernel/src/contracts/provider.ts
index 1d0fd75..62dc8e9 100644
--- a/packages/kernel/src/contracts/provider.ts
+++ b/packages/kernel/src/contracts/provider.ts
@@ -7,6 +7,7 @@
*/
import type { ChatMessage } from "./conversation.js";
+import type { Logger } from "./logging.js";
import type { ToolContract } from "./tool.js";
/**
@@ -92,6 +93,14 @@ export interface ProviderStreamOptions {
readonly maxTokens?: number;
/** System prompt to prepend. */
readonly systemPrompt?: string;
+ /**
+ * Correlated logger for this turn's step (Phase A logging ABI). When present,
+ * the provider should open a child `provider.request` span and capture the
+ * verbatim post-transform request + raw response/error there, self-redacting
+ * secrets in its own code. Optional so non-instrumented callers/tests still
+ * compile (the provider falls back to no capture).
+ */
+ readonly logger?: Logger;
}
/**
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index 48f0b5a..667476f 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -817,26 +817,26 @@ describe("runTurn", () => {
}
});
- describe("span instrumentation", () => {
- function createTestLogger(): {
- logger: Logger;
- sink: LogSink & { records: LogRecord[] };
- deps: LogDeps;
- } {
- let idCounter = 0;
- const deps: LogDeps = {
- now: () => 1000 + idCounter * 100,
- newId: () => `span-${++idCounter}`,
- };
- const records: LogRecord[] = [];
- const sink: LogSink & { records: LogRecord[] } = {
- records,
- emit: (record) => records.push(record),
- };
- const logger = createLogger({ extensionId: "test" }, sink, deps);
- return { logger, sink, deps };
- }
+ function createTestLogger(): {
+ logger: Logger;
+ sink: LogSink & { records: LogRecord[] };
+ deps: LogDeps;
+ } {
+ let idCounter = 0;
+ const deps: LogDeps = {
+ now: () => 1000 + idCounter * 100,
+ newId: () => `span-${++idCounter}`,
+ };
+ const records: LogRecord[] = [];
+ const sink: LogSink & { records: LogRecord[] } = {
+ records,
+ emit: (record) => records.push(record),
+ };
+ const logger = createLogger({ extensionId: "test" }, sink, deps);
+ return { logger, sink, deps };
+ }
+ describe("span instrumentation", () => {
it("emits turn + step span open/close in order", async () => {
const provider = createFakeProvider([
[
@@ -1040,4 +1040,69 @@ describe("runTurn", () => {
}
});
});
+
+ describe("provider logger threading", () => {
+ it("passes step span logger to provider.stream opts when logger provided", async () => {
+ let capturedOpts: Record<string, unknown> | undefined;
+
+ const provider: ProviderContract = {
+ id: "fake",
+ stream(_messages, _tools, opts) {
+ capturedOpts = opts !== undefined ? { ...opts } : undefined;
+ return (async function* () {
+ yield { type: "text-delta", delta: "hi" } as ProviderEvent;
+ yield { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ })();
+ },
+ };
+
+ const { logger } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ expect(capturedOpts).toBeDefined();
+ expect(capturedOpts?.logger).toBeDefined();
+ expect(typeof (capturedOpts?.logger as Record<string, unknown>).info).toBe("function");
+ expect(typeof (capturedOpts?.logger as Record<string, unknown>).span).toBe("function");
+ });
+
+ it("passes undefined for opts.logger when no logger provided", async () => {
+ let capturedOpts: Record<string, unknown> | undefined;
+
+ const provider: ProviderContract = {
+ id: "fake",
+ stream(_messages, _tools, opts) {
+ capturedOpts = opts !== undefined ? { ...opts } : undefined;
+ return (async function* () {
+ yield { type: "text-delta", delta: "hi" } as ProviderEvent;
+ yield { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ })();
+ },
+ };
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ });
+
+ expect(capturedOpts).toBeDefined();
+ expect(capturedOpts?.logger).toBeUndefined();
+ });
+ });
});
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index 0f42ef3..a78c31d 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -78,6 +78,7 @@ interface StepContext {
readonly conversationId: string;
readonly turnId: string;
readonly logger: Logger;
+ readonly stepLogger: Logger | undefined;
readonly toolSpans: Map<string, Span>;
}
@@ -197,7 +198,10 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
);
try {
- const stream = ctx.provider.stream(ctx.messages, ctx.tools);
+ const opts = {
+ ...(ctx.stepLogger !== undefined ? { logger: ctx.stepLogger } : {}),
+ };
+ const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts);
for await (const event of stream) {
if (ctx.signal.aborted) break;
processEvent(event, chunks, toolCalls, dispatcher, ctx);
@@ -351,6 +355,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
conversationId,
turnId,
logger: turnSpan?.log ?? logger ?? createNoopLogger(),
+ stepLogger: logger,
toolSpans,
});
diff --git a/packages/provider-openai-compat/src/stream.test.ts b/packages/provider-openai-compat/src/stream.test.ts
new file mode 100644
index 0000000..0b8e643
--- /dev/null
+++ b/packages/provider-openai-compat/src/stream.test.ts
@@ -0,0 +1,511 @@
+import type { ChatMessage, Logger, ProviderEvent, Span } from "@dispatch/kernel";
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+import { type StreamConfig, streamChat } from "./stream.js";
+
+async function collectEvents(iter: AsyncIterable<ProviderEvent>): Promise<ProviderEvent[]> {
+ const events: ProviderEvent[] = [];
+ for await (const event of iter) {
+ events.push(event);
+ }
+ return events;
+}
+
+function assertDefined<T>(v: T, msg?: string): asserts v is NonNullable<T> {
+ if (v === undefined || v === null) {
+ throw new Error(msg ?? "expected defined");
+ }
+}
+
+interface CapturedSpan {
+ name: string;
+ attrs: Record<string, string | number | boolean | null>;
+ endOutcome?:
+ | { err?: unknown; attrs?: Record<string, string | number | boolean | null> }
+ | undefined;
+}
+
+function createFakeLogger(): { logger: Logger; spans: CapturedSpan[] } {
+ const spans: CapturedSpan[] = [];
+ let spanAttrBuffer: Record<string, string | number | boolean | null> = {};
+
+ const fakeSpan: Span = {
+ id: "fake-span-id",
+ log: {} as Logger,
+ setAttributes(attrs) {
+ Object.assign(spanAttrBuffer, attrs);
+ },
+ addLink() {},
+ child() {
+ return fakeSpan;
+ },
+ end(outcome?) {
+ spans.push({
+ name: "provider.request",
+ attrs: { ...spanAttrBuffer },
+ endOutcome: outcome as CapturedSpan["endOutcome"],
+ });
+ },
+ };
+
+ const logger: Logger = {
+ debug() {},
+ info() {},
+ warn() {},
+ error() {},
+ child() {
+ return logger;
+ },
+ span(_name, attrs) {
+ spanAttrBuffer = attrs ? { ...attrs } : {};
+ return fakeSpan;
+ },
+ };
+
+ return { logger, spans };
+}
+
+function makeConfig(apiKey = "sk-test-1234567890abcdef"): StreamConfig {
+ return {
+ baseURL: "https://api.example.com/v1",
+ apiKey,
+ model: "test-model",
+ };
+}
+
+function mockFetch(handler: (url: string | URL | Request, init?: RequestInit) => unknown): void {
+ globalThis.fetch = vi.fn(handler) as unknown as typeof globalThis.fetch;
+}
+
+function makeMessages(): readonly ChatMessage[] {
+ return [
+ {
+ role: "user",
+ chunks: [{ type: "text", text: "Hello" }],
+ },
+ ];
+}
+
+function sseBody(...lines: string[]): ReadableStream<Uint8Array> {
+ const encoder = new TextEncoder();
+ const chunks = lines.map((l) => encoder.encode(`${l}\n`));
+ let index = 0;
+ return new ReadableStream<Uint8Array>({
+ pull(controller) {
+ if (index < chunks.length) {
+ const chunk = chunks[index];
+ assertDefined(chunk);
+ controller.enqueue(chunk);
+ index++;
+ } else {
+ controller.close();
+ }
+ },
+ });
+}
+
+describe("streamChat — provider.request AFTER capture", () => {
+ let originalFetch: typeof globalThis.fetch;
+
+ beforeEach(() => {
+ originalFetch = globalThis.fetch;
+ });
+
+ afterEach(() => {
+ globalThis.fetch = originalFetch;
+ });
+
+ it("opens a provider.request span with verbatim request body", async () => {
+ const { logger, spans } = createFakeLogger();
+ const config = makeConfig();
+
+ mockFetch(
+ () =>
+ new Response(
+ sseBody(
+ 'data: {"id":"cmpl-1","choices":[{"delta":{"content":"Hi"},"index":0}]}',
+ 'data: {"id":"cmpl-1","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ "data: [DONE]",
+ ),
+ { status: 200, headers: { "Content-Type": "text/event-stream" } },
+ ),
+ );
+
+ const events = await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+ expect(events.some((e) => e.type === "text-delta")).toBe(true);
+ expect(spans).toHaveLength(1);
+
+ assertDefined(spans[0]);
+ const span = spans[0];
+ expect(span.name).toBe("provider.request");
+ expect(span.attrs["request.method"]).toBe("POST");
+
+ const capturedBody = JSON.parse(span.attrs["request.body"] as string);
+ expect(capturedBody.model).toBe("test-model");
+ expect(capturedBody.stream).toBe(true);
+ expect(capturedBody.messages).toEqual([{ role: "user", content: "Hello" }]);
+
+ expect(span.endOutcome?.attrs?.status).toBe(200);
+ });
+
+ it("redacts a long API key (≥13 chars → reveal 3 each side)", async () => {
+ const { logger, spans } = createFakeLogger();
+ const config = makeConfig("sk-abcdefghijkmnop");
+
+ mockFetch(
+ () =>
+ new Response(
+ sseBody(
+ 'data: {"id":"cmpl-2","choices":[{"delta":{"content":"ok"},"index":0}]}',
+ 'data: {"id":"cmpl-2","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ "data: [DONE]",
+ ),
+ { status: 200, headers: { "Content-Type": "text/event-stream" } },
+ ),
+ );
+
+ await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+ assertDefined(spans[0]);
+ const span = spans[0];
+ const authHeader = span.attrs["request.headers.authorization"] as string;
+ expect(authHeader).toBe("Bearer sk-…redacted…nop");
+ expect(authHeader).not.toContain("abcdefghijkm");
+ });
+
+ it("redacts a medium API key (8–10 chars → reveal 1 each side)", async () => {
+ const { logger, spans } = createFakeLogger();
+ const config = makeConfig("sk-abcde");
+
+ mockFetch(
+ () =>
+ new Response(
+ sseBody(
+ 'data: {"id":"cmpl-3","choices":[{"delta":{"content":"ok"},"index":0}]}',
+ 'data: {"id":"cmpl-3","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ "data: [DONE]",
+ ),
+ { status: 200, headers: { "Content-Type": "text/event-stream" } },
+ ),
+ );
+
+ await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+ assertDefined(spans[0]);
+ const span = spans[0];
+ const authHeader = span.attrs["request.headers.authorization"] as string;
+ expect(authHeader).toBe("Bearer s…redacted…e");
+ });
+
+ it("redacts a short API key (≤7 chars → full mask)", async () => {
+ const { logger, spans } = createFakeLogger();
+ const config = makeConfig("secret!");
+
+ mockFetch(
+ () =>
+ new Response(
+ sseBody(
+ 'data: {"id":"cmpl-4","choices":[{"delta":{"content":"ok"},"index":0}]}',
+ 'data: {"id":"cmpl-4","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ "data: [DONE]",
+ ),
+ { status: 200, headers: { "Content-Type": "text/event-stream" } },
+ ),
+ );
+
+ await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+ assertDefined(spans[0]);
+ const span = spans[0];
+ const authHeader = span.attrs["request.headers.authorization"] as string;
+ expect(authHeader).toBe("Bearer …redacted…");
+ });
+
+ it("captures cache tokens from the response", async () => {
+ const { logger, spans } = createFakeLogger();
+ const config = makeConfig();
+
+ mockFetch(
+ () =>
+ new Response(
+ sseBody(
+ 'data: {"id":"cmpl-5","choices":[{"delta":{"content":"Hi"},"index":0}]}',
+ 'data: {"id":"cmpl-5","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ 'data: {"id":"cmpl-5","usage":{"prompt_tokens":100,"completion_tokens":20,"cache_read_tokens":80,"cache_write_tokens":10}}',
+ "data: [DONE]",
+ ),
+ { status: 200, headers: { "Content-Type": "text/event-stream" } },
+ ),
+ );
+
+ await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+ assertDefined(spans[0]);
+ const span = spans[0];
+ expect(span.endOutcome?.attrs?.["usage.inputTokens"]).toBe(100);
+ expect(span.endOutcome?.attrs?.["usage.outputTokens"]).toBe(20);
+ expect(span.endOutcome?.attrs?.["usage.cacheReadTokens"]).toBe(80);
+ expect(span.endOutcome?.attrs?.["usage.cacheWriteTokens"]).toBe(10);
+ });
+
+ it("captures cache_read_tokens alone", async () => {
+ const { logger, spans } = createFakeLogger();
+ const config = makeConfig();
+
+ mockFetch(
+ () =>
+ new Response(
+ sseBody(
+ 'data: {"id":"cmpl-6","choices":[{"delta":{"content":"ok"},"index":0}]}',
+ 'data: {"id":"cmpl-6","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ 'data: {"id":"cmpl-6","usage":{"prompt_tokens":50,"completion_tokens":5,"cache_read_tokens":45}}',
+ "data: [DONE]",
+ ),
+ { status: 200, headers: { "Content-Type": "text/event-stream" } },
+ ),
+ );
+
+ await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+ assertDefined(spans[0]);
+ const span = spans[0];
+ expect(span.endOutcome?.attrs?.["usage.cacheReadTokens"]).toBe(45);
+ expect(span.endOutcome?.attrs?.["usage.cacheWriteTokens"]).toBeUndefined();
+ });
+
+ it("records HTTP error status and error body without throwing", async () => {
+ const { logger, spans } = createFakeLogger();
+ const config = makeConfig();
+
+ mockFetch(
+ () =>
+ new Response("Invalid request body", {
+ status: 400,
+ headers: { "Content-Type": "text/plain" },
+ }),
+ );
+
+ const events = await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+ expect(events).toHaveLength(1);
+ expect(events[0]).toEqual({
+ type: "error",
+ message: "HTTP 400: Invalid request body",
+ code: "400",
+ retryable: false,
+ });
+
+ expect(spans).toHaveLength(1);
+ assertDefined(spans[0]);
+ const span = spans[0];
+ expect(span.endOutcome?.attrs?.status).toBe(400);
+ expect(span.endOutcome?.attrs?.["response.error_body"]).toBe("Invalid request body");
+ expect(span.endOutcome?.err).toBeInstanceOf(Error);
+ });
+
+ it("records network error without throwing", async () => {
+ const { logger, spans } = createFakeLogger();
+ const config = makeConfig();
+
+ mockFetch(() => {
+ throw new Error("connection refused");
+ });
+
+ const events = await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+ expect(events).toHaveLength(1);
+ expect(events[0]).toEqual({
+ type: "error",
+ message: "connection refused",
+ retryable: true,
+ });
+
+ expect(spans).toHaveLength(1);
+ assertDefined(spans[0]);
+ const span = spans[0];
+ expect(span.endOutcome?.err).toBeInstanceOf(Error);
+ expect((span.endOutcome?.err as Error).message).toBe("connection refused");
+ });
+
+ it("detects cache_control breakpoint absence in a normal request body", async () => {
+ const { logger, spans } = createFakeLogger();
+ const config = makeConfig();
+
+ mockFetch(
+ () =>
+ new Response(
+ sseBody(
+ 'data: {"id":"cmpl-7","choices":[{"delta":{"content":"ok"},"index":0}]}',
+ 'data: {"id":"cmpl-7","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ "data: [DONE]",
+ ),
+ { status: 200, headers: { "Content-Type": "text/event-stream" } },
+ ),
+ );
+
+ await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+ assertDefined(spans[0]);
+ const span = spans[0];
+ expect(span.attrs["request.cache_control_present"]).toBe(false);
+ });
+
+ it("does not open a span when opts.logger is absent", async () => {
+ const config = makeConfig();
+
+ mockFetch(
+ () =>
+ new Response(
+ sseBody(
+ 'data: {"id":"cmpl-8","choices":[{"delta":{"content":"ok"},"index":0}]}',
+ 'data: {"id":"cmpl-8","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ "data: [DONE]",
+ ),
+ { status: 200, headers: { "Content-Type": "text/event-stream" } },
+ ),
+ );
+
+ const events = await collectEvents(streamChat(config, makeMessages(), []));
+
+ expect(events.some((e) => e.type === "text-delta")).toBe(true);
+ });
+
+ it("fail-safe: logger throwing does not break stream()", async () => {
+ const brokenLogger: Logger = {
+ debug() {},
+ info() {},
+ warn() {},
+ error() {},
+ child() {
+ return brokenLogger;
+ },
+ span() {
+ throw new Error("logger exploded");
+ },
+ };
+
+ const config = makeConfig();
+
+ mockFetch(
+ () =>
+ new Response(
+ sseBody(
+ 'data: {"id":"cmpl-9","choices":[{"delta":{"content":"ok"},"index":0}]}',
+ 'data: {"id":"cmpl-9","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ "data: [DONE]",
+ ),
+ { status: 200, headers: { "Content-Type": "text/event-stream" } },
+ ),
+ );
+
+ const events = await collectEvents(
+ streamChat(config, makeMessages(), [], { logger: brokenLogger }),
+ );
+
+ expect(events.some((e) => e.type === "text-delta")).toBe(true);
+ expect(events.some((e) => e.type === "finish")).toBe(true);
+ });
+
+ it("redacts an 11-char API key (reveal 2 each side)", async () => {
+ const { logger, spans } = createFakeLogger();
+ const config = makeConfig("sk-abcde1234");
+
+ mockFetch(
+ () =>
+ new Response(
+ sseBody(
+ 'data: {"id":"cmpl-10","choices":[{"delta":{"content":"ok"},"index":0}]}',
+ 'data: {"id":"cmpl-10","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ "data: [DONE]",
+ ),
+ { status: 200, headers: { "Content-Type": "text/event-stream" } },
+ ),
+ );
+
+ await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+ assertDefined(spans[0]);
+ const span = spans[0];
+ const authHeader = span.attrs["request.headers.authorization"] as string;
+ expect(authHeader).toBe("Bearer sk…redacted…34");
+ });
+
+ it("records server error (500) as retryable", async () => {
+ const { logger, spans } = createFakeLogger();
+ const config = makeConfig();
+
+ mockFetch(
+ () =>
+ new Response("Internal Server Error", {
+ status: 500,
+ headers: { "Content-Type": "text/plain" },
+ }),
+ );
+
+ const events = await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+ expect(events).toHaveLength(1);
+ expect(events[0]).toEqual({
+ type: "error",
+ message: "HTTP 500: Internal Server Error",
+ code: "500",
+ retryable: true,
+ });
+
+ expect(spans).toHaveLength(1);
+ assertDefined(spans[0]);
+ expect(spans[0].endOutcome?.attrs?.status).toBe(500);
+ });
+
+ it("captures model and url on the span", async () => {
+ const { logger, spans } = createFakeLogger();
+ const config = makeConfig();
+
+ mockFetch(
+ () =>
+ new Response(
+ sseBody(
+ 'data: {"id":"cmpl-11","choices":[{"delta":{"content":"ok"},"index":0}]}',
+ 'data: {"id":"cmpl-11","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ "data: [DONE]",
+ ),
+ { status: 200, headers: { "Content-Type": "text/event-stream" } },
+ ),
+ );
+
+ await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+ assertDefined(spans[0]);
+ const span = spans[0];
+ expect(span.attrs.model).toBe("test-model");
+ expect(span.attrs.url).toBe("https://api.example.com/v1/chat/completions");
+ });
+
+ it("uses opts.model override in capture", async () => {
+ const { logger, spans } = createFakeLogger();
+ const config = makeConfig();
+
+ mockFetch(
+ () =>
+ new Response(
+ sseBody(
+ 'data: {"id":"cmpl-12","choices":[{"delta":{"content":"ok"},"index":0}]}',
+ 'data: {"id":"cmpl-12","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ "data: [DONE]",
+ ),
+ { status: 200, headers: { "Content-Type": "text/event-stream" } },
+ ),
+ );
+
+ await collectEvents(
+ streamChat(config, makeMessages(), [], { logger, model: "override-model" }),
+ );
+
+ assertDefined(spans[0]);
+ const span = spans[0];
+ expect(span.attrs.model).toBe("override-model");
+
+ const capturedBody = JSON.parse(span.attrs["request.body"] as string);
+ expect(capturedBody.model).toBe("override-model");
+ });
+});
diff --git a/packages/provider-openai-compat/src/stream.ts b/packages/provider-openai-compat/src/stream.ts
index 7021120..9e27a89 100644
--- a/packages/provider-openai-compat/src/stream.ts
+++ b/packages/provider-openai-compat/src/stream.ts
@@ -2,6 +2,7 @@ import type {
ChatMessage,
ProviderEvent,
ProviderStreamOptions,
+ Span,
ToolContract,
} from "@dispatch/kernel";
import { convertMessages, type OpenAIMessage } from "./convert-messages.js";
@@ -13,6 +14,24 @@ export interface StreamConfig {
readonly model: string;
}
+/**
+ * Graduated secret mask — §6 tiers. Reimplemented locally (isolation-over-dry).
+ * ≥13 → reveal 3 each side · 11–12 → 2 · 8–10 → 1 · ≤7 → full mask.
+ */
+function maskSecret(value: string): string {
+ const len = value.length;
+ if (len <= 7) return "…redacted…";
+ let reveal: number;
+ if (len >= 13) {
+ reveal = 3;
+ } else if (len >= 11) {
+ reveal = 2;
+ } else {
+ reveal = 1;
+ }
+ return `${value.slice(0, reveal)}…redacted…${value.slice(-reveal)}`;
+}
+
export async function* streamChat(
config: StreamConfig,
messages: readonly ChatMessage[],
@@ -43,17 +62,56 @@ export async function* streamChat(
body.max_tokens = opts.maxTokens;
}
+ const url = `${config.baseURL}/chat/completions`;
+ const bodyString = JSON.stringify(body);
+
+ let reqSpan: Span | undefined;
+ let totalInputTokens = 0;
+ let totalOutputTokens = 0;
+ let totalCacheReadTokens: number | undefined;
+ let totalCacheWriteTokens: number | undefined;
+
+ if (opts?.logger) {
+ try {
+ const model = opts?.model ?? config.model;
+ reqSpan = opts.logger.span("provider.request", {
+ model,
+ url,
+ });
+ const hasCacheBreakpoint = bodyString.includes("cache_control");
+ reqSpan.setAttributes({
+ "request.method": "POST",
+ "request.body": bodyString,
+ "request.cache_control_present": hasCacheBreakpoint,
+ "request.headers.content_type": "application/json",
+ "request.headers.authorization": `Bearer ${maskSecret(config.apiKey)}`,
+ });
+ } catch {
+ // Fail-safe: capture must never break stream().
+ }
+ }
+
let response: Response;
try {
- response = await fetch(`${config.baseURL}/chat/completions`, {
+ response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${config.apiKey}`,
},
- body: JSON.stringify(body),
+ body: bodyString,
});
} catch (err) {
+ if (reqSpan) {
+ try {
+ reqSpan.end({
+ err,
+ attrs: { status: 0 },
+ });
+ } catch {
+ // Fail-safe.
+ }
+ }
yield {
type: "error",
message: err instanceof Error ? err.message : String(err),
@@ -64,6 +122,20 @@ export async function* streamChat(
if (!response.ok) {
const text = await response.text().catch(() => "unknown");
+ if (reqSpan) {
+ try {
+ reqSpan.setAttributes({ status: response.status });
+ reqSpan.end({
+ err: new Error(`HTTP ${response.status}: ${text}`),
+ attrs: {
+ status: response.status,
+ "response.error_body": text,
+ },
+ });
+ } catch {
+ // Fail-safe.
+ }
+ }
yield {
type: "error",
message: `HTTP ${response.status}: ${text}`,
@@ -74,14 +146,70 @@ export async function* streamChat(
}
if (!response.body) {
+ if (reqSpan) {
+ try {
+ reqSpan.end({
+ err: new Error("Response body is null"),
+ attrs: { status: response.status },
+ });
+ } catch {
+ // Fail-safe.
+ }
+ }
yield { type: "error", message: "Response body is null" };
return;
}
- yield* readSSEStream(response.body);
+ try {
+ yield* readSSEStream(response.body, (usage) => {
+ totalInputTokens = usage.inputTokens;
+ totalOutputTokens = usage.outputTokens;
+ totalCacheReadTokens = usage.cacheReadTokens;
+ totalCacheWriteTokens = usage.cacheWriteTokens;
+ });
+ } catch (err) {
+ if (reqSpan) {
+ try {
+ reqSpan.end({
+ err,
+ attrs: { status: response.status },
+ });
+ } catch {
+ // Fail-safe.
+ }
+ }
+ throw err;
+ }
+
+ if (reqSpan) {
+ try {
+ const attrs: Record<string, string | number | boolean | null> = {
+ status: response.status,
+ "usage.inputTokens": totalInputTokens,
+ "usage.outputTokens": totalOutputTokens,
+ };
+ if (totalCacheReadTokens !== undefined) {
+ attrs["usage.cacheReadTokens"] = totalCacheReadTokens;
+ }
+ if (totalCacheWriteTokens !== undefined) {
+ attrs["usage.cacheWriteTokens"] = totalCacheWriteTokens;
+ }
+ reqSpan.end({ attrs });
+ } catch {
+ // Fail-safe.
+ }
+ }
}
-async function* readSSEStream(body: ReadableStream<Uint8Array>): AsyncIterable<ProviderEvent> {
+async function* readSSEStream(
+ body: ReadableStream<Uint8Array>,
+ onUsage?: (usage: {
+ inputTokens: number;
+ outputTokens: number;
+ cacheReadTokens?: number;
+ cacheWriteTokens?: number;
+ }) => void,
+): AsyncIterable<ProviderEvent> {
const reader = body.getReader();
const decoder = new TextDecoder();
let buffer = "";
@@ -182,15 +310,39 @@ async function* readSSEStream(body: ReadableStream<Uint8Array>): AsyncIterable<P
| {
prompt_tokens?: number;
completion_tokens?: number;
+ cache_read_tokens?: number;
+ cache_write_tokens?: number;
}
| undefined;
if (usage) {
+ const cacheRead =
+ usage.cache_read_tokens !== undefined ? usage.cache_read_tokens : undefined;
+ const cacheWrite =
+ usage.cache_write_tokens !== undefined ? usage.cache_write_tokens : undefined;
+ const usageObj: {
+ inputTokens: number;
+ outputTokens: number;
+ cacheReadTokens?: number;
+ cacheWriteTokens?: number;
+ } = {
+ inputTokens: usage.prompt_tokens ?? 0,
+ outputTokens: usage.completion_tokens ?? 0,
+ };
+ if (cacheRead !== undefined) {
+ usageObj.cacheReadTokens = cacheRead;
+ }
+ if (cacheWrite !== undefined) {
+ usageObj.cacheWriteTokens = cacheWrite;
+ }
+ onUsage?.(usageObj);
yield {
type: "usage",
usage: {
inputTokens: usage.prompt_tokens ?? 0,
outputTokens: usage.completion_tokens ?? 0,
+ ...(cacheRead !== undefined ? { cacheReadTokens: cacheRead } : {}),
+ ...(cacheWrite !== undefined ? { cacheWriteTokens: cacheWrite } : {}),
},
};
}
diff --git a/tasks.md b/tasks.md
index 8da0bcc..6af229e 100644
--- a/tasks.md
+++ b/tasks.md
@@ -181,12 +181,27 @@ spans (open+close) + the `prompt:before` record carrying the verbatim messages a
sink → journal file; the collector (process 2) is Phase B. Redaction is
per-extension self-redaction (no shared helper — isolation over DRY).
+### Phase A.2 — AFTER capture ✅ DONE + verified live
+- [x] **Contract** (orchestrator): `ProviderStreamOptions.logger?` threads the step's
+ correlated logger into `stream()` (optional, non-breaking).
+- [x] **Unit K — kernel run-turn**: passes the step span's logger into `provider.stream`.
+- [x] **Unit P — provider-openai-compat**: `provider.request` span capturing the
+ verbatim post-transform request + status/cache-tokens/raw-error; **auth self-redacted
+ in its own code** (graduated tiers, no shared helper); fail-safe; **15 hermetic
+ fetch-mocked tests** (first provider HTTP coverage).
+- typecheck clean, **267 tests** (250→+17), biome fully clean (0 warnings / 0 infos).
+ **Live:** `provider.request` shares the turn's `turnId` with `prompt:before`
+ (before↔after diffable); **auth-key leak count = 0** (self-redaction verified live).
+ Summons: prompts/phase-a2-{kernel-runturn,provider-after-capture}.md (+ 2 test cleanups).
+
### Next (observability)
-- **"AFTER" capture** — `provider.request` verbatim post-transform in
- provider-openai-compat → full round-trip rebuild + before↔after diff (§10).
-- Minor refinement: move the large `prompt:before` payload from `attributes` into the
- record `body` field (store-fat-serve-thin) — currently a stringified attribute.
-- Phase B: out-of-process collector → SQLite store + query (§11).
+- **Body-channel ABI (design — surface to user):** add a way to set `LogRecord.body`
+ (e.g. `Span.setBody`) so large verbatim payloads (prompt:before + provider request)
+ use `body` not stringified `attributes` (store-fat-serve-thin; before Phase B query).
+- **Phase B:** out-of-process collector → SQLite store + query (§11).
+- **Record/replay test fixtures** (goal): turn captured verbatim provider.request/
+ response traces into hermetic `stream.test.ts` fixtures (mock `fetch`, replay real
+ flash) for regression + deterministic repro. D5; §7. Complements contract-fakes.
Summons: prompts/phase-a-{kernel-logging,journal-sink}.md;
reports/phase-a-{kernel-logging,journal-sink}.md.