diff options
| author | Adam Malczewski <[email protected]> | 2026-06-05 17:02:06 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-05 17:02:06 +0900 |
| commit | 64a66d5e41c5ab54a1d0a7712866be722b742d4b (patch) | |
| tree | 46743ba580a7d5f69d7f7e932826b2f479841a72 | |
| parent | 4fd658f9612a960c6b185e53fa52c064406dad4f (diff) | |
| download | dispatch-64a66d5e41c5ab54a1d0a7712866be722b742d4b.tar.gz dispatch-64a66d5e41c5ab54a1d0a7712866be722b742d4b.zip | |
feat(observability): provider record/replay via @dispatch/trace-replay — env-gated capture + hermetic fixture tests (331 tests)
provider-openai-compat now consumes @dispatch/trace-replay. (A) Opt-in record mode: when DISPATCH_RECORD_FIXTURE is set, the fetch edge wraps recordFetch and saves a fixture of the verbatim post-transform request + raw SSE response, self-redacting the auth header in the provider's OWN code (reuses its existing maskSecret graduated-tier mask — no shared helper, isolation over DRY). Zero overhead when unset; fail-safe. (B) Hermetic replay tests: stream.test.ts drives the provider off committed SSE fixtures via replayFetch (chunk-split to exercise SSE parsing across boundaries), asserting ProviderEvents + that the outgoing request still matches the recorded one (transform-drift regression). Injectable fetch via an internal StreamConfig.fetchFn — NO kernel contract change.
2 committed fixtures (text-turn + tool-call, currently hand-authored-faithful; a real flash text-turn swap follows). Verified: tsc -b clean, 331 vitest (327 -> +4: 2 replay + 2 redaction), biome 0/0. Provider 44 -> 48.
| -rw-r--r-- | bun.lock | 1 | ||||
| -rw-r--r-- | packages/provider-openai-compat/package.json | 3 | ||||
| -rw-r--r-- | packages/provider-openai-compat/src/__fixtures__/flash-text-turn.json | 25 | ||||
| -rw-r--r-- | packages/provider-openai-compat/src/__fixtures__/tool-call-turn.json | 25 | ||||
| -rw-r--r-- | packages/provider-openai-compat/src/stream.test.ts | 213 | ||||
| -rw-r--r-- | packages/provider-openai-compat/src/stream.ts | 36 | ||||
| -rw-r--r-- | packages/provider-openai-compat/tsconfig.json | 2 | ||||
| -rw-r--r-- | tasks.md | 18 |
8 files changed, 314 insertions, 9 deletions
@@ -64,6 +64,7 @@ "version": "0.0.0", "dependencies": { "@dispatch/kernel": "workspace:*", + "@dispatch/trace-replay": "workspace:*", }, }, "packages/session-orchestrator": { diff --git a/packages/provider-openai-compat/package.json b/packages/provider-openai-compat/package.json index df15fd3..36db8a5 100644 --- a/packages/provider-openai-compat/package.json +++ b/packages/provider-openai-compat/package.json @@ -6,6 +6,7 @@ "main": "dist/index.js", "types": "dist/index.d.ts", "dependencies": { - "@dispatch/kernel": "workspace:*" + "@dispatch/kernel": "workspace:*", + "@dispatch/trace-replay": "workspace:*" } } diff --git a/packages/provider-openai-compat/src/__fixtures__/flash-text-turn.json b/packages/provider-openai-compat/src/__fixtures__/flash-text-turn.json new file mode 100644 index 0000000..adab32b --- /dev/null +++ b/packages/provider-openai-compat/src/__fixtures__/flash-text-turn.json @@ -0,0 +1,25 @@ +{ + "request": { + "method": "POST", + "url": "https://api.example.com/v1/chat/completions", + "headers": { + "content-type": "application/json", + "authorization": "Bearer sk-…redacted…xyz" + }, + "body": "{\"model\":\"deepseek-v4-flash\",\"messages\":[{\"role\":\"user\",\"content\":\"Hello, how are you?\"}],\"stream\":true}" + }, + "response": { + "status": 200, + "statusText": "OK", + "headers": { + "content-type": "text/event-stream", + "cache-control": "no-cache" + }, + "body": "data: {\"id\":\"chatcmpl-fixture-001\",\"object\":\"chat.completion.chunk\",\"created\":1700000000,\"model\":\"deepseek-v4-flash\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"content\":\"\"},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-fixture-001\",\"object\":\"chat.completion.chunk\",\"created\":1700000000,\"model\":\"deepseek-v4-flash\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"I'm doing\"},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-fixture-001\",\"object\":\"chat.completion.chunk\",\"created\":1700000000,\"model\":\"deepseek-v4-flash\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\" well, thank\"},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-fixture-001\",\"object\":\"chat.completion.chunk\",\"created\":1700000000,\"model\":\"deepseek-v4-flash\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\" you! How can I\"},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-fixture-001\",\"object\":\"chat.completion.chunk\",\"created\":1700000000,\"model\":\"deepseek-v4-flash\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\" help you today?\"},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-fixture-001\",\"object\":\"chat.completion.chunk\",\"created\":1700000000,\"model\":\"deepseek-v4-flash\",\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":12,\"completion_tokens\":16,\"cache_read_tokens\":0,\"cache_write_tokens\":0}}\n\ndata: [DONE]\n" + }, + "meta": { + "description": "Simple text-turn fixture: user greeting → short assistant reply with stop finish.", + "captured_by": "provider-openai-compat record mode", + "version": 1 + } +} diff --git a/packages/provider-openai-compat/src/__fixtures__/tool-call-turn.json b/packages/provider-openai-compat/src/__fixtures__/tool-call-turn.json new file mode 100644 index 0000000..48bdb8d --- /dev/null +++ b/packages/provider-openai-compat/src/__fixtures__/tool-call-turn.json @@ -0,0 +1,25 @@ +{ + "request": { + "method": "POST", + "url": "https://api.example.com/v1/chat/completions", + "headers": { + "content-type": "application/json", + "authorization": "Bearer sk-…redacted…xyz" + }, + "body": "{\"model\":\"deepseek-v4-flash\",\"messages\":[{\"role\":\"user\",\"content\":\"What is the weather in Tokyo?\"}],\"tools\":[{\"type\":\"function\",\"function\":{\"name\":\"get_weather\",\"description\":\"Get current weather for a location\",\"parameters\":{\"type\":\"object\",\"properties\":{\"location\":{\"type\":\"string\"}},\"required\":[\"location\"]}}}],\"stream\":true}" + }, + "response": { + "status": 200, + "statusText": "OK", + "headers": { + "content-type": "text/event-stream", + "cache-control": "no-cache" + }, + "body": "data: {\"id\":\"chatcmpl-fixture-002\",\"object\":\"chat.completion.chunk\",\"created\":1700000000,\"model\":\"deepseek-v4-flash\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"content\":\"\"},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-fixture-002\",\"object\":\"chat.completion.chunk\",\"created\":1700000000,\"model\":\"deepseek-v4-flash\",\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_abc123\",\"type\":\"function\",\"function\":{\"name\":\"get_weather\",\"arguments\":\"\"}}]},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-fixture-002\",\"object\":\"chat.completion.chunk\",\"created\":1700000000,\"model\":\"deepseek-v4-flash\",\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"{\\\"locat\"}}]},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-fixture-002\",\"object\":\"chat.completion.chunk\",\"created\":1700000000,\"model\":\"deepseek-v4-flash\",\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"ion\\\":\\\"Tokyo\\\"}\"}}]},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-fixture-002\",\"object\":\"chat.completion.chunk\",\"created\":1700000000,\"model\":\"deepseek-v4-flash\",\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":\"tool_calls\"}],\"usage\":{\"prompt_tokens\":45,\"completion_tokens\":12,\"cache_read_tokens\":30,\"cache_write_tokens\":5}}\n\ndata: [DONE]\n" + }, + "meta": { + "description": "Tool-call fixture: user asks about weather → model calls get_weather tool with split argument chunks.", + "captured_by": "provider-openai-compat record mode", + "version": 1 + } +} diff --git a/packages/provider-openai-compat/src/stream.test.ts b/packages/provider-openai-compat/src/stream.test.ts index faaf75a..ad0f37e 100644 --- a/packages/provider-openai-compat/src/stream.test.ts +++ b/packages/provider-openai-compat/src/stream.test.ts @@ -1,4 +1,6 @@ import type { ChatMessage, Logger, ProviderEvent, Span } from "@dispatch/kernel"; +import type { HttpExchangeFixture } from "@dispatch/trace-replay"; +import { loadFixture, recordFetch, replayFetch, serializeFixture } from "@dispatch/trace-replay"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { type StreamConfig, streamChat } from "./stream.js"; @@ -516,3 +518,214 @@ describe("streamChat — provider.request AFTER capture", () => { expect(capturedBody.model).toBe("override-model"); }); }); + +describe("streamChat — hermetic replay (trace-replay)", () => { + const testDir = new URL(".", import.meta.url).pathname; + const fixturePath = `${testDir}__fixtures__/flash-text-turn.json`; + const toolFixturePath = `${testDir}__fixtures__/tool-call-turn.json`; + + it("replays a text-turn fixture and produces correct ProviderEvents", async () => { + const fixture = loadFixture(fixturePath); + const { fetch: replayFetchFn, getCapturedRequest } = replayFetch(fixture, { chunkBytes: 64 }); + + const config: StreamConfig = { + baseURL: "https://api.example.com/v1", + apiKey: "sk-test-1234567890abcdef", + model: "deepseek-v4-flash", + fetchFn: replayFetchFn, + }; + + const messages: ChatMessage[] = [ + { role: "user", chunks: [{ type: "text", text: "Hello, how are you?" }] }, + ]; + + const events = await collectEvents(streamChat(config, messages, [])); + + const textDeltas = events.filter( + (e): e is Extract<ProviderEvent, { type: "text-delta" }> => e.type === "text-delta", + ); + const fullText = textDeltas.map((e) => e.delta).join(""); + expect(fullText).toBe("I'm doing well, thank you! How can I help you today?"); + + const finishEvents = events.filter((e) => e.type === "finish"); + expect(finishEvents).toHaveLength(1); + expect(finishEvents[0]).toEqual({ type: "finish", reason: "stop" }); + + const usageEvents = events.filter( + (e): e is Extract<ProviderEvent, { type: "usage" }> => e.type === "usage", + ); + expect(usageEvents).toHaveLength(1); + expect(usageEvents[0]?.usage.inputTokens).toBe(12); + expect(usageEvents[0]?.usage.outputTokens).toBe(16); + + const captured = getCapturedRequest(); + assertDefined(captured); + expect(captured.method).toBe("POST"); + expect(captured.url).toBe("https://api.example.com/v1/chat/completions"); + expect(captured.headers["Content-Type"]).toBe("application/json"); + expect(captured.headers.Authorization).toBe("Bearer sk-test-1234567890abcdef"); + + assertDefined(captured.body); + const capturedBody = JSON.parse(captured.body); + expect(capturedBody.model).toBe("deepseek-v4-flash"); + expect(capturedBody.stream).toBe(true); + expect(capturedBody.messages).toEqual([{ role: "user", content: "Hello, how are you?" }]); + }); + + it("replays a tool-call-turn fixture and produces tool-call + finish events", async () => { + const fixture = loadFixture(toolFixturePath); + const { fetch: replayFetchFn, getCapturedRequest } = replayFetch(fixture, { chunkBytes: 48 }); + + const config: StreamConfig = { + baseURL: "https://api.example.com/v1", + apiKey: "sk-test-1234567890abcdef", + model: "deepseek-v4-flash", + fetchFn: replayFetchFn, + }; + + const messages: ChatMessage[] = [ + { role: "user", chunks: [{ type: "text", text: "What is the weather in Tokyo?" }] }, + ]; + + const weatherTool = { + name: "get_weather", + description: "Get current weather for a location", + parameters: { + type: "object" as const, + properties: { location: { type: "string" as const } }, + required: ["location"], + }, + execute: async () => ({ content: "" }), + }; + + const events = await collectEvents(streamChat(config, messages, [weatherTool])); + + const toolCalls = events.filter( + (e): e is Extract<ProviderEvent, { type: "tool-call" }> => e.type === "tool-call", + ); + expect(toolCalls).toHaveLength(1); + expect(toolCalls[0]?.toolCallId).toBe("call_abc123"); + expect(toolCalls[0]?.toolName).toBe("get_weather"); + expect(toolCalls[0]?.input).toEqual({ location: "Tokyo" }); + + const finishEvents = events.filter((e) => e.type === "finish"); + expect(finishEvents).toHaveLength(1); + expect(finishEvents[0]).toEqual({ type: "finish", reason: "tool_calls" }); + + const usageEvents = events.filter( + (e): e is Extract<ProviderEvent, { type: "usage" }> => e.type === "usage", + ); + expect(usageEvents).toHaveLength(1); + expect(usageEvents[0]?.usage.inputTokens).toBe(45); + expect(usageEvents[0]?.usage.outputTokens).toBe(12); + expect(usageEvents[0]?.usage.cacheReadTokens).toBe(30); + expect(usageEvents[0]?.usage.cacheWriteTokens).toBe(5); + + const captured = getCapturedRequest(); + assertDefined(captured); + expect(captured.method).toBe("POST"); + assertDefined(captured.body); + const capturedBody = JSON.parse(captured.body); + expect(capturedBody.tools).toHaveLength(1); + expect(capturedBody.tools[0].function.name).toBe("get_weather"); + }); +}); + +describe("streamChat — record-mode redaction (trace-replay)", () => { + /** + * Graduated secret mask — §6 tiers. Duplicated locally (isolation-over-dry). + * ≥13 → reveal 3 each side · 11–12 → 2 · 8–10 → 1 · ≤7 → full mask. + */ + function maskSecret(value: string): string { + const len = value.length; + if (len <= 7) return "…redacted…"; + let reveal: number; + if (len >= 13) { + reveal = 3; + } else if (len >= 11) { + reveal = 2; + } else { + reveal = 1; + } + return `${value.slice(0, reveal)}…redacted…${value.slice(-reveal)}`; + } + + it("self-redacts auth header in onExchange and produces a secret-free fixture", async () => { + const apiKey = "sk-abcdefghijkmnop"; + const responseBody = + 'data: {"id":"cmpl-r","choices":[{"delta":{"content":"ok"},"index":0}],"usage":{"prompt_tokens":5,"completion_tokens":1,"cache_read_tokens":0,"cache_write_tokens":0}}\n\ndata: [DONE]\n'; + + let capturedFixture: HttpExchangeFixture | undefined; + const wrappedFetch = recordFetch( + async () => + new Response(responseBody, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }), + (fx) => { + const redactedHeaders = { ...fx.request.headers }; + if (redactedHeaders.authorization) { + redactedHeaders.authorization = `Bearer ${maskSecret(redactedHeaders.authorization.replace(/^Bearer\s+/, ""))}`; + } + capturedFixture = { + request: { ...fx.request, headers: redactedHeaders }, + response: fx.response, + ...(fx.meta !== undefined ? { meta: fx.meta } : {}), + }; + }, + ); + + await wrappedFetch("https://api.example.com/v1/chat/completions", { + method: "POST", + headers: { + "content-type": "application/json", + authorization: `Bearer ${apiKey}`, + }, + body: '{"model":"test","messages":[{"role":"user","content":"hi"}],"stream":true}', + }); + + assertDefined(capturedFixture); + + expect(capturedFixture.request.headers.authorization).toBe("Bearer sk-…redacted…nop"); + expect(capturedFixture.request.headers.authorization).not.toContain("abcdefghijkm"); + expect(capturedFixture.request.headers["content-type"]).toBe("application/json"); + + expect(capturedFixture.request.body).toContain('"model":"test"'); + expect(capturedFixture.request.body).toContain('"content":"hi"'); + + expect(capturedFixture.response.status).toBe(200); + expect(capturedFixture.response.body).toBe(responseBody); + + const serialized = serializeFixture(capturedFixture); + expect(serialized).toContain("Bearer sk-…redacted…nop"); + expect(serialized).not.toContain("abcdefghijkm"); + expect(serialized).toContain("content"); + expect(serialized).toContain("hi"); + }); + + it("redacts a short API key (≤7 chars → full mask)", async () => { + let capturedFixture: HttpExchangeFixture | undefined; + const wrappedFetch = recordFetch( + async () => new Response("data: [DONE]\n", { status: 200 }), + (fx) => { + const redactedHeaders = { ...fx.request.headers }; + if (redactedHeaders.authorization) { + redactedHeaders.authorization = `Bearer ${maskSecret(redactedHeaders.authorization.replace(/^Bearer\s+/, ""))}`; + } + capturedFixture = { + request: { ...fx.request, headers: redactedHeaders }, + response: fx.response, + }; + }, + ); + + await wrappedFetch("https://api.example.com/v1/chat/completions", { + method: "POST", + headers: { authorization: "Bearer secret!" }, + body: null, + }); + + assertDefined(capturedFixture); + expect(capturedFixture.request.headers.authorization).toBe("Bearer …redacted…"); + }); +}); diff --git a/packages/provider-openai-compat/src/stream.ts b/packages/provider-openai-compat/src/stream.ts index 1a721ab..7d1f2c8 100644 --- a/packages/provider-openai-compat/src/stream.ts +++ b/packages/provider-openai-compat/src/stream.ts @@ -5,6 +5,7 @@ import type { Span, ToolContract, } from "@dispatch/kernel"; +import type { FetchLike, HttpExchangeFixture } from "@dispatch/trace-replay"; import { convertMessages, type OpenAIMessage } from "./convert-messages.js"; import { convertTools, type OpenAITool } from "./convert-tools.js"; @@ -12,6 +13,11 @@ export interface StreamConfig { readonly baseURL: string; readonly apiKey: string; readonly model: string; + /** + * Internal injectable fetch — used by replay tests and record mode. + * When absent, falls back to globalThis.fetch (production default). + */ + readonly fetchFn?: FetchLike; } /** @@ -91,9 +97,37 @@ export async function* streamChat( } } + let effectiveFetch: FetchLike = config.fetchFn ?? fetch; + + const recordPath = + typeof process !== "undefined" ? process.env.DISPATCH_RECORD_FIXTURE : undefined; + if (recordPath && !config.fetchFn) { + try { + const { recordFetch: rf, saveFixture } = await import("@dispatch/trace-replay"); + effectiveFetch = rf(effectiveFetch, (fx: HttpExchangeFixture) => { + try { + const redactedHeaders = { ...fx.request.headers }; + if (redactedHeaders.authorization) { + redactedHeaders.authorization = `Bearer ${maskSecret(redactedHeaders.authorization.replace(/^Bearer\s+/, ""))}`; + } + const redacted: HttpExchangeFixture = { + request: { ...fx.request, headers: redactedHeaders }, + response: fx.response, + ...(fx.meta !== undefined ? { meta: fx.meta } : {}), + }; + saveFixture(recordPath, redacted); + } catch { + // Fail-safe: capture/write must never break the turn. + } + }); + } catch { + // Fail-safe: dynamic import or wrapping failure must never break the turn. + } + } + let response: Response; try { - response = await fetch(url, { + response = await effectiveFetch(url, { method: "POST", headers: { "Content-Type": "application/json", diff --git a/packages/provider-openai-compat/tsconfig.json b/packages/provider-openai-compat/tsconfig.json index ff99a43..c5997ed 100644 --- a/packages/provider-openai-compat/tsconfig.json +++ b/packages/provider-openai-compat/tsconfig.json @@ -2,5 +2,5 @@ "extends": "../../tsconfig.base.json", "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true }, "include": ["src/**/*.ts"], - "references": [{ "path": "../kernel" }] + "references": [{ "path": "../kernel" }, { "path": "../trace-replay" }] } @@ -247,12 +247,18 @@ independent of the SQLite trace-store; the lib is redaction-free (caller self-re + `save/load` fixture I/O. **Redaction-free** (caller self-redacts — isolation over DRY), zero `@dispatch/*` deps, NO `bun:sqlite`. 39 tests (replay 12 / record 8 / fixture 19); root tsconfig ref wired → **327 vitest**, typecheck + biome 0/0. reports/trace-replay.md. -- [ ] **Unit 2 — provider-openai-compat consumer**: env-gated record mode at its fetch - edge (self-redacts auth in its OWN code before `saveFixture`); `stream.test.ts` replays - a committed real-flash fixture via `replayFetch` → asserts ProviderEvents + that the - outgoing request still matches (transform-drift regression). Summon AFTER Unit 1 lands. -- [~] **Build wiring** (orchestrator): root tsconfig ref for trace-replay ✓ + `bun install` ✓; - provider dep on `@dispatch/trace-replay` pending (with Unit 2). +- [x] **Unit 2 — provider-openai-compat consumer** — DONE + verified (hermetic). Internal + `fetchFn?: FetchLike` on `StreamConfig` (injectable fetch — P3, NOT a kernel contract + change); env-gated record mode (`DISPATCH_RECORD_FIXTURE`) self-redacts auth via the + provider's existing `maskSecret()` (no shared helper); 2 committed SSE fixtures + (text-turn + tool-call) + 4 new tests (2 replay w/ chunk-split, 2 redaction). Provider + 44→48 tests → **331 vitest**, typecheck + biome 0/0. CR: none. + prompts/provider-trace-replay.md, reports/provider-trace-replay.md. +- [x] **Build wiring** (orchestrator): root tsconfig ref ✓; provider dep `@dispatch/trace-replay` ✓; `bun install` ✓. +- [ ] **Live capture** (orchestrator): boot with `DISPATCH_RECORD_FIXTURE` + real key → + capture a real flash exchange → secret-free check → re-summon provider to swap the + synthetic text-turn fixture for the real one + update expected text (validates our SSE + assumptions against reality — D5). Summons: prompts/phase-a-{kernel-logging,journal-sink}.md; reports/phase-a-{kernel-logging,journal-sink}.md. |
