| author | Adam Malczewski <[email protected]> | 2026-06-05 14:06:39 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-05 14:06:39 +0900 |
| commit | 9ae09aad5d8d6232c55932af0d496b888166065f (patch) | |
| tree | e62b59314ba065a531f9924151742c5adc7426ef | |
| parent | 6733bbd47e6df8681fcf4b6815f82bd6b6922334 (diff) | |
| download | dispatch-9ae09aad5d8d6232c55932af0d496b888166065f.tar.gz dispatch-9ae09aad5d8d6232c55932af0d496b888166065f.zip | |
feat(observability): Phase A.2 — verbatim provider.request "after" capture + self-redaction (267 tests)
Threads the step span's correlated logger into provider.stream (new optional ProviderStreamOptions.logger) so provider-openai-compat opens a child provider.request span at the fetch edge, capturing the verbatim post-transform request + response status/cache-tokens/raw-error. Auth header self-redacted in the provider's OWN code (graduated mask tiers; no shared helper). Capture is fail-safe (never throws into the turn). Adds the first hermetic provider HTTP test (stream.test.ts: fetch mocked, 15 cases). Large payloads use attributes for now; the LogRecord.body channel is a deferred ABI design (notes §10).
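The fail-safe rule above (capture never throws into the turn) can be pictured as a guard around every capture call. The following is only a minimal sketch: `safeCapture` and `MiniSpan` are hypothetical names introduced here for illustration, and the commit itself inlines `try`/`catch` blocks rather than using a helper.

```typescript
// Hypothetical, reduced span shape for illustration only.
// The real Span type comes from @dispatch/kernel and has more members.
interface MiniSpan {
  setAttributes(attrs: Record<string, string | number | boolean | null>): void;
  end(outcome?: { err?: unknown }): void;
}

// Runs one capture step and swallows anything it throws.
// Returns true when the step completed, false when an error was suppressed.
function safeCapture(step: () => void): boolean {
  try {
    step();
    return true;
  } catch {
    // Fail-safe: a broken logger must not break the provider stream.
    return false;
  }
}

// A span whose setAttributes always fails, standing in for a faulty logger.
const explodingSpan: MiniSpan = {
  setAttributes() {
    throw new Error("logger exploded");
  },
  end() {},
};

const captured = safeCapture(() =>
  explodingSpan.setAttributes({ "request.method": "POST" }),
);
console.log(captured); // false
```

The caller continues with the HTTP request regardless of the return value; the boolean is only there so the sketch has something observable.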
Verified: tsc -b clean, 267 tests (250->+17), biome 0 warnings/0 infos. Live boot: provider.request shares turnId with prompt:before (before<->after diffable); auth-key leak count = 0 (self-redaction proven on a real request).
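As a worked illustration of the graduated mask tiers, the sketch below condenses the `maskSecret` helper that this commit adds to `stream.ts` and runs it on the sample keys used in `stream.test.ts`. The ternary form is a rewrite for brevity, not the committed code.

```typescript
// Tiers as documented in the commit: >=13 chars reveal 3 per side,
// 11-12 reveal 2, 8-10 reveal 1, <=7 fully masked.
function maskSecret(value: string): string {
  const len = value.length;
  if (len <= 7) return "…redacted…";
  const reveal = len >= 13 ? 3 : len >= 11 ? 2 : 1;
  return `${value.slice(0, reveal)}…redacted…${value.slice(-reveal)}`;
}

// Sample keys taken from the test cases in this commit.
const samples = ["sk-abcdefghijkmnop", "sk-abcde1234", "sk-abcde", "secret!"];
for (const key of samples) {
  console.log(`${key.length} chars -> Bearer ${maskSecret(key)}`);
}
// 18 chars -> Bearer sk-…redacted…nop
// 12 chars -> Bearer sk…redacted…34
// 8 chars -> Bearer s…redacted…e
// 7 chars -> Bearer …redacted…
```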
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | notes/observability-design.md | 32 |
| -rw-r--r-- | packages/kernel/src/contracts/provider.ts | 9 |
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.test.ts | 103 |
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 7 |
| -rw-r--r-- | packages/provider-openai-compat/src/stream.test.ts | 511 |
| -rw-r--r-- | packages/provider-openai-compat/src/stream.ts | 160 |
| -rw-r--r-- | tasks.md | 25 |
7 files changed, 812 insertions, 35 deletions
diff --git a/notes/observability-design.md b/notes/observability-design.md
index 6c09099..5e9c3a7 100644
--- a/notes/observability-design.md
+++ b/notes/observability-design.md
@@ -583,12 +583,32 @@ finish) → Unit 2 → host-bin wiring**. Optional overlap: start Unit 2 once Un
 **Verify:** `typecheck`/`test`/`check` clean; live boot → `host.logger` lines land in the
 journal file.
 
-### Next step (after Phase A) — the "AFTER" capture
-`provider.request` verbatim post-transform (D5) inside `provider-openai-compat`: the
-exact serialized request bytes + raw response/error + cache tokens, auth header
-self-redacted. This **completes full round-trip rebuild** and unlocks the
-**before↔after diff** (kernel "before" vs wire "after" → pinpoints transform bugs).
-Depends only on the Phase A Logger ABI; lives entirely in the provider extension.
+### Phase A.2 — the "AFTER" capture (build plan)
+`provider.request` verbatim post-transform (D5) inside `provider-openai-compat`:
+exact serialized request + response status/cache-tokens/raw-error, auth self-redacted.
+Completes full round-trip rebuild + the **before↔after diff**.
+- **Contract (DONE, orchestrator):** `ProviderStreamOptions.logger?: Logger`
+  (`contracts/provider.ts`) — threads the step's correlated logger into `stream()` so
+  the `provider.request` span is a child of the step span (before↔after share
+  turnId/parentSpanId). Optional = non-breaking.
+- **Unit K — kernel run-turn** (owner: kernel): pass the step span's logger into
+  `provider.stream(msgs, tools, { ...opts, logger })`. One file (`runtime/run-turn.ts`).
+- **Unit P — provider-openai-compat** (owner): at the fetch edge, if `opts.logger`,
+  open a `provider.request` child span; capture the verbatim post-transform request
+  (URL, model, params, serialized body) + `cache_control` presence; on response,
+  status + cache-read/creation tokens (Usage) + (on error) raw error; **self-redact
+  the auth header in its own code** (graduated tiers, §6). First hermetic provider
+  HTTP test (`stream.test.ts`, mock `fetch` + real-capture fixtures).
+- **Order:** contract frozen (done) → Unit K ∥ Unit P (disjoint: kernel vs provider).
+
+**DEFERRED — body-channel ABI (design decision, surface to user):** `LogRecord` has a
+`body` field but the Logger/Span API exposes no way to set it — that's why
+`prompt:before` (and now the request/response) use stringified `attributes`. Storing
+large verbatim payloads in `body` (store-fat-serve-thin; both before & after) needs a
+small ABI addition (e.g. `Span.setBody(body)`), worth doing before Phase B's query
+layer. Until then captures use attributes — functional + reconstructable, just not
+ideal for D9 `GROUP BY`.
+
 *(Full per-extension prompt-segment provenance — D8 — comes later, with the
 context-filter chain.)*
diff --git a/packages/kernel/src/contracts/provider.ts b/packages/kernel/src/contracts/provider.ts
index 1d0fd75..62dc8e9 100644
--- a/packages/kernel/src/contracts/provider.ts
+++ b/packages/kernel/src/contracts/provider.ts
@@ -7,6 +7,7 @@
  */
 
 import type { ChatMessage } from "./conversation.js";
+import type { Logger } from "./logging.js";
 import type { ToolContract } from "./tool.js";
 
 /**
@@ -92,6 +93,14 @@ export interface ProviderStreamOptions {
   readonly maxTokens?: number;
   /** System prompt to prepend. */
   readonly systemPrompt?: string;
+  /**
+   * Correlated logger for this turn's step (Phase A logging ABI). When present,
+   * the provider should open a child `provider.request` span and capture the
+   * verbatim post-transform request + raw response/error there, self-redacting
+   * secrets in its own code. Optional so non-instrumented callers/tests still
+   * compile (the provider falls back to no capture).
+   */
+  readonly logger?: Logger;
 }
 
 /**
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index 48f0b5a..667476f 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -817,26 +817,26 @@ describe("runTurn", () => {
     }
   });
 
-  describe("span instrumentation", () => {
-    function createTestLogger(): {
-      logger: Logger;
-      sink: LogSink & { records: LogRecord[] };
-      deps: LogDeps;
-    } {
-      let idCounter = 0;
-      const deps: LogDeps = {
-        now: () => 1000 + idCounter * 100,
-        newId: () => `span-${++idCounter}`,
-      };
-      const records: LogRecord[] = [];
-      const sink: LogSink & { records: LogRecord[] } = {
-        records,
-        emit: (record) => records.push(record),
-      };
-      const logger = createLogger({ extensionId: "test" }, sink, deps);
-      return { logger, sink, deps };
-    }
+  function createTestLogger(): {
+    logger: Logger;
+    sink: LogSink & { records: LogRecord[] };
+    deps: LogDeps;
+  } {
+    let idCounter = 0;
+    const deps: LogDeps = {
+      now: () => 1000 + idCounter * 100,
+      newId: () => `span-${++idCounter}`,
+    };
+    const records: LogRecord[] = [];
+    const sink: LogSink & { records: LogRecord[] } = {
+      records,
+      emit: (record) => records.push(record),
+    };
+    const logger = createLogger({ extensionId: "test" }, sink, deps);
+    return { logger, sink, deps };
+  }
 
+  describe("span instrumentation", () => {
     it("emits turn + step span open/close in order", async () => {
       const provider = createFakeProvider([
         [
@@ -1040,4 +1040,69 @@ describe("runTurn", () => {
       }
     });
   });
+
+  describe("provider logger threading", () => {
+    it("passes step span logger to provider.stream opts when logger provided", async () => {
+      let capturedOpts: Record<string, unknown> | undefined;
+
+      const provider: ProviderContract = {
+        id: "fake",
+        stream(_messages, _tools, opts) {
+          capturedOpts = opts !== undefined ? { ...opts } : undefined;
+          return (async function* () {
+            yield { type: "text-delta", delta: "hi" } as ProviderEvent;
+            yield { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } } as ProviderEvent;
+            yield { type: "finish", reason: "stop" } as ProviderEvent;
+          })();
+        },
+      };
+
+      const { logger } = createTestLogger();
+
+      await runTurn({
+        provider,
+        messages: [userMessage],
+        tools: [],
+        dispatch: { maxConcurrent: 1, eager: false },
+        conversationId: "conv-1",
+        turnId: "turn-1",
+        emit: () => {},
+        logger,
+      });
+
+      expect(capturedOpts).toBeDefined();
+      expect(capturedOpts?.logger).toBeDefined();
+      expect(typeof (capturedOpts?.logger as Record<string, unknown>).info).toBe("function");
+      expect(typeof (capturedOpts?.logger as Record<string, unknown>).span).toBe("function");
+    });
+
+    it("passes undefined for opts.logger when no logger provided", async () => {
+      let capturedOpts: Record<string, unknown> | undefined;
+
+      const provider: ProviderContract = {
+        id: "fake",
+        stream(_messages, _tools, opts) {
+          capturedOpts = opts !== undefined ? { ...opts } : undefined;
+          return (async function* () {
+            yield { type: "text-delta", delta: "hi" } as ProviderEvent;
+            yield { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } } as ProviderEvent;
+            yield { type: "finish", reason: "stop" } as ProviderEvent;
+          })();
+        },
+      };
+
+      await runTurn({
+        provider,
+        messages: [userMessage],
+        tools: [],
+        dispatch: { maxConcurrent: 1, eager: false },
+        conversationId: "conv-1",
+        turnId: "turn-1",
+        emit: () => {},
+      });
+
+      expect(capturedOpts).toBeDefined();
+      expect(capturedOpts?.logger).toBeUndefined();
+    });
+  });
 });
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index 0f42ef3..a78c31d 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -78,6 +78,7 @@ interface StepContext {
   readonly conversationId: string;
   readonly turnId: string;
   readonly logger: Logger;
+  readonly stepLogger: Logger | undefined;
   readonly toolSpans: Map<string, Span>;
 }
 
@@ -197,7 +198,10 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
   );
 
   try {
-    const stream = ctx.provider.stream(ctx.messages, ctx.tools);
+    const opts = {
+      ...(ctx.stepLogger !== undefined ? { logger: ctx.stepLogger } : {}),
+    };
+    const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts);
     for await (const event of stream) {
       if (ctx.signal.aborted) break;
       processEvent(event, chunks, toolCalls, dispatcher, ctx);
@@ -351,6 +355,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
       conversationId,
       turnId,
       logger: turnSpan?.log ?? logger ?? createNoopLogger(),
+      stepLogger: logger,
       toolSpans,
     });
 
diff --git a/packages/provider-openai-compat/src/stream.test.ts b/packages/provider-openai-compat/src/stream.test.ts
new file mode 100644
index 0000000..0b8e643
--- /dev/null
+++ b/packages/provider-openai-compat/src/stream.test.ts
@@ -0,0 +1,511 @@
+import type { ChatMessage, Logger, ProviderEvent, Span } from "@dispatch/kernel";
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+import { type StreamConfig, streamChat } from "./stream.js";
+
+async function collectEvents(iter: AsyncIterable<ProviderEvent>): Promise<ProviderEvent[]> {
+  const events: ProviderEvent[] = [];
+  for await (const event of iter) {
+    events.push(event);
+  }
+  return events;
+}
+
+function assertDefined<T>(v: T, msg?: string): asserts v is NonNullable<T> {
+  if (v === undefined || v === null) {
+    throw new Error(msg ?? "expected defined");
+  }
+}
+
+interface CapturedSpan {
+  name: string;
+  attrs: Record<string, string | number | boolean | null>;
+  endOutcome?:
+    | { err?: unknown; attrs?: Record<string, string | number | boolean | null> }
+    | undefined;
+}
+
+function createFakeLogger(): { logger: Logger; spans: CapturedSpan[] } {
+  const spans: CapturedSpan[] = [];
+  let spanAttrBuffer: Record<string, string | number | boolean | null> = {};
+
+  const fakeSpan: Span = {
+    id: "fake-span-id",
+    log: {} as Logger,
+    setAttributes(attrs) {
+      Object.assign(spanAttrBuffer, attrs);
+    },
+    addLink() {},
+    child() {
+      return fakeSpan;
+    },
+    end(outcome?) {
+      spans.push({
+        name: "provider.request",
+        attrs: { ...spanAttrBuffer },
+        endOutcome: outcome as CapturedSpan["endOutcome"],
+      });
+    },
+  };
+
+  const logger: Logger = {
+    debug() {},
+    info() {},
+    warn() {},
+    error() {},
+    child() {
+      return logger;
+    },
+    span(_name, attrs) {
+      spanAttrBuffer = attrs ? { ...attrs } : {};
+      return fakeSpan;
+    },
+  };
+
+  return { logger, spans };
+}
+
+function makeConfig(apiKey = "sk-test-1234567890abcdef"): StreamConfig {
+  return {
+    baseURL: "https://api.example.com/v1",
+    apiKey,
+    model: "test-model",
+  };
+}
+
+function mockFetch(handler: (url: string | URL | Request, init?: RequestInit) => unknown): void {
+  globalThis.fetch = vi.fn(handler) as unknown as typeof globalThis.fetch;
+}
+
+function makeMessages(): readonly ChatMessage[] {
+  return [
+    {
+      role: "user",
+      chunks: [{ type: "text", text: "Hello" }],
+    },
+  ];
+}
+
+function sseBody(...lines: string[]): ReadableStream<Uint8Array> {
+  const encoder = new TextEncoder();
+  const chunks = lines.map((l) => encoder.encode(`${l}\n`));
+  let index = 0;
+  return new ReadableStream<Uint8Array>({
+    pull(controller) {
+      if (index < chunks.length) {
+        const chunk = chunks[index];
+        assertDefined(chunk);
+        controller.enqueue(chunk);
+        index++;
+      } else {
+        controller.close();
+      }
+    },
+  });
+}
+
+describe("streamChat — provider.request AFTER capture", () => {
+  let originalFetch: typeof globalThis.fetch;
+
+  beforeEach(() => {
+    originalFetch = globalThis.fetch;
+  });
+
+  afterEach(() => {
+    globalThis.fetch = originalFetch;
+  });
+
+  it("opens a provider.request span with verbatim request body", async () => {
+    const { logger, spans } = createFakeLogger();
+    const config = makeConfig();
+
+    mockFetch(
+      () =>
+        new Response(
+          sseBody(
+            'data: {"id":"cmpl-1","choices":[{"delta":{"content":"Hi"},"index":0}]}',
+            'data: {"id":"cmpl-1","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+            "data: [DONE]",
+          ),
+          { status: 200, headers: { "Content-Type": "text/event-stream" } },
+        ),
+    );
+
+    const events = await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+    expect(events.some((e) => e.type === "text-delta")).toBe(true);
+    expect(spans).toHaveLength(1);
+
+    assertDefined(spans[0]);
+    const span = spans[0];
+    expect(span.name).toBe("provider.request");
+    expect(span.attrs["request.method"]).toBe("POST");
+
+    const capturedBody = JSON.parse(span.attrs["request.body"] as string);
+    expect(capturedBody.model).toBe("test-model");
+    expect(capturedBody.stream).toBe(true);
+    expect(capturedBody.messages).toEqual([{ role: "user", content: "Hello" }]);
+
+    expect(span.endOutcome?.attrs?.status).toBe(200);
+  });
+
+  it("redacts a long API key (≥13 chars → reveal 3 each side)", async () => {
+    const { logger, spans } = createFakeLogger();
+    const config = makeConfig("sk-abcdefghijkmnop");
+
+    mockFetch(
+      () =>
+        new Response(
+          sseBody(
+            'data: {"id":"cmpl-2","choices":[{"delta":{"content":"ok"},"index":0}]}',
+            'data: {"id":"cmpl-2","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+            "data: [DONE]",
+          ),
+          { status: 200, headers: { "Content-Type": "text/event-stream" } },
+        ),
+    );
+
+    await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+    assertDefined(spans[0]);
+    const span = spans[0];
+    const authHeader = span.attrs["request.headers.authorization"] as string;
+    expect(authHeader).toBe("Bearer sk-…redacted…nop");
+    expect(authHeader).not.toContain("abcdefghijkm");
+  });
+
+  it("redacts a medium API key (8–10 chars → reveal 1 each side)", async () => {
+    const { logger, spans } = createFakeLogger();
+    const config = makeConfig("sk-abcde");
+
+    mockFetch(
+      () =>
+        new Response(
+          sseBody(
+            'data: {"id":"cmpl-3","choices":[{"delta":{"content":"ok"},"index":0}]}',
+            'data: {"id":"cmpl-3","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+            "data: [DONE]",
+          ),
+          { status: 200, headers: { "Content-Type": "text/event-stream" } },
+        ),
+    );
+
+    await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+    assertDefined(spans[0]);
+    const span = spans[0];
+    const authHeader = span.attrs["request.headers.authorization"] as string;
+    expect(authHeader).toBe("Bearer s…redacted…e");
+  });
+
+  it("redacts a short API key (≤7 chars → full mask)", async () => {
+    const { logger, spans } = createFakeLogger();
+    const config = makeConfig("secret!");
+
+    mockFetch(
+      () =>
+        new Response(
+          sseBody(
+            'data: {"id":"cmpl-4","choices":[{"delta":{"content":"ok"},"index":0}]}',
+            'data: {"id":"cmpl-4","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+            "data: [DONE]",
+          ),
+          { status: 200, headers: { "Content-Type": "text/event-stream" } },
+        ),
+    );
+
+    await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+    assertDefined(spans[0]);
+    const span = spans[0];
+    const authHeader = span.attrs["request.headers.authorization"] as string;
+    expect(authHeader).toBe("Bearer …redacted…");
+  });
+
+  it("captures cache tokens from the response", async () => {
+    const { logger, spans } = createFakeLogger();
+    const config = makeConfig();
+
+    mockFetch(
+      () =>
+        new Response(
+          sseBody(
+            'data: {"id":"cmpl-5","choices":[{"delta":{"content":"Hi"},"index":0}]}',
+            'data: {"id":"cmpl-5","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+            'data: {"id":"cmpl-5","usage":{"prompt_tokens":100,"completion_tokens":20,"cache_read_tokens":80,"cache_write_tokens":10}}',
+            "data: [DONE]",
+          ),
+          { status: 200, headers: { "Content-Type": "text/event-stream" } },
+        ),
+    );
+
+    await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+    assertDefined(spans[0]);
+    const span = spans[0];
+    expect(span.endOutcome?.attrs?.["usage.inputTokens"]).toBe(100);
+    expect(span.endOutcome?.attrs?.["usage.outputTokens"]).toBe(20);
+    expect(span.endOutcome?.attrs?.["usage.cacheReadTokens"]).toBe(80);
+    expect(span.endOutcome?.attrs?.["usage.cacheWriteTokens"]).toBe(10);
+  });
+
+  it("captures cache_read_tokens alone", async () => {
+    const { logger, spans } = createFakeLogger();
+    const config = makeConfig();
+
+    mockFetch(
+      () =>
+        new Response(
+          sseBody(
+            'data: {"id":"cmpl-6","choices":[{"delta":{"content":"ok"},"index":0}]}',
+            'data: {"id":"cmpl-6","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+            'data: {"id":"cmpl-6","usage":{"prompt_tokens":50,"completion_tokens":5,"cache_read_tokens":45}}',
+            "data: [DONE]",
+          ),
+          { status: 200, headers: { "Content-Type": "text/event-stream" } },
+        ),
+    );
+
+    await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+    assertDefined(spans[0]);
+    const span = spans[0];
+    expect(span.endOutcome?.attrs?.["usage.cacheReadTokens"]).toBe(45);
+    expect(span.endOutcome?.attrs?.["usage.cacheWriteTokens"]).toBeUndefined();
+  });
+
+  it("records HTTP error status and error body without throwing", async () => {
+    const { logger, spans } = createFakeLogger();
+    const config = makeConfig();
+
+    mockFetch(
+      () =>
+        new Response("Invalid request body", {
+          status: 400,
+          headers: { "Content-Type": "text/plain" },
+        }),
+    );
+
+    const events = await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+    expect(events).toHaveLength(1);
+    expect(events[0]).toEqual({
+      type: "error",
+      message: "HTTP 400: Invalid request body",
+      code: "400",
+      retryable: false,
+    });
+
+    expect(spans).toHaveLength(1);
+    assertDefined(spans[0]);
+    const span = spans[0];
+    expect(span.endOutcome?.attrs?.status).toBe(400);
+    expect(span.endOutcome?.attrs?.["response.error_body"]).toBe("Invalid request body");
+    expect(span.endOutcome?.err).toBeInstanceOf(Error);
+  });
+
+  it("records network error without throwing", async () => {
+    const { logger, spans } = createFakeLogger();
+    const config = makeConfig();
+
+    mockFetch(() => {
+      throw new Error("connection refused");
+    });
+
+    const events = await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+    expect(events).toHaveLength(1);
+    expect(events[0]).toEqual({
+      type: "error",
+      message: "connection refused",
+      retryable: true,
+    });
+
+    expect(spans).toHaveLength(1);
+    assertDefined(spans[0]);
+    const span = spans[0];
+    expect(span.endOutcome?.err).toBeInstanceOf(Error);
+    expect((span.endOutcome?.err as Error).message).toBe("connection refused");
+  });
+
+  it("detects cache_control breakpoint absence in a normal request body", async () => {
+    const { logger, spans } = createFakeLogger();
+    const config = makeConfig();
+
+    mockFetch(
+      () =>
+        new Response(
+          sseBody(
+            'data: {"id":"cmpl-7","choices":[{"delta":{"content":"ok"},"index":0}]}',
+            'data: {"id":"cmpl-7","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+            "data: [DONE]",
+          ),
+          { status: 200, headers: { "Content-Type": "text/event-stream" } },
+        ),
+    );
+
+    await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+    assertDefined(spans[0]);
+    const span = spans[0];
+    expect(span.attrs["request.cache_control_present"]).toBe(false);
+  });
+
+  it("does not open a span when opts.logger is absent", async () => {
+    const config = makeConfig();
+
+    mockFetch(
+      () =>
+        new Response(
+          sseBody(
+            'data: {"id":"cmpl-8","choices":[{"delta":{"content":"ok"},"index":0}]}',
+            'data: {"id":"cmpl-8","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+            "data: [DONE]",
+          ),
+          { status: 200, headers: { "Content-Type": "text/event-stream" } },
+        ),
+    );
+
+    const events = await collectEvents(streamChat(config, makeMessages(), []));
+
+    expect(events.some((e) => e.type === "text-delta")).toBe(true);
+  });
+
+  it("fail-safe: logger throwing does not break stream()", async () => {
+    const brokenLogger: Logger = {
+      debug() {},
+      info() {},
+      warn() {},
+      error() {},
+      child() {
+        return brokenLogger;
+      },
+      span() {
+        throw new Error("logger exploded");
+      },
+    };
+
+    const config = makeConfig();
+
+    mockFetch(
+      () =>
+        new Response(
+          sseBody(
+            'data: {"id":"cmpl-9","choices":[{"delta":{"content":"ok"},"index":0}]}',
+            'data: {"id":"cmpl-9","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+            "data: [DONE]",
+          ),
+          { status: 200, headers: { "Content-Type": "text/event-stream" } },
+        ),
+    );
+
+    const events = await collectEvents(
+      streamChat(config, makeMessages(), [], { logger: brokenLogger }),
+    );
+
+    expect(events.some((e) => e.type === "text-delta")).toBe(true);
+    expect(events.some((e) => e.type === "finish")).toBe(true);
+  });
+
+  it("redacts an 11-char API key (reveal 2 each side)", async () => {
+    const { logger, spans } = createFakeLogger();
+    const config = makeConfig("sk-abcde1234");
+
+    mockFetch(
+      () =>
+        new Response(
+          sseBody(
+            'data: {"id":"cmpl-10","choices":[{"delta":{"content":"ok"},"index":0}]}',
+            'data: {"id":"cmpl-10","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+            "data: [DONE]",
+          ),
+          { status: 200, headers: { "Content-Type": "text/event-stream" } },
+        ),
+    );
+
+    await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+    assertDefined(spans[0]);
+    const span = spans[0];
+    const authHeader = span.attrs["request.headers.authorization"] as string;
+    expect(authHeader).toBe("Bearer sk…redacted…34");
+  });
+
+  it("records server error (500) as retryable", async () => {
+    const { logger, spans } = createFakeLogger();
+    const config = makeConfig();
+
+    mockFetch(
+      () =>
+        new Response("Internal Server Error", {
+          status: 500,
+          headers: { "Content-Type": "text/plain" },
+        }),
+    );
+
+    const events = await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+    expect(events).toHaveLength(1);
+    expect(events[0]).toEqual({
+      type: "error",
+      message: "HTTP 500: Internal Server Error",
+      code: "500",
+      retryable: true,
+    });
+
+    expect(spans).toHaveLength(1);
+    assertDefined(spans[0]);
+    expect(spans[0].endOutcome?.attrs?.status).toBe(500);
+  });
+
+  it("captures model and url on the span", async () => {
+    const { logger, spans } = createFakeLogger();
+    const config = makeConfig();
+
+    mockFetch(
+      () =>
+        new Response(
+          sseBody(
+            'data: {"id":"cmpl-11","choices":[{"delta":{"content":"ok"},"index":0}]}',
+            'data: {"id":"cmpl-11","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+            "data: [DONE]",
+          ),
+          { status: 200, headers: { "Content-Type": "text/event-stream" } },
+        ),
+    );
+
+    await collectEvents(streamChat(config, makeMessages(), [], { logger }));
+
+    assertDefined(spans[0]);
+    const span = spans[0];
+    expect(span.attrs.model).toBe("test-model");
+    expect(span.attrs.url).toBe("https://api.example.com/v1/chat/completions");
+  });
+
+  it("uses opts.model override in capture", async () => {
+    const { logger, spans } = createFakeLogger();
+    const config = makeConfig();
+
+    mockFetch(
+      () =>
+        new Response(
+          sseBody(
+            'data: {"id":"cmpl-12","choices":[{"delta":{"content":"ok"},"index":0}]}',
+            'data: {"id":"cmpl-12","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+            "data: [DONE]",
+          ),
+          { status: 200, headers: { "Content-Type": "text/event-stream" } },
+        ),
+    );
+
+    await collectEvents(
+      streamChat(config, makeMessages(), [], { logger, model: "override-model" }),
+    );
+
+    assertDefined(spans[0]);
+    const span = spans[0];
+    expect(span.attrs.model).toBe("override-model");
+
+    const capturedBody = JSON.parse(span.attrs["request.body"] as string);
+    expect(capturedBody.model).toBe("override-model");
+  });
+});
diff --git a/packages/provider-openai-compat/src/stream.ts b/packages/provider-openai-compat/src/stream.ts
index 7021120..9e27a89 100644
--- a/packages/provider-openai-compat/src/stream.ts
+++ b/packages/provider-openai-compat/src/stream.ts
@@ -2,6 +2,7 @@ import type {
   ChatMessage,
   ProviderEvent,
   ProviderStreamOptions,
+  Span,
   ToolContract,
 } from "@dispatch/kernel";
 import { convertMessages, type OpenAIMessage } from "./convert-messages.js";
@@ -13,6 +14,24 @@ export interface StreamConfig {
   readonly model: string;
 }
 
+/**
+ * Graduated secret mask — §6 tiers. Reimplemented locally (isolation-over-dry).
+ * ≥13 → reveal 3 each side · 11–12 → 2 · 8–10 → 1 · ≤7 → full mask.
+ */
+function maskSecret(value: string): string {
+  const len = value.length;
+  if (len <= 7) return "…redacted…";
+  let reveal: number;
+  if (len >= 13) {
+    reveal = 3;
+  } else if (len >= 11) {
+    reveal = 2;
+  } else {
+    reveal = 1;
+  }
+  return `${value.slice(0, reveal)}…redacted…${value.slice(-reveal)}`;
+}
+
 export async function* streamChat(
   config: StreamConfig,
   messages: readonly ChatMessage[],
@@ -43,17 +62,56 @@ export async function* streamChat(
     body.max_tokens = opts.maxTokens;
   }
 
+  const url = `${config.baseURL}/chat/completions`;
+  const bodyString = JSON.stringify(body);
+
+  let reqSpan: Span | undefined;
+  let totalInputTokens = 0;
+  let totalOutputTokens = 0;
+  let totalCacheReadTokens: number | undefined;
+  let totalCacheWriteTokens: number | undefined;
+
+  if (opts?.logger) {
+    try {
+      const model = opts?.model ?? config.model;
+      reqSpan = opts.logger.span("provider.request", {
+        model,
+        url,
+      });
+      const hasCacheBreakpoint = bodyString.includes("cache_control");
+      reqSpan.setAttributes({
+        "request.method": "POST",
+        "request.body": bodyString,
+        "request.cache_control_present": hasCacheBreakpoint,
+        "request.headers.content_type": "application/json",
+        "request.headers.authorization": `Bearer ${maskSecret(config.apiKey)}`,
+      });
+    } catch {
+      // Fail-safe: capture must never break stream().
+    }
+  }
+
   let response: Response;
   try {
-    response = await fetch(`${config.baseURL}/chat/completions`, {
+    response = await fetch(url, {
       method: "POST",
       headers: {
         "Content-Type": "application/json",
         Authorization: `Bearer ${config.apiKey}`,
       },
-      body: JSON.stringify(body),
+      body: bodyString,
     });
   } catch (err) {
+    if (reqSpan) {
+      try {
+        reqSpan.end({
+          err,
+          attrs: { status: 0 },
+        });
+      } catch {
+        // Fail-safe.
+      }
+    }
     yield {
       type: "error",
       message: err instanceof Error ? err.message : String(err),
@@ -64,6 +122,20 @@ export async function* streamChat(
 
   if (!response.ok) {
     const text = await response.text().catch(() => "unknown");
+    if (reqSpan) {
+      try {
+        reqSpan.setAttributes({ status: response.status });
+        reqSpan.end({
+          err: new Error(`HTTP ${response.status}: ${text}`),
+          attrs: {
+            status: response.status,
+            "response.error_body": text,
+          },
+        });
+      } catch {
+        // Fail-safe.
+      }
+    }
     yield {
       type: "error",
       message: `HTTP ${response.status}: ${text}`,
@@ -74,14 +146,70 @@ export async function* streamChat(
   }
 
   if (!response.body) {
+    if (reqSpan) {
+      try {
+        reqSpan.end({
+          err: new Error("Response body is null"),
+          attrs: { status: response.status },
+        });
+      } catch {
+        // Fail-safe.
+      }
+    }
     yield { type: "error", message: "Response body is null" };
     return;
   }
 
-  yield* readSSEStream(response.body);
+  try {
+    yield* readSSEStream(response.body, (usage) => {
+      totalInputTokens = usage.inputTokens;
+      totalOutputTokens = usage.outputTokens;
+      totalCacheReadTokens = usage.cacheReadTokens;
+      totalCacheWriteTokens = usage.cacheWriteTokens;
+    });
+  } catch (err) {
+    if (reqSpan) {
+      try {
+        reqSpan.end({
+          err,
+          attrs: { status: response.status },
+        });
+      } catch {
+        // Fail-safe.
+      }
+    }
+    throw err;
+  }
+
+  if (reqSpan) {
+    try {
+      const attrs: Record<string, string | number | boolean | null> = {
+        status: response.status,
+        "usage.inputTokens": totalInputTokens,
+        "usage.outputTokens": totalOutputTokens,
+      };
+      if (totalCacheReadTokens !== undefined) {
+        attrs["usage.cacheReadTokens"] = totalCacheReadTokens;
+      }
+      if (totalCacheWriteTokens !== undefined) {
+        attrs["usage.cacheWriteTokens"] = totalCacheWriteTokens;
+      }
+      reqSpan.end({ attrs });
+    } catch {
+      // Fail-safe.
+    }
+  }
 }
 
-async function* readSSEStream(body: ReadableStream<Uint8Array>): AsyncIterable<ProviderEvent> {
+async function* readSSEStream(
+  body: ReadableStream<Uint8Array>,
+  onUsage?: (usage: {
+    inputTokens: number;
+    outputTokens: number;
+    cacheReadTokens?: number;
+    cacheWriteTokens?: number;
+  }) => void,
+): AsyncIterable<ProviderEvent> {
   const reader = body.getReader();
   const decoder = new TextDecoder();
   let buffer = "";
@@ -182,15 +310,39 @@ async function* readSSEStream(body: ReadableStream<Uint8Array>): AsyncIterable<P
           | {
               prompt_tokens?: number;
               completion_tokens?: number;
+              cache_read_tokens?: number;
+              cache_write_tokens?: number;
             }
           | undefined;
 
         if (usage) {
+          const cacheRead =
+            usage.cache_read_tokens !== undefined ? usage.cache_read_tokens : undefined;
+          const cacheWrite =
+            usage.cache_write_tokens !== undefined ? usage.cache_write_tokens : undefined;
+          const usageObj: {
+            inputTokens: number;
+            outputTokens: number;
+            cacheReadTokens?: number;
+            cacheWriteTokens?: number;
+          } = {
+            inputTokens: usage.prompt_tokens ?? 0,
+            outputTokens: usage.completion_tokens ?? 0,
+          };
+          if (cacheRead !== undefined) {
+            usageObj.cacheReadTokens = cacheRead;
+          }
+          if (cacheWrite !== undefined) {
+            usageObj.cacheWriteTokens = cacheWrite;
+          }
+          onUsage?.(usageObj);
           yield {
             type: "usage",
             usage: {
               inputTokens: usage.prompt_tokens ?? 0,
               outputTokens: usage.completion_tokens ?? 0,
+              ...(cacheRead !== undefined ? { cacheReadTokens: cacheRead } : {}),
+              ...(cacheWrite !== undefined ? { cacheWriteTokens: cacheWrite } : {}),
             },
           };
         }
@@ -181,12 +181,27 @@ spans (open+close) + the `prompt:before` record carrying the verbatim messages a
 sink → journal file; the collector (process 2) is Phase B. Redaction is per-extension
 self-redaction (no shared helper — isolation over DRY).
 
+### Phase A.2 — AFTER capture ✅ DONE + verified live
+- [x] **Contract** (orchestrator): `ProviderStreamOptions.logger?` threads the step's
+  correlated logger into `stream()` (optional, non-breaking).
+- [x] **Unit K — kernel run-turn**: passes the step span's logger into `provider.stream`.
+- [x] **Unit P — provider-openai-compat**: `provider.request` span capturing the
+  verbatim post-transform request + status/cache-tokens/raw-error; **auth self-redacted
+  in its own code** (graduated tiers, no shared helper); fail-safe; **15 hermetic
+  fetch-mocked tests** (first provider HTTP coverage).
+- typecheck clean, **267 tests** (250→+17), biome fully clean (0 warnings / 0 infos).
+  **Live:** `provider.request` shares the turn's `turnId` with `prompt:before`
+  (before↔after diffable); **auth-key leak count = 0** (self-redaction verified live).
+  Summons: prompts/phase-a2-{kernel-runturn,provider-after-capture}.md (+ 2 test cleanups).
+
 ### Next (observability)
-- **"AFTER" capture** — `provider.request` verbatim post-transform in
-  provider-openai-compat → full round-trip rebuild + before↔after diff (§10).
-- Minor refinement: move the large `prompt:before` payload from `attributes` into the
-  record `body` field (store-fat-serve-thin) — currently a stringified attribute.
-- Phase B: out-of-process collector → SQLite store + query (§11).
+- **Body-channel ABI (design — surface to user):** add a way to set `LogRecord.body`
+  (e.g. `Span.setBody`) so large verbatim payloads (prompt:before + provider request)
+  use `body` not stringified `attributes` (store-fat-serve-thin; before Phase B query).
+- **Phase B:** out-of-process collector → SQLite store + query (§11).
+- **Record/replay test fixtures** (goal): turn captured verbatim provider.request/
+  response traces into hermetic `stream.test.ts` fixtures (mock `fetch`, replay real
+  flash) for regression + deterministic repro. D5; §7. Complements contract-fakes.
 
 Summons: prompts/phase-a-{kernel-logging,journal-sink}.md; reports/phase-a-{kernel-logging,journal-sink}.md.
