diff options
| -rw-r--r-- | bun.lock | 6 | ||||
| -rw-r--r-- | packages/trace-replay/package.json | 8 | ||||
| -rw-r--r-- | packages/trace-replay/src/fixture.test.ts | 166 | ||||
| -rw-r--r-- | packages/trace-replay/src/fixture.ts | 77 | ||||
| -rw-r--r-- | packages/trace-replay/src/index.ts | 4 | ||||
| -rw-r--r-- | packages/trace-replay/src/record.test.ts | 112 | ||||
| -rw-r--r-- | packages/trace-replay/src/record.ts | 113 | ||||
| -rw-r--r-- | packages/trace-replay/src/replay.test.ts | 136 | ||||
| -rw-r--r-- | packages/trace-replay/src/replay.ts | 144 | ||||
| -rw-r--r-- | packages/trace-replay/src/types.ts | 24 | ||||
| -rw-r--r-- | packages/trace-replay/tsconfig.json | 6 | ||||
| -rw-r--r-- | tasks.md | 42 | ||||
| -rw-r--r-- | tsconfig.json | 1 |
13 files changed, 831 insertions, 8 deletions
@@ -88,6 +88,10 @@ "@dispatch/kernel": "workspace:*", }, }, + "packages/trace-replay": { + "name": "@dispatch/trace-replay", + "version": "0.0.0", + }, "packages/trace-store": { "name": "@dispatch/trace-store", "version": "0.0.0", @@ -144,6 +148,8 @@ "@dispatch/tool-read-file": ["@dispatch/tool-read-file@workspace:packages/tool-read-file"], + "@dispatch/trace-replay": ["@dispatch/trace-replay@workspace:packages/trace-replay"], + "@dispatch/trace-store": ["@dispatch/trace-store@workspace:packages/trace-store"], "@dispatch/transport-http": ["@dispatch/transport-http@workspace:packages/transport-http"], diff --git a/packages/trace-replay/package.json b/packages/trace-replay/package.json new file mode 100644 index 0000000..9717cd4 --- /dev/null +++ b/packages/trace-replay/package.json @@ -0,0 +1,8 @@ +{ + "name": "@dispatch/trace-replay", + "version": "0.0.0", + "type": "module", + "private": true, + "main": "dist/index.js", + "types": "dist/index.d.ts" +} diff --git a/packages/trace-replay/src/fixture.test.ts b/packages/trace-replay/src/fixture.test.ts new file mode 100644 index 0000000..cca6585 --- /dev/null +++ b/packages/trace-replay/src/fixture.test.ts @@ -0,0 +1,166 @@ +import { mkdtemp, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { loadFixture, parseFixture, saveFixture, serializeFixture } from "./fixture.js"; +import type { HttpExchangeFixture } from "./types.js"; + +const fixture: HttpExchangeFixture = { + request: { + method: "POST", + url: "https://api.example.com/v1/chat", + headers: { "content-type": "application/json" }, + body: '{"model":"gpt-4"}', + }, + response: { + status: 200, + statusText: "OK", + headers: { "content-type": "application/json" }, + body: '{"choices":[]}', + }, + meta: { model: "gpt-4", capturedAt: 1700000000000, label: null }, +}; + +const minimalFixture: HttpExchangeFixture = { + request: { method: "GET", url: "https://x", headers: {}, body: null }, + response: { status: 204, headers: {}, body: "" }, +}; + +describe("serializeFixture", () => { + it("produces valid JSON", () => { + const json = serializeFixture(fixture); + expect(() => JSON.parse(json)).not.toThrow(); + }); + + it("round-trips through parseFixture", () => { + const json = serializeFixture(fixture); + const parsed = parseFixture(json); + expect(parsed).toEqual(fixture); + }); + + it("round-trips minimal fixture", () => { + const json = serializeFixture(minimalFixture); + const parsed = parseFixture(json); + expect(parsed).toEqual(minimalFixture); + }); + + it("produces pretty-printed JSON with trailing newline", () => { + const json = serializeFixture(fixture); + expect(json).toContain("\n"); + expect(json.endsWith("\n")).toBe(true); + }); +}); + +describe("parseFixture", () => { + it("throws on invalid JSON", () => { + expect(() => parseFixture("not json")).toThrow("Invalid JSON"); + }); + + it("throws on non-object", () => { + expect(() => parseFixture('"hello"')).toThrow("Fixture must be an object"); + }); + + it("throws on null", () => { + expect(() => parseFixture("null")).toThrow("Fixture must be an object"); + }); + + it("throws on missing request", () => { + expect(() => parseFixture('{"response":{"status":200,"headers":{},"body":""}}')).toThrow( + "request", + ); + }); + + it("throws on missing response", () => { + expect(() => + parseFixture('{"request":{"method":"GET","url":"x","headers":{},"body":null}}'), + ).toThrow("response"); + }); + + it("throws on non-string request.method", () => { + const bad = { + request: { method: 123, url: "x", headers: {}, body: null }, + response: { status: 200, headers: {}, body: "" }, + }; + expect(() => parseFixture(JSON.stringify(bad))).toThrow("request.method must be a string"); + }); + + it("throws on non-number response.status", () => { + const bad = { + request: { method: "GET", url: "x", headers: {}, body: null }, + response: { status: "200", headers: {}, body: "" }, + }; + expect(() => parseFixture(JSON.stringify(bad))).toThrow("response.status must be a number"); + }); + + it("throws on non-string response.body", () => { + const bad = { + request: { method: "GET", url: "x", headers: {}, body: null }, + response: { status: 200, headers: {}, body: 123 }, + }; + expect(() => parseFixture(JSON.stringify(bad))).toThrow("response.body must be a string"); + }); + + it("throws on non-string header value", () => { + const bad = { + request: { method: "GET", url: "x", headers: { bad: 123 }, body: null }, + response: { status: 200, headers: {}, body: "" }, + }; + expect(() => parseFixture(JSON.stringify(bad))).toThrow("headers.bad must be a string"); + }); + + it("throws on invalid meta value", () => { + const bad = { + request: { method: "GET", url: "x", headers: {}, body: null }, + response: { status: 200, headers: {}, body: "" }, + meta: { bad: {} }, + }; + expect(() => parseFixture(JSON.stringify(bad))).toThrow("meta.bad must be"); + }); + + it("accepts valid meta with null/string/number/boolean", () => { + const withMeta = { + ...minimalFixture, + meta: { a: "s", b: 1, c: true, d: null }, + }; + const parsed = parseFixture(JSON.stringify(withMeta)); + expect(parsed.meta).toEqual({ a: "s", b: 1, c: true, d: null }); + }); + + it("accepts fixture without meta", () => { + const parsed = parseFixture(JSON.stringify(minimalFixture)); + expect(parsed.meta).toBeUndefined(); + }); +}); + +let tmpDir: string; + +beforeEach(async () => { + tmpDir = await mkdtemp(join(tmpdir(), "trace-replay-test-")); +}); + +afterEach(async () => { + await rm(tmpDir, { recursive: true, force: true }); +}); + +describe("saveFixture / loadFixture", () => { + it("writes and reads back a fixture (real fs)", () => { + const path = join(tmpDir, "test-fixture.json"); + saveFixture(path, fixture); + const loaded = loadFixture(path); + expect(loaded).toEqual(fixture); + }); + + it("writes and reads back a minimal fixture", () => { + const path = join(tmpDir, "minimal.json"); + saveFixture(path, minimalFixture); + const loaded = loadFixture(path); + expect(loaded).toEqual(minimalFixture); + }); + + it("loadFixture throws on malformed file", () => { + const path = join(tmpDir, "bad.json"); + const { writeFileSync } = require("node:fs") as typeof import("node:fs"); + writeFileSync(path, "not json", "utf-8"); + expect(() => loadFixture(path)).toThrow("Invalid JSON"); + }); +}); diff --git a/packages/trace-replay/src/fixture.ts b/packages/trace-replay/src/fixture.ts new file mode 100644 index 0000000..423185d --- /dev/null +++ b/packages/trace-replay/src/fixture.ts @@ -0,0 +1,77 @@ +import { readFileSync, writeFileSync } from "node:fs"; +import type { HttpExchangeFixture } from "./types.js"; + +export function serializeFixture(fx: HttpExchangeFixture): string { + return `${JSON.stringify(fx, null, 2)}\n`; +} + +export function parseFixture(text: string): HttpExchangeFixture { + let raw: unknown; + try { + raw = JSON.parse(text); + } catch { + throw new Error("Invalid JSON"); + } + assertFixture(raw); + return raw; +} + +function assertFixture(raw: unknown): asserts raw is HttpExchangeFixture { + if (raw === null || typeof raw !== "object" || Array.isArray(raw)) { + throw new Error("Fixture must be an object"); + } + const obj = raw as Record<string, unknown>; + + if (obj.request === null || typeof obj.request !== "object" || Array.isArray(obj.request)) { + throw new Error("Fixture.request must be an object"); + } + const req = obj.request as Record<string, unknown>; + if (typeof req.method !== "string") throw new Error("Fixture.request.method must be a string"); + if (typeof req.url !== "string") throw new Error("Fixture.request.url must be a string"); + assertStringRecord(req.headers, "Fixture.request.headers"); + if (req.body !== null && typeof req.body !== "string") { + throw new Error("Fixture.request.body must be a string or null"); + } + + if (obj.response === null || typeof obj.response !== "object" || Array.isArray(obj.response)) { + throw new Error("Fixture.response must be an object"); + } + const res = obj.response as Record<string, unknown>; + if (typeof res.status !== "number") throw new Error("Fixture.response.status must be a number"); + if (res.statusText !== undefined && typeof res.statusText !== "string") { + throw new Error("Fixture.response.statusText must be a string if present"); + } + assertStringRecord(res.headers, "Fixture.response.headers"); + if (typeof res.body !== "string") throw new Error("Fixture.response.body must be a string"); + + if (obj.meta !== undefined) { + if (obj.meta === null || typeof obj.meta !== "object" || Array.isArray(obj.meta)) { + throw new Error("Fixture.meta must be an object if present"); + } + const meta = obj.meta as Record<string, unknown>; + for (const [k, v] of Object.entries(meta)) { + if (v !== null && typeof v !== "string" && typeof v !== "number" && typeof v !== "boolean") { + throw new Error(`Fixture.meta.${k} must be a string, number, boolean, or null`); + } + } + } +} + +function assertStringRecord(val: unknown, label: string): asserts val is Record<string, string> { + if (val === null || typeof val !== "object" || Array.isArray(val)) { + throw new Error(`${label} must be an object`); + } + for (const [k, v] of Object.entries(val as Record<string, unknown>)) { + if (typeof v !== "string") { + throw new Error(`${label}.${k} must be a string`); + } + } +} + +export function saveFixture(path: string, fx: HttpExchangeFixture): void { + writeFileSync(path, serializeFixture(fx), "utf-8"); +} + +export function loadFixture(path: string): HttpExchangeFixture { + return parseFixture(readFileSync(path, "utf-8")); +} diff --git a/packages/trace-replay/src/index.ts b/packages/trace-replay/src/index.ts new file mode 100644 index 0000000..87243be --- /dev/null +++ b/packages/trace-replay/src/index.ts @@ -0,0 +1,4 @@ +export { loadFixture, parseFixture, saveFixture, serializeFixture } from "./fixture.js"; +export { recordFetch } from "./record.js"; +export { replayFetch } from "./replay.js"; +export type { CapturedRequest, FetchLike, HttpExchangeFixture } from "./types.js"; diff --git a/packages/trace-replay/src/record.test.ts b/packages/trace-replay/src/record.test.ts new file mode 100644 index 0000000..af9dbd0 --- /dev/null +++ b/packages/trace-replay/src/record.test.ts @@ -0,0 +1,112 @@ +import { describe, expect, it } from "vitest"; +import { recordFetch } from "./record.js"; +import type { FetchLike, HttpExchangeFixture } from "./types.js"; + +function fakeResponse(body: string, init?: ResponseInit): Response { + return new Response(body, { + status: 200, + statusText: "OK", + headers: { "content-type": "application/json" }, + ...init, + }); +} + +describe("recordFetch", () => { + it("onExchange receives fixture matching request+response", async () => { + const fixtures: HttpExchangeFixture[] = []; + const fakeFetch: FetchLike = async () => fakeResponse('{"ok":true}'); + const recorded = recordFetch(fakeFetch, (fx) => fixtures.push(fx)); + + await recorded("https://api.example.com/v1/chat", { + method: "POST", + headers: { "content-type": "application/json" }, + body: '{"prompt":"hi"}', + }); + + expect(fixtures).toHaveLength(1); + const fx = fixtures[0] as HttpExchangeFixture; + expect(fx.request.method).toBe("POST"); + expect(fx.request.url).toBe("https://api.example.com/v1/chat"); + expect(fx.request.headers["content-type"]).toBe("application/json"); + expect(fx.request.body).toBe('{"prompt":"hi"}'); + expect(fx.response.status).toBe(200); + expect(fx.response.statusText).toBe("OK"); + expect(fx.response.headers["content-type"]).toBe("application/json"); + expect(fx.response.body).toBe('{"ok":true}'); + }); + + it("response returned to caller is still fully readable after recording", async () => { + const fixtures: HttpExchangeFixture[] = []; + const fakeFetch: FetchLike = async () => fakeResponse("full body content"); + const recorded = recordFetch(fakeFetch, (fx) => fixtures.push(fx)); + + const res = await recorded("https://x"); + expect(await res.text()).toBe("full body content"); + }); + + it("response body can be read even after fixture is captured", async () => { + const fixtures: HttpExchangeFixture[] = []; + const fakeFetch: FetchLike = async () => fakeResponse("stream data"); + const recorded = recordFetch(fakeFetch, (fx) => fixtures.push(fx)); + + const res = await recorded("https://x"); + expect(fixtures[0]?.response.body).toBe("stream data"); + expect(await res.text()).toBe("stream data"); + }); + + it("captures Request object input with method", async () => { + const fixtures: HttpExchangeFixture[] = []; + const fakeFetch: FetchLike = async () => fakeResponse("ok"); + const recorded = recordFetch(fakeFetch, (fx) => fixtures.push(fx)); + + const req = new Request("https://from-req.com/path", { method: "PUT" }); + await recorded(req); + + expect(fixtures[0]?.request.url).toBe("https://from-req.com/path"); + expect(fixtures[0]?.request.method).toBe("PUT"); + }); + + it("defaults method to GET", async () => { + const fixtures: HttpExchangeFixture[] = []; + const fakeFetch: FetchLike = async () => fakeResponse("ok"); + const recorded = recordFetch(fakeFetch, (fx) => fixtures.push(fx)); + + await recorded("https://x"); + expect(fixtures[0]?.request.method).toBe("GET"); + }); + + it("captures null request body", async () => { + const fixtures: HttpExchangeFixture[] = []; + const fakeFetch: FetchLike = async () => fakeResponse("ok"); + const recorded = recordFetch(fakeFetch, (fx) => fixtures.push(fx)); + + await recorded("https://x", { method: "GET" }); + expect(fixtures[0]?.request.body).toBeNull(); + }); + + it("captures response error status", async () => { + const fixtures: HttpExchangeFixture[] = []; + const fakeFetch: FetchLike = async () => + new Response('{"error":"bad request"}', { + status: 400, + statusText: "Bad Request", + }); + const recorded = recordFetch(fakeFetch, (fx) => fixtures.push(fx)); + + const res = await recorded("https://x"); + expect(fixtures[0]?.response.status).toBe(400); + expect(res.status).toBe(400); + }); + + it("passes through to real fetch", async () => { + let called = false; + const fakeFetch: FetchLike = async () => { + called = true; + return new Response("from fake", { status: 201 }); + }; + const recorded = recordFetch(fakeFetch, () => {}); + const res = await recorded("https://x"); + expect(called).toBe(true); + expect(res.status).toBe(201); + }); +}); diff --git a/packages/trace-replay/src/record.ts b/packages/trace-replay/src/record.ts new file mode 100644 index 0000000..f145457 --- /dev/null +++ b/packages/trace-replay/src/record.ts @@ -0,0 +1,113 @@ +import type { FetchLike, HttpExchangeFixture } from "./types.js"; + +export type { FetchLike } from "./types.js"; + +export function recordFetch( + realFetch: FetchLike, + onExchange: (fx: HttpExchangeFixture) => void, +): FetchLike { + return async (input, init) => { + const url = typeof input === "string" ? input : input instanceof URL ? input.href : input.url; + const method = extractMethod(input, init); + const headers = normalizeHeaders( + init?.headers as Record<string, string> | Headers | [string, string][] | undefined, + ); + const reqBody = + init?.body != null + ? await readBody( + init.body as + | string + | Blob + | ArrayBuffer + | ArrayBufferView + | URLSearchParams + | ReadableStream, + ) + : null; + + const response = await realFetch(input, init); + const responseClone = response.clone(); + const responseBody = await responseClone.text(); + + const fixture: HttpExchangeFixture = { + request: { method, url, headers, body: reqBody }, + response: { + status: response.status, + statusText: response.statusText, + headers: normalizeResponseHeaders(response.headers), + body: responseBody, + }, + }; + + onExchange(fixture); + return response; + }; +} + +function extractMethod(input: string | URL | Request, init: RequestInit | undefined): string { + if (init?.method) return init.method; + if (typeof input !== "string" && !(input instanceof URL) && "method" in input) { + return input.method; + } + return "GET"; +} + +function normalizeHeaders( + raw: Headers | Record<string, string> | [string, string][] | undefined, +): Record<string, string> { + if (!raw) return {}; + if (raw instanceof Headers) { + const out: Record<string, string> = {}; + raw.forEach((v, k) => { + out[k] = v; + }); + return out; + } + if (Array.isArray(raw)) { + const out: Record<string, string> = {}; + for (const [k, v] of raw) out[k] = v; + return out; + } + return { ...raw }; +} + +function normalizeResponseHeaders(headers: Headers): Record<string, string> { + const out: Record<string, string> = {}; + headers.forEach((v, k) => { + out[k] = v; + }); + return out; +} + +async function readBody( + body: string | Blob | ArrayBuffer | ArrayBufferView | URLSearchParams | ReadableStream, +): Promise<string | null> { + if (typeof body === "string") return body; + if (body instanceof Blob) return body.text(); + if (body instanceof ArrayBuffer) return new TextDecoder().decode(body); + if (ArrayBuffer.isView(body)) return new TextDecoder().decode(body.buffer); + if (body instanceof URLSearchParams) return body.toString(); + if (body instanceof ReadableStream) { + const reader = body.getReader(); + const chunks: Uint8Array[] = []; + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + return new TextDecoder().decode(concatUint8Arrays(chunks)); + } + return null; +} + +function concatUint8Arrays(arrays: Uint8Array[]): Uint8Array { + let totalLen = 0; + for (const a of arrays) totalLen += a.byteLength; + const out = new Uint8Array(totalLen); + let offset = 0; + for (const a of arrays) { + out.set(a, offset); + offset += a.byteLength; + } + return out; +} diff --git a/packages/trace-replay/src/replay.test.ts b/packages/trace-replay/src/replay.test.ts new file mode 100644 index 0000000..6745523 --- /dev/null +++ b/packages/trace-replay/src/replay.test.ts @@ -0,0 +1,136 @@ +import { describe, expect, it } from "vitest"; +import { replayFetch } from "./replay.js"; +import type { HttpExchangeFixture } from "./types.js"; + +const fixture: HttpExchangeFixture = { + request: { + method: "POST", + url: "https://api.example.com/v1/chat", + headers: { "content-type": "application/json", authorization: "Bearer sk-abc" }, + body: '{"model":"gpt-4"}', + }, + response: { + status: 200, + statusText: "OK", + headers: { "content-type": "application/json", "x-request-id": "req-123" }, + body: '{"choices":[{"text":"hello"}]}', + }, + meta: { model: "gpt-4", capturedAt: 1700000000000 }, +}; + +describe("replayFetch", () => { + it("returns a Response with fixture status/statusText/headers", async () => { + const { fetch } = replayFetch(fixture); + const res = await fetch("https://api.example.com/v1/chat", { method: "POST" }); + expect(res.status).toBe(200); + expect(res.statusText).toBe("OK"); + expect(res.headers.get("content-type")).toBe("application/json"); + expect(res.headers.get("x-request-id")).toBe("req-123"); + }); + + it("body reads exactly fixture.response.body", async () => { + const { fetch } = replayFetch(fixture); + const res = await fetch("https://api.example.com/v1/chat"); + expect(await res.text()).toBe('{"choices":[{"text":"hello"}]}'); + }); + + it("chunkBytes splits body into correct number of chunks and reassembles", async () => { + const body = "abcdefghij"; // 10 bytes + const fx: HttpExchangeFixture = { + ...fixture, + response: { ...fixture.response, body }, + }; + const { fetch } = replayFetch(fx, { chunkBytes: 3 }); + const res = await fetch("https://x"); + const responseBody = res.body; + if (!responseBody) throw new Error("body is null"); + + const reader = responseBody.getReader(); + const chunks: Uint8Array[] = []; + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + expect(chunks).toHaveLength(4); // 3+3+3+1 + + let reassembled = ""; + for (const c of chunks) reassembled += new TextDecoder().decode(c); + expect(reassembled).toBe("abcdefghij"); + }); + + it("getCapturedRequest returns undefined before first call", () => { + const { getCapturedRequest } = replayFetch(fixture); + expect(getCapturedRequest()).toBeUndefined(); + }); + + it("getCapturedRequest returns the request after fetch", async () => { + const { fetch, getCapturedRequest } = replayFetch(fixture); + await fetch("https://api.example.com/v1/chat", { + method: "POST", + headers: { "content-type": "application/json" }, + body: '{"prompt":"hi"}', + }); + const captured = getCapturedRequest(); + expect(captured).toBeDefined(); + expect(captured?.method).toBe("POST"); + expect(captured?.url).toBe("https://api.example.com/v1/chat"); + expect(captured?.headers["content-type"]).toBe("application/json"); + expect(captured?.body).toBe('{"prompt":"hi"}'); + }); + + it("getCapturedRequest updates on subsequent calls", async () => { + const { fetch, getCapturedRequest } = replayFetch(fixture); + await fetch("https://first", { method: "GET" }); + expect(getCapturedRequest()?.url).toBe("https://first"); + await fetch("https://second", { method: "PUT" }); + expect(getCapturedRequest()?.url).toBe("https://second"); + expect(getCapturedRequest()?.method).toBe("PUT"); + }); + + it("captures URL and method from Request object input", async () => { + const { fetch, getCapturedRequest } = replayFetch(fixture); + const req = new Request("https://from-request.com/path", { method: "DELETE" }); + await fetch(req); + expect(getCapturedRequest()?.url).toBe("https://from-request.com/path"); + expect(getCapturedRequest()?.method).toBe("DELETE"); + }); + + it("defaults method to GET when no init", async () => { + const { fetch, getCapturedRequest } = replayFetch(fixture); + await fetch("https://x"); + expect(getCapturedRequest()?.method).toBe("GET"); + }); + + it("handles null request body", async () => { + const { fetch, getCapturedRequest } = replayFetch(fixture); + await fetch("https://x", { method: "GET" }); + expect(getCapturedRequest()?.body).toBeNull(); + }); + + it("handles headers as array of tuples", async () => { + const { fetch, getCapturedRequest } = replayFetch(fixture); + await fetch("https://x", { + headers: [["x-custom", "val"]], + }); + expect(getCapturedRequest()?.headers["x-custom"]).toBe("val"); + }); + + it("handles headers as Headers object", async () => { + const { fetch, getCapturedRequest } = replayFetch(fixture); + const h = new Headers({ "x-h": "v" }); + await fetch("https://x", { headers: h }); + expect(getCapturedRequest()?.headers["x-h"]).toBe("v"); + }); + + it("works with minimal response (no statusText, status 204, empty body)", async () => { + const fx: HttpExchangeFixture = { + request: { method: "GET", url: "https://x", headers: {}, body: null }, + response: { status: 204, headers: {}, body: "" }, + }; + const { fetch } = replayFetch(fx); + const res = await fetch("https://x"); + expect(res.status).toBe(204); + expect(await res.text()).toBe(""); + }); +}); diff --git a/packages/trace-replay/src/replay.ts b/packages/trace-replay/src/replay.ts new file mode 100644 index 0000000..84e95b9 --- /dev/null +++ b/packages/trace-replay/src/replay.ts @@ -0,0 +1,144 @@ +import type { CapturedRequest, FetchLike, HttpExchangeFixture } from "./types.js"; + +export interface ReplayOptions { + chunkBytes?: number; +} + +export interface ReplayResult { + fetch: FetchLike; + getCapturedRequest: () => CapturedRequest | undefined; +} + +export function replayFetch(fixture: HttpExchangeFixture, opts?: ReplayOptions): ReplayResult { + let captured: CapturedRequest | undefined; + + const replay: FetchLike = async (input, init) => { + const url = typeof input === "string" ? input : input instanceof URL ? input.href : input.url; + const method = extractMethod(input, init); + const headers = normalizeHeaders( + init?.headers as Record<string, string> | Headers | [string, string][] | undefined, + ); + const body = + init?.body != null + ? await readBody( + init.body as + | string + | Blob + | ArrayBuffer + | ArrayBufferView + | URLSearchParams + | ReadableStream, + ) + : null; + + captured = { method, url, headers, body }; + + const isNullBodyStatus = + fixture.response.status === 204 || + fixture.response.status === 205 || + fixture.response.status === 304; + const responseBody = isNullBodyStatus + ? null + : buildBodyStream(fixture.response.body, opts?.chunkBytes); + const responseInit: { status: number; headers: Record<string, string>; statusText?: string } = { + status: fixture.response.status, + headers: fixture.response.headers, + }; + if (fixture.response.statusText !== undefined) { + responseInit.statusText = fixture.response.statusText; + } + return new Response(responseBody, responseInit); + }; + + return { + fetch: replay, + getCapturedRequest: () => captured, + }; +} + +function extractMethod(input: string | URL | Request, init: RequestInit | undefined): string { + if (init?.method) return init.method; + if (typeof input !== "string" && !(input instanceof URL) && "method" in input) { + return input.method; + } + return "GET"; +} + +function normalizeHeaders( + raw: Headers | Record<string, string> | [string, string][] | undefined, +): Record<string, string> { + if (!raw) return {}; + if (raw instanceof Headers) { + const out: Record<string, string> = {}; + raw.forEach((v, k) => { + out[k] = v; + }); + return out; + } + if (Array.isArray(raw)) { + const out: Record<string, string> = {}; + for (const [k, v] of raw) out[k] = v; + return out; + } + return { ...raw }; +} + +async function readBody( + body: string | Blob | ArrayBuffer | ArrayBufferView | URLSearchParams | ReadableStream, +): Promise<string | null> { + if (typeof body === "string") return body; + if (body instanceof Blob) return body.text(); + if (body instanceof ArrayBuffer) return new TextDecoder().decode(body); + if (ArrayBuffer.isView(body)) return new TextDecoder().decode(body.buffer); + if (body instanceof URLSearchParams) return body.toString(); + if (body instanceof ReadableStream) { + const reader = body.getReader(); + const chunks: Uint8Array[] = []; + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + return new TextDecoder().decode(concatUint8Arrays(chunks)); + } + return null; +} + +function concatUint8Arrays(arrays: Uint8Array[]): Uint8Array { + let totalLen = 0; + for (const a of arrays) totalLen += a.byteLength; + const out = new Uint8Array(totalLen); + let offset = 0; + for (const a of arrays) { + out.set(a, offset); + offset += a.byteLength; + } + return out; +} + +function buildBodyStream(body: string, chunkBytes: number | undefined): ReadableStream<Uint8Array> { + const encoder = new TextEncoder(); + const encoded = encoder.encode(body); + + if (!chunkBytes || chunkBytes <= 0 || chunkBytes >= encoded.byteLength) { + return new ReadableStream<Uint8Array>({ + start(controller) { + controller.enqueue(encoded); + controller.close(); + }, + }); + } + + let offset = 0; + return new ReadableStream<Uint8Array>({ + pull(controller) { + if (offset >= encoded.byteLength) { + controller.close(); + return; + } + const end = Math.min(offset + chunkBytes, encoded.byteLength); + controller.enqueue(encoded.slice(offset, end)); + offset = end; + }, + }); +} diff --git a/packages/trace-replay/src/types.ts b/packages/trace-replay/src/types.ts new file mode 100644 index 0000000..235b41b --- /dev/null +++ b/packages/trace-replay/src/types.ts @@ -0,0 +1,24 @@ +export interface HttpExchangeFixture { + readonly request: { + readonly method: string; + readonly url: string; + readonly headers: Record<string, string>; + readonly body: string | null; + }; + readonly response: { + readonly status: number; + readonly statusText?: string; + readonly headers: Record<string, string>; + readonly body: string; + }; + readonly meta?: Record<string, string | number | boolean | null>; +} + +export interface CapturedRequest { + method: string; + url: string; + headers: Record<string, string>; + body: string | null; +} + +export type FetchLike = (input: string | URL | Request, init?: RequestInit) => Promise<Response>; diff --git a/packages/trace-replay/tsconfig.json b/packages/trace-replay/tsconfig.json new file mode 100644 index 0000000..0937214 --- /dev/null +++ b/packages/trace-replay/tsconfig.json @@ -0,0 +1,6 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true }, + "include": ["src/**/*.ts"], + "references": [] +} @@ -219,14 +219,40 @@ per-extension self-redaction (no shared helper — isolation over DRY). app → journal → collector → SQLite → `trace <turnId>` easy-view. Summons: prompts/phase-b-{trace-store,observability-collector}.md. -### Next (observability) -- **Span nesting fix (kernel run-turn):** spans are currently flat (all `parent=ROOT`); - nest `step`←`turn` and `prompt`/`provider.request`←`step` (pass the step span's logger - into `provider.stream`) so the trace is a tree. (`renderEasyView` already nests once - parents exist.) -- **host-bin supervision** (deferred): spawn-first / drain-last / restart the collector. -- **Record/replay test fixtures** (goal): captured verbatim provider.request/response - traces → hermetic `stream.test.ts` fixtures (mock `fetch`, replay real flash). D5; §7. +### Phase B — span nesting + supervision ✅ DONE + verified live +- [x] **Span nesting fix (kernel run-turn):** `step`←`turn` (`turnSpan.child`), + `prompt`/`provider.request`←`step` (step span's logger into `provider.stream`) → + the trace is now a tree. Also fixed a latent `buildSpanOpen` parent-propagation bug + in `logging/logger.ts`. 279 tests. `2bf8f9e` +- [x] **host-bin supervision:** `collector-supervisor.ts` (injected spawn → unit-tested + with a fake): spawn-first before `Bun.serve`, restart on unexpected exit (backoff + + restart-guard cap), drain-last on SIGINT/SIGTERM (collector final-drain, SIGKILL + fallback). Collector failure never crashes the app (D3). 288 tests. `dded4cc` +- [x] **Orchestrator scar-doc:** the `[x]` bracket trick for `ps`/`pgrep`/`pkill` + (avoids self-match killing the parent shell) — ORCHESTRATOR.md §8. `966caf7` +- typecheck clean, **288 tests** (279 vitest + 9 supervisor) + 72 bun, biome 0/0. + **Live (clean run):** supervisor spawns exactly 1 collector; trace DB auto-populated + with the nested turn→step→{prompt,provider.request} easy-view; 0 collectors after + graceful shutdown. Summons: prompts/phase-b-{span-nesting,supervision}.md. + +### Record/replay fixtures — IN PROGRESS (user chose: shared library unit, §5.2) +Boundary call (user): build a reusable **`trace-replay`** library (NOT provider-local), +so future edge extensions can reuse it. Realizes the §7 / D5 replay affordance as +hermetic fixtures: capture one real flash exchange → commit → replay via a fixture-driven +`fetch` double (mock `fetch`, no network). NOTE: "trace" here = a captured HTTP exchange, +independent of the SQLite trace-store; the lib is redaction-free (caller self-redacts). +- [x] **Unit 1 — `trace-replay`** (`packages/trace-replay/`, mimo-v2.5-pro): generic + HTTP-exchange record/replay lib — DONE + verified. `replayFetch` (PURE: fixture → fetch + double + captured request), `recordFetch` (tee real fetch → fixture), `serialize/parse` + + `save/load` fixture I/O. **Redaction-free** (caller self-redacts — isolation over DRY), + zero `@dispatch/*` deps, NO `bun:sqlite`. 39 tests (replay 12 / record 8 / fixture 19); + root tsconfig ref wired → **327 vitest**, typecheck + biome 0/0. reports/trace-replay.md. +- [ ] **Unit 2 — provider-openai-compat consumer**: env-gated record mode at its fetch + edge (self-redacts auth in its OWN code before `saveFixture`); `stream.test.ts` replays + a committed real-flash fixture via `replayFetch` → asserts ProviderEvents + that the + outgoing request still matches (transform-drift regression). Summon AFTER Unit 1 lands. +- [~] **Build wiring** (orchestrator): root tsconfig ref for trace-replay ✓ + `bun install` ✓; + provider dep on `@dispatch/trace-replay` pending (with Unit 2). Summons: prompts/phase-a-{kernel-logging,journal-sink}.md; reports/phase-a-{kernel-logging,journal-sink}.md. diff --git a/tsconfig.json b/tsconfig.json index a824844..b0fd70a 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -12,6 +12,7 @@ { "path": "./packages/journal-sink" }, { "path": "./packages/trace-store" }, { "path": "./packages/observability-collector" }, + { "path": "./packages/trace-replay" }, { "path": "./packages/host-bin" } ] } |
