diff options
| author | Adam Malczewski <[email protected]> | 2026-06-05 17:53:00 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-05 17:53:00 +0900 |
| commit | 8c417472e7801369c3dfd004c9c85d7d69372f7c (patch) | |
| tree | 3da8eac532855a14013b502afaf30d669010c6be | |
| parent | 74986a54093f6492cc4420f5917e5215f42a8f89 (diff) | |
| download | dispatch-8c417472e7801369c3dfd004c9c85d7d69372f7c.tar.gz dispatch-8c417472e7801369c3dfd004c9c85d7d69372f7c.zip | |
feat(observability): map DeepSeek nested cache tokens (prompt_tokens_details.cached_tokens) -> Usage.cacheReadTokens
The real flash fixture showed flash reports cache usage in the NESTED prompt_tokens_details.cached_tokens form (384 cached of 665 prompt); the parser only mapped the flat cache_read_tokens form, so cache tokens never surfaced. Now: cacheReadTokens = usage.cache_read_tokens ?? usage.prompt_tokens_details?.cached_tokens (flat wins; cacheWriteTokens flat-only, never fabricated; partial/null *_details safe). No kernel contract change (Usage already has the fields). +5 parser tests + a real-fixture regression (cacheReadTokens === 384).
These counts (+ a future prefix.fingerprint) are the cheap signals for body de-duplication. The broader trace-body storage-growth concern (verbatim body stored per request -> ~O(N^2) for long conversations) is logged DEFERRED in tasks.md; mitigation already designed (D5 volume control + §6 retention/rotation), not yet built. 339 tests, typecheck + biome 0/0.
| -rw-r--r-- | packages/provider-openai-compat/src/parse-sse.test.ts | 80 | ||||
| -rw-r--r-- | packages/provider-openai-compat/src/parse-sse.ts | 17 | ||||
| -rw-r--r-- | packages/provider-openai-compat/src/stream.test.ts | 1 | ||||
| -rw-r--r-- | packages/provider-openai-compat/src/stream.ts | 8 | ||||
| -rw-r--r-- | tasks.md | 17 |
5 files changed, 108 insertions, 15 deletions
diff --git a/packages/provider-openai-compat/src/parse-sse.test.ts b/packages/provider-openai-compat/src/parse-sse.test.ts index 15d652e..1910833 100644 --- a/packages/provider-openai-compat/src/parse-sse.test.ts +++ b/packages/provider-openai-compat/src/parse-sse.test.ts @@ -1,3 +1,4 @@ +import type { ProviderEvent } from "@dispatch/kernel"; import { describe, expect, it } from "vitest"; import { parseSSELines } from "./parse-sse.js"; @@ -134,6 +135,85 @@ describe("parseSSELines", () => { expect(events).toEqual([{ type: "text-delta", delta: "before" }]); }); + it("parses nested prompt_tokens_details.cached_tokens → cacheReadTokens", () => { + const lines = [ + 'data: {"id":"chatcmpl-nested","choices":[{"delta":{},"finish_reason":"stop","index":0}]}', + 'data: {"id":"chatcmpl-nested","usage":{"prompt_tokens":665,"completion_tokens":90,"prompt_tokens_details":{"cached_tokens":384},"completion_tokens_details":{"reasoning_tokens":86}}}', + "data: [DONE]", + ]; + + const events = parseSSELines(lines); + const usageEvent = events.find((e) => e.type === "usage") as Extract< + ProviderEvent, + { type: "usage" } + >; + expect(usageEvent.usage.inputTokens).toBe(665); + expect(usageEvent.usage.outputTokens).toBe(90); + expect(usageEvent.usage.cacheReadTokens).toBe(384); + expect(usageEvent.usage.cacheWriteTokens).toBeUndefined(); + }); + + it("flat cache_read_tokens takes precedence over nested cached_tokens", () => { + const lines = [ + 'data: {"id":"chatcmpl-both","choices":[{"delta":{},"finish_reason":"stop","index":0}]}', + 'data: {"id":"chatcmpl-both","usage":{"prompt_tokens":100,"completion_tokens":20,"cache_read_tokens":50,"prompt_tokens_details":{"cached_tokens":99}}}', + "data: [DONE]", + ]; + + const events = parseSSELines(lines); + const usageEvent = events.find((e) => e.type === "usage") as Extract< + ProviderEvent, + { type: "usage" } + >; + expect(usageEvent.usage.cacheReadTokens).toBe(50); + }); + + it("returns undefined for cacheReadTokens when neither flat nor nested present", () => { + const lines = [ + 'data: {"id":"chatcmpl-none","choices":[{"delta":{},"finish_reason":"stop","index":0}]}', + 'data: {"id":"chatcmpl-none","usage":{"prompt_tokens":10,"completion_tokens":5}}', + "data: [DONE]", + ]; + + const events = parseSSELines(lines); + const usageEvent = events.find((e) => e.type === "usage") as Extract< + ProviderEvent, + { type: "usage" } + >; + expect(usageEvent.usage.cacheReadTokens).toBeUndefined(); + expect(usageEvent.usage.cacheWriteTokens).toBeUndefined(); + }); + + it("handles missing/partial prompt_tokens_details safely", () => { + const lines = [ + 'data: {"id":"chatcmpl-partial","choices":[{"delta":{},"finish_reason":"stop","index":0}]}', + 'data: {"id":"chatcmpl-partial","usage":{"prompt_tokens":50,"completion_tokens":10,"prompt_tokens_details":{}}}', + "data: [DONE]", + ]; + + const events = parseSSELines(lines); + const usageEvent = events.find((e) => e.type === "usage") as Extract< + ProviderEvent, + { type: "usage" } + >; + expect(usageEvent.usage.cacheReadTokens).toBeUndefined(); + }); + + it("handles empty prompt_tokens_details object safely", () => { + const lines = [ + 'data: {"id":"chatcmpl-empty","choices":[{"delta":{},"finish_reason":"stop","index":0}]}', + 'data: {"id":"chatcmpl-empty","usage":{"prompt_tokens":30,"completion_tokens":8,"prompt_tokens_details":null}}', + "data: [DONE]", + ]; + + const events = parseSSELines(lines); + const usageEvent = events.find((e) => e.type === "usage") as Extract< + ProviderEvent, + { type: "usage" } + >; + expect(usageEvent.usage.cacheReadTokens).toBeUndefined(); + }); + it("handles a complete turn with text, tool call, usage, and finish", () => { const lines = [ 'data: {"id":"chatcmpl-9","choices":[{"delta":{"content":"Let me check."},"index":0}]}', diff --git a/packages/provider-openai-compat/src/parse-sse.ts b/packages/provider-openai-compat/src/parse-sse.ts index 0c0fd66..cfeb5b0 100644 --- a/packages/provider-openai-compat/src/parse-sse.ts +++ b/packages/provider-openai-compat/src/parse-sse.ts @@ -22,6 +22,10 @@ interface SSEChunkChoice { index: number; } +interface SSEChunkUsageDetails { + cached_tokens?: number; +} + interface SSEChunk { id?: string; choices?: SSEChunkChoice[]; @@ -30,6 +34,8 @@ interface SSEChunk { completion_tokens?: number; cache_read_tokens?: number; cache_write_tokens?: number; + prompt_tokens_details?: SSEChunkUsageDetails; + completion_tokens_details?: Record<string, unknown>; }; } @@ -105,17 +111,16 @@ export function parseSSELines(lines: readonly string[]): ProviderEvent[] { } if (chunk.usage) { + const cacheRead = + chunk.usage.cache_read_tokens ?? chunk.usage.prompt_tokens_details?.cached_tokens; + const cacheWrite = chunk.usage.cache_write_tokens; events.push({ type: "usage", usage: { inputTokens: chunk.usage.prompt_tokens ?? 0, outputTokens: chunk.usage.completion_tokens ?? 0, - ...(chunk.usage.cache_read_tokens !== undefined - ? { cacheReadTokens: chunk.usage.cache_read_tokens } - : {}), - ...(chunk.usage.cache_write_tokens !== undefined - ? { cacheWriteTokens: chunk.usage.cache_write_tokens } - : {}), + ...(cacheRead !== undefined ? { cacheReadTokens: cacheRead } : {}), + ...(cacheWrite !== undefined ? { cacheWriteTokens: cacheWrite } : {}), }, }); } diff --git a/packages/provider-openai-compat/src/stream.test.ts b/packages/provider-openai-compat/src/stream.test.ts index 3cea838..0650153 100644 --- a/packages/provider-openai-compat/src/stream.test.ts +++ b/packages/provider-openai-compat/src/stream.test.ts @@ -557,6 +557,7 @@ describe("streamChat — hermetic replay (trace-replay)", () => { expect(usageEvents).toHaveLength(1); expect(usageEvents[0]?.usage.inputTokens).toBe(665); expect(usageEvents[0]?.usage.outputTokens).toBe(90); + expect(usageEvents[0]?.usage.cacheReadTokens).toBe(384); const captured = getCapturedRequest(); assertDefined(captured); diff --git a/packages/provider-openai-compat/src/stream.ts b/packages/provider-openai-compat/src/stream.ts index 7cefb25..b60efc1 100644 --- a/packages/provider-openai-compat/src/stream.ts +++ b/packages/provider-openai-compat/src/stream.ts @@ -351,14 +351,14 @@ async function* readSSEStream( completion_tokens?: number; cache_read_tokens?: number; cache_write_tokens?: number; + prompt_tokens_details?: { cached_tokens?: number }; + completion_tokens_details?: Record<string, unknown>; } | undefined; if (usage) { - const cacheRead = - usage.cache_read_tokens !== undefined ? usage.cache_read_tokens : undefined; - const cacheWrite = - usage.cache_write_tokens !== undefined ? usage.cache_write_tokens : undefined; + const cacheRead = usage.cache_read_tokens ?? usage.prompt_tokens_details?.cached_tokens; + const cacheWrite = usage.cache_write_tokens; const usageObj: { inputTokens: number; outputTokens: number; @@ -266,11 +266,18 @@ independent of the SQLite trace-store; the lib is redaction-free (caller self-re (`src/__fixtures__/flash-text-turn.json`); reply "Hello there friend"; text-turn replay assertions updated to real values (inputTokens 665 / outputTokens 90); secret-free re-verified pre-commit. **334 tests**, typecheck + biome 0/0. -- [ ] **FINDING → decide** (real-data, D5): flash returns cache tokens in DeepSeek's NESTED - `prompt_tokens_details.cached_tokens` (665 prompt / 384 cached); the openai-compat SSE - parser only maps the FLAT `cache_read/creation` form, so cache tokens never surface — an - observability gap for the §3.1 cache-debugging goal. Surfaced to user: fix the parser - (provider unit) or defer? +- [x] **Cache-token mapping fixed** (real-data, D5): parser now maps nested + `prompt_tokens_details.cached_tokens` → `Usage.cacheReadTokens` (flat `cache_read_tokens` + still wins via `??`; `cacheWriteTokens` flat-only, never fabricated; partial/null details + safe; no contract change). +5 parser tests + real-fixture regression (`cacheReadTokens===384`). + 339 tests. reports/provider-cache-tokens.md. +- [ ] **DEFERRED — trace body de-dup / storage growth** (user-flagged): the `provider.request` + span stores the FULL post-transform request body on EVERY request → ~O(N²) body text for + long conversations (history is resent each turn; cache hits are the signature of it). + Mitigation already DESIGNED, not built: D5 "Volume control" (persist body only when + `prefix.fingerprint` changed) + §6 retention/rotation/compression; thin/fat split already + built. `cacheReadTokens` (just added) + the future `prefix.fingerprint` are the cheap dedup + signals. Revisit when cache-warming / longer conversations land. Summons: prompts/phase-a-{kernel-logging,journal-sink}.md; reports/phase-a-{kernel-logging,journal-sink}.md. |
