1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
import type { ProviderEvent } from "@dispatch/kernel";
/**
 * Scratch record for a single tool call while its pieces are still being
 * collected from consecutive stream chunks.
 */
interface ToolCallAccumulator {
  /** Identifier of the call. */
  id: string;

  /** Name of the tool/function the call targets. */
  name: string;

  /** Raw argument text gathered so far; kept as a string until the call is emitted. */
  arguments: string;
}
/**
 * Incremental payload carried by one choice of a streamed chunk.
 * Every field is optional: a given chunk may carry any subset of them.
 */
interface SSEChunkDelta {
  /** Fragment of visible assistant text. */
  content?: string;

  /** Fragment of reasoning text. */
  reasoning_content?: string;

  /** Fragments of tool calls; `index` says which call a fragment belongs to. */
  tool_calls?: {
    index: number;
    id?: string;
    function?: { name?: string; arguments?: string };
  }[];
}
/** One entry of a chunk's `choices` array. */
interface SSEChunkChoice {
  /** Incremental content for this choice. */
  delta: SSEChunkDelta;

  /** Absent, `null`, or empty while the choice is still streaming; a non-empty string once it ends. */
  finish_reason?: string | null;

  /** Position of this choice among the choices. */
  index: number;
}
/** Breakdown attached to a chunk's usage block under `prompt_tokens_details`. */
interface SSEChunkUsageDetails {
  /** Cached-token count, if the payload reports one. */
  cached_tokens?: number;
}
/**
 * Shape of the JSON object carried by one `data:` line of the stream.
 * All top-level fields are optional; a chunk may hold choices, usage, or both.
 */
interface SSEChunk {
  /** Identifier of the chunk, when present. */
  id?: string;

  /** Per-choice incremental content. */
  choices?: SSEChunkChoice[];

  /** Token accounting, when the chunk includes it. */
  usage?: {
    prompt_tokens?: number;
    completion_tokens?: number;
    /** Direct cache-read count; takes precedence over `prompt_tokens_details.cached_tokens`. */
    cache_read_tokens?: number;
    cache_write_tokens?: number;
    prompt_tokens_details?: SSEChunkUsageDetails;
    completion_tokens_details?: { [key: string]: unknown };
  };
}
/**
 * Translates the lines of a server-sent-event stream into provider events.
 *
 * Only lines that start with `data:` (after trimming) are considered; every
 * other line is skipped. Processing stops at the first `data: [DONE]` line.
 *
 * Events produced, in stream order:
 * - `text-delta` / `reasoning-delta` for each non-empty `content` /
 *   `reasoning_content` fragment;
 * - `tool-call` for each accumulated tool call, emitted in ascending call
 *   index when a choice reports a `finish_reason`, followed by `finish`;
 * - `usage` for each chunk that carries a usage block;
 * - `error` for a payload that is not valid JSON or is not a JSON object.
 *
 * Tool-call arguments are parsed as JSON when possible; otherwise the raw
 * accumulated string is passed through as `input`.
 *
 * NOTE: tool-call fragments are keyed by their own `index` only, so the
 * accumulator is shared by all choices of the stream. Tool calls still pending
 * when the input ends without a `finish_reason` are not emitted.
 *
 * @param lines - Raw lines of the SSE body, one entry per line.
 * @returns The events decoded from `lines`. Malformed payloads are reported
 *   as `error` events rather than thrown.
 */
export function parseSSELines(lines: readonly string[]): ProviderEvent[] {
  const events: ProviderEvent[] = [];
  // Partially received tool calls, keyed by the call index used in the stream.
  const toolCalls = new Map<number, ToolCallAccumulator>();

  for (const line of lines) {
    const trimmed = line.trim();
    if (!trimmed.startsWith("data:")) continue;

    const data = trimmed.slice(5).trim();
    if (data === "[DONE]") break;

    let parsed: unknown;
    try {
      parsed = JSON.parse(data);
    } catch {
      events.push({ type: "error", message: `Invalid JSON in SSE data: ${data}` });
      continue;
    }
    // `null`, numbers, strings etc. are valid JSON but cannot be a chunk;
    // report them instead of failing on a property access below.
    if (typeof parsed !== "object" || parsed === null) {
      events.push({ type: "error", message: `Unexpected non-object SSE data: ${data}` });
      continue;
    }
    const chunk = parsed as SSEChunk;

    for (const choice of chunk.choices ?? []) {
      // The payload is untrusted: tolerate a choice that carries no delta.
      const delta = choice.delta;

      if (delta?.content) {
        events.push({ type: "text-delta", delta: delta.content });
      }
      if (delta?.reasoning_content) {
        events.push({ type: "reasoning-delta", delta: delta.reasoning_content });
      }

      for (const tc of delta?.tool_calls ?? []) {
        const existing = toolCalls.get(tc.index);
        if (existing === undefined) {
          toolCalls.set(tc.index, {
            id: tc.id ?? "",
            name: tc.function?.name ?? "",
            arguments: tc.function?.arguments ?? "",
          });
          continue;
        }
        // Adopt an id or name that only shows up after the first fragment;
        // values already known are never overwritten.
        if (!existing.id && tc.id) existing.id = tc.id;
        if (!existing.name && tc.function?.name) existing.name = tc.function.name;
        if (tc.function?.arguments) existing.arguments += tc.function.arguments;
      }

      if (choice.finish_reason) {
        const ordered = [...toolCalls.entries()].sort(([a], [b]) => a - b);
        for (const [, acc] of ordered) {
          let input: unknown;
          try {
            input = JSON.parse(acc.arguments);
          } catch {
            // Not valid JSON (e.g. empty or truncated): hand over the raw text.
            input = acc.arguments;
          }
          events.push({
            type: "tool-call",
            toolCallId: acc.id,
            toolName: acc.name,
            input,
          });
        }
        // Drop what was just emitted so a later finish cannot repeat it.
        toolCalls.clear();
        events.push({ type: "finish", reason: choice.finish_reason });
      }
    }

    if (chunk.usage) {
      const usage = chunk.usage;
      const cacheRead = usage.cache_read_tokens ?? usage.prompt_tokens_details?.cached_tokens;
      const cacheWrite = usage.cache_write_tokens;
      events.push({
        type: "usage",
        usage: {
          inputTokens: usage.prompt_tokens ?? 0,
          outputTokens: usage.completion_tokens ?? 0,
          // `!= null` also filters a JSON `null`, which must not pass as a count.
          ...(cacheRead != null ? { cacheReadTokens: cacheRead } : {}),
          ...(cacheWrite != null ? { cacheWriteTokens: cacheWrite } : {}),
        },
      });
    }
  }

  return events;
}
|