summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-05 17:53:00 +0900
committerAdam Malczewski <[email protected]>2026-06-05 17:53:00 +0900
commit8c417472e7801369c3dfd004c9c85d7d69372f7c (patch)
tree3da8eac532855a14013b502afaf30d669010c6be
parent74986a54093f6492cc4420f5917e5215f42a8f89 (diff)
downloaddispatch-8c417472e7801369c3dfd004c9c85d7d69372f7c.tar.gz
dispatch-8c417472e7801369c3dfd004c9c85d7d69372f7c.zip
feat(observability): map DeepSeek nested cache tokens (prompt_tokens_details.cached_tokens) -> Usage.cacheReadTokens
The real flash fixture showed flash reports cache usage in the NESTED prompt_tokens_details.cached_tokens form (384 cached of 665 prompt); the parser only mapped the flat cache_read_tokens form, so cache tokens never surfaced. Now: cacheReadTokens = usage.cache_read_tokens ?? usage.prompt_tokens_details?.cached_tokens (flat wins; cacheWriteTokens flat-only, never fabricated; partial/null *_details safe). No kernel contract change (Usage already has the fields). +5 parser tests + a real-fixture regression (cacheReadTokens === 384). These counts (+ a future prefix.fingerprint) are the cheap signals for body de-duplication. The broader trace-body storage-growth concern (verbatim body stored per request -> ~O(N^2) for long conversations) is logged DEFERRED in tasks.md; mitigation already designed (D5 volume control + §6 retention/rotation), not yet built. 339 tests, typecheck + biome 0/0.
-rw-r--r--packages/provider-openai-compat/src/parse-sse.test.ts80
-rw-r--r--packages/provider-openai-compat/src/parse-sse.ts17
-rw-r--r--packages/provider-openai-compat/src/stream.test.ts1
-rw-r--r--packages/provider-openai-compat/src/stream.ts8
-rw-r--r--tasks.md17
5 files changed, 108 insertions, 15 deletions
diff --git a/packages/provider-openai-compat/src/parse-sse.test.ts b/packages/provider-openai-compat/src/parse-sse.test.ts
index 15d652e..1910833 100644
--- a/packages/provider-openai-compat/src/parse-sse.test.ts
+++ b/packages/provider-openai-compat/src/parse-sse.test.ts
@@ -1,3 +1,4 @@
+import type { ProviderEvent } from "@dispatch/kernel";
import { describe, expect, it } from "vitest";
import { parseSSELines } from "./parse-sse.js";
@@ -134,6 +135,85 @@ describe("parseSSELines", () => {
expect(events).toEqual([{ type: "text-delta", delta: "before" }]);
});
+ it("parses nested prompt_tokens_details.cached_tokens → cacheReadTokens", () => {
+ const lines = [
+ 'data: {"id":"chatcmpl-nested","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ 'data: {"id":"chatcmpl-nested","usage":{"prompt_tokens":665,"completion_tokens":90,"prompt_tokens_details":{"cached_tokens":384},"completion_tokens_details":{"reasoning_tokens":86}}}',
+ "data: [DONE]",
+ ];
+
+ const events = parseSSELines(lines);
+ const usageEvent = events.find((e) => e.type === "usage") as Extract<
+ ProviderEvent,
+ { type: "usage" }
+ >;
+ expect(usageEvent.usage.inputTokens).toBe(665);
+ expect(usageEvent.usage.outputTokens).toBe(90);
+ expect(usageEvent.usage.cacheReadTokens).toBe(384);
+ expect(usageEvent.usage.cacheWriteTokens).toBeUndefined();
+ });
+
+ it("flat cache_read_tokens takes precedence over nested cached_tokens", () => {
+ const lines = [
+ 'data: {"id":"chatcmpl-both","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ 'data: {"id":"chatcmpl-both","usage":{"prompt_tokens":100,"completion_tokens":20,"cache_read_tokens":50,"prompt_tokens_details":{"cached_tokens":99}}}',
+ "data: [DONE]",
+ ];
+
+ const events = parseSSELines(lines);
+ const usageEvent = events.find((e) => e.type === "usage") as Extract<
+ ProviderEvent,
+ { type: "usage" }
+ >;
+ expect(usageEvent.usage.cacheReadTokens).toBe(50);
+ });
+
+ it("returns undefined for cacheReadTokens when neither flat nor nested present", () => {
+ const lines = [
+ 'data: {"id":"chatcmpl-none","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ 'data: {"id":"chatcmpl-none","usage":{"prompt_tokens":10,"completion_tokens":5}}',
+ "data: [DONE]",
+ ];
+
+ const events = parseSSELines(lines);
+ const usageEvent = events.find((e) => e.type === "usage") as Extract<
+ ProviderEvent,
+ { type: "usage" }
+ >;
+ expect(usageEvent.usage.cacheReadTokens).toBeUndefined();
+ expect(usageEvent.usage.cacheWriteTokens).toBeUndefined();
+ });
+
+ it("handles missing/partial prompt_tokens_details safely", () => {
+ const lines = [
+ 'data: {"id":"chatcmpl-partial","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ 'data: {"id":"chatcmpl-partial","usage":{"prompt_tokens":50,"completion_tokens":10,"prompt_tokens_details":{}}}',
+ "data: [DONE]",
+ ];
+
+ const events = parseSSELines(lines);
+ const usageEvent = events.find((e) => e.type === "usage") as Extract<
+ ProviderEvent,
+ { type: "usage" }
+ >;
+ expect(usageEvent.usage.cacheReadTokens).toBeUndefined();
+ });
+
+ it("handles empty prompt_tokens_details object safely", () => {
+ const lines = [
+ 'data: {"id":"chatcmpl-empty","choices":[{"delta":{},"finish_reason":"stop","index":0}]}',
+ 'data: {"id":"chatcmpl-empty","usage":{"prompt_tokens":30,"completion_tokens":8,"prompt_tokens_details":null}}',
+ "data: [DONE]",
+ ];
+
+ const events = parseSSELines(lines);
+ const usageEvent = events.find((e) => e.type === "usage") as Extract<
+ ProviderEvent,
+ { type: "usage" }
+ >;
+ expect(usageEvent.usage.cacheReadTokens).toBeUndefined();
+ });
+
it("handles a complete turn with text, tool call, usage, and finish", () => {
const lines = [
'data: {"id":"chatcmpl-9","choices":[{"delta":{"content":"Let me check."},"index":0}]}',
diff --git a/packages/provider-openai-compat/src/parse-sse.ts b/packages/provider-openai-compat/src/parse-sse.ts
index 0c0fd66..cfeb5b0 100644
--- a/packages/provider-openai-compat/src/parse-sse.ts
+++ b/packages/provider-openai-compat/src/parse-sse.ts
@@ -22,6 +22,10 @@ interface SSEChunkChoice {
index: number;
}
+interface SSEChunkUsageDetails {
+ cached_tokens?: number;
+}
+
interface SSEChunk {
id?: string;
choices?: SSEChunkChoice[];
@@ -30,6 +34,8 @@ interface SSEChunk {
completion_tokens?: number;
cache_read_tokens?: number;
cache_write_tokens?: number;
+ prompt_tokens_details?: SSEChunkUsageDetails;
+ completion_tokens_details?: Record<string, unknown>;
};
}
@@ -105,17 +111,16 @@ export function parseSSELines(lines: readonly string[]): ProviderEvent[] {
}
if (chunk.usage) {
+ const cacheRead =
+ chunk.usage.cache_read_tokens ?? chunk.usage.prompt_tokens_details?.cached_tokens;
+ const cacheWrite = chunk.usage.cache_write_tokens;
events.push({
type: "usage",
usage: {
inputTokens: chunk.usage.prompt_tokens ?? 0,
outputTokens: chunk.usage.completion_tokens ?? 0,
- ...(chunk.usage.cache_read_tokens !== undefined
- ? { cacheReadTokens: chunk.usage.cache_read_tokens }
- : {}),
- ...(chunk.usage.cache_write_tokens !== undefined
- ? { cacheWriteTokens: chunk.usage.cache_write_tokens }
- : {}),
+ ...(cacheRead !== undefined ? { cacheReadTokens: cacheRead } : {}),
+ ...(cacheWrite !== undefined ? { cacheWriteTokens: cacheWrite } : {}),
},
});
}
diff --git a/packages/provider-openai-compat/src/stream.test.ts b/packages/provider-openai-compat/src/stream.test.ts
index 3cea838..0650153 100644
--- a/packages/provider-openai-compat/src/stream.test.ts
+++ b/packages/provider-openai-compat/src/stream.test.ts
@@ -557,6 +557,7 @@ describe("streamChat — hermetic replay (trace-replay)", () => {
expect(usageEvents).toHaveLength(1);
expect(usageEvents[0]?.usage.inputTokens).toBe(665);
expect(usageEvents[0]?.usage.outputTokens).toBe(90);
+ expect(usageEvents[0]?.usage.cacheReadTokens).toBe(384);
const captured = getCapturedRequest();
assertDefined(captured);
diff --git a/packages/provider-openai-compat/src/stream.ts b/packages/provider-openai-compat/src/stream.ts
index 7cefb25..b60efc1 100644
--- a/packages/provider-openai-compat/src/stream.ts
+++ b/packages/provider-openai-compat/src/stream.ts
@@ -351,14 +351,14 @@ async function* readSSEStream(
completion_tokens?: number;
cache_read_tokens?: number;
cache_write_tokens?: number;
+ prompt_tokens_details?: { cached_tokens?: number };
+ completion_tokens_details?: Record<string, unknown>;
}
| undefined;
if (usage) {
- const cacheRead =
- usage.cache_read_tokens !== undefined ? usage.cache_read_tokens : undefined;
- const cacheWrite =
- usage.cache_write_tokens !== undefined ? usage.cache_write_tokens : undefined;
+ const cacheRead = usage.cache_read_tokens ?? usage.prompt_tokens_details?.cached_tokens;
+ const cacheWrite = usage.cache_write_tokens;
const usageObj: {
inputTokens: number;
outputTokens: number;
diff --git a/tasks.md b/tasks.md
index a44a781..e643d02 100644
--- a/tasks.md
+++ b/tasks.md
@@ -266,11 +266,18 @@ independent of the SQLite trace-store; the lib is redaction-free (caller self-re
(`src/__fixtures__/flash-text-turn.json`); reply "Hello there friend"; text-turn replay
assertions updated to real values (inputTokens 665 / outputTokens 90); secret-free
re-verified pre-commit. **334 tests**, typecheck + biome 0/0.
-- [ ] **FINDING → decide** (real-data, D5): flash returns cache tokens in DeepSeek's NESTED
- `prompt_tokens_details.cached_tokens` (665 prompt / 384 cached); the openai-compat SSE
- parser only maps the FLAT `cache_read/creation` form, so cache tokens never surface — an
- observability gap for the §3.1 cache-debugging goal. Surfaced to user: fix the parser
- (provider unit) or defer?
+- [x] **Cache-token mapping fixed** (real-data, D5): parser now maps nested
+ `prompt_tokens_details.cached_tokens` → `Usage.cacheReadTokens` (flat `cache_read_tokens`
+ still wins via `??`; `cacheWriteTokens` flat-only, never fabricated; partial/null details
+ safe; no contract change). +5 parser tests + real-fixture regression (`cacheReadTokens===384`).
+ 339 tests. reports/provider-cache-tokens.md.
+- [ ] **DEFERRED — trace body de-dup / storage growth** (user-flagged): the `provider.request`
+ span stores the FULL post-transform request body on EVERY request → ~O(N²) body text for
+ long conversations (history is resent each turn; cache hits are the signature of it).
+ Mitigation already DESIGNED, not built: D5 "Volume control" (persist body only when
+ `prefix.fingerprint` changed) + §6 retention/rotation/compression; thin/fat split already
+ built. `cacheReadTokens` (just added) + the future `prefix.fingerprint` are the cheap dedup
+ signals. Revisit when cache-warming / longer conversations land.
Summons: prompts/phase-a-{kernel-logging,journal-sink}.md;
reports/phase-a-{kernel-logging,journal-sink}.md.