1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
|
# frozen_string_literal: true
RSpec.describe Dispatch::Adapter::Claude::SseParser do
# Subject under test: an SseParser built with no constructor arguments.
# Naming it :parser lets the helper methods in this spec call `parser`.
subject(:parser) { described_class.new }
# Helper: pushes +input+ through the parser in a single #feed call and
# returns everything the parser yielded as [type, data] pairs, in yield order.
def events_from(input)
  [].tap do |seen|
    parser.feed(input) { |kind, payload| seen << [kind, payload] }
  end
end
# Helper: feeds each chunk to the parser with its own #feed call, in the
# order given, and returns all yielded [type, data] pairs in one array.
def events_from_chunks(*chunks)
  chunks.each_with_object([]) do |piece, seen|
    parser.feed(piece) { |kind, payload| seen << [kind, payload] }
  end
end
# ── Single-chunk stream ───────────────────────────────────────────────────
describe "#feed — single chunk" do
  # One complete frame: event line, data line, then the blank-line terminator.
  let(:single_frame) do
    "event: message_start\n" \
      "data: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_01\"}}\n" \
      "\n"
  end

  # First [type, data] pair yielded for the single frame.
  let(:first_event) { events_from(single_frame).first }

  it "yields one event" do
    expect(events_from(single_frame).length).to eq(1)
  end

  it "yields the correct event type" do
    kind, _payload = first_event
    expect(kind).to eq("message_start")
  end

  it "yields the parsed data hash" do
    _kind, payload = first_event
    expected = { "type" => "message_start", "message" => { "id" => "msg_01" } }
    expect(payload).to eq(expected)
  end

  context "with multiple frames in a single chunk" do
    # Two complete frames delivered back to back in one string.
    let(:two_frames) do
      "event: message_start\n" \
        "data: {\"type\":\"message_start\"}\n" \
        "\n" \
        "event: content_block_start\n" \
        "data: {\"type\":\"content_block_start\",\"index\":0}\n" \
        "\n"
    end

    it "yields both events in order" do
      yielded = events_from(two_frames)
      expect(yielded.length).to eq(2)
      kinds = yielded.map { |kind, _payload| kind }
      expect(kinds).to eq(["message_start", "content_block_start"])
    end
  end
end
# ── [DONE] sentinel ────────────────────────────────────────────────────────
describe "#feed — [DONE] is ignored" do
  it "does not yield an event for a [DONE] data line" do
    expect(events_from("data: [DONE]\n\n")).to be_empty
  end

  it "still yields events that precede [DONE]" do
    # A real frame followed by the [DONE] sentinel frame: only one event
    # is expected to come out.
    yielded = events_from("data: {\"type\":\"message_stop\"}\n\ndata: [DONE]\n\n")
    expect(yielded.length).to eq(1)
  end
end
# ── ping events ────────────────────────────────────────────────────────────
describe "#feed — ping events are silently dropped" do
  it "does not yield a ping event" do
    expect(events_from("event: ping\ndata: {}\n\n")).to be_empty
  end

  it "still yields non-ping events surrounding a ping" do
    # message_start, then a ping, then content_block_start, all in one chunk.
    with_ping_in_middle =
      "event: message_start\n" \
      "data: {\"type\":\"message_start\"}\n" \
      "\n" \
      "event: ping\n" \
      "data: {}\n" \
      "\n" \
      "event: content_block_start\n" \
      "data: {\"type\":\"content_block_start\",\"index\":0}\n" \
      "\n"

    yielded = events_from(with_ping_in_middle)
    expect(yielded.length).to eq(2)
    kinds = yielded.map { |kind, _payload| kind }
    expect(kinds).to eq(["message_start", "content_block_start"])
  end
end
# ── Split chunks ──────────────────────────────────────────────────────────
describe "#feed — stream split mid-JSON resumes correctly" do
  it "buffers a partial frame and emits it when the rest arrives" do
    # The cut falls inside the JSON string value "content_block_delta".
    yielded = events_from_chunks(
      "event: content_block_delta\ndata: {\"type\":\"content_block_de",
      "lta\",\"delta\":{\"type\":\"text_delta\",\"text\":\"hi\"}}\n\n"
    )

    expect(yielded.length).to eq(1)
    kind, payload = yielded.first
    expect(kind).to eq("content_block_delta")
    expect(payload["delta"]["text"]).to eq("hi")
  end

  it "buffers a frame split at the blank-line boundary" do
    # The second chunk carries nothing but the terminating newline.
    yielded = events_from_chunks(
      "event: message_start\ndata: {\"type\":\"message_start\"}\n",
      "\n"
    )

    expect(yielded.length).to eq(1)
    expect(yielded.first.first).to eq("message_start")
  end

  it "handles three-chunk splits correctly" do
    # Event line, first half of the data line, rest of the data line plus
    # terminator, each delivered separately.
    yielded = events_from_chunks(
      "event: content_block_delta\n",
      "data: {\"type\":\"content_block_delta\",",
      "\"index\":0}\n\n"
    )

    expect(yielded.length).to eq(1)
    expect(yielded.first.last["type"]).to eq("content_block_delta")
  end
end
# ── event type fallback ────────────────────────────────────────────────────
describe "event type resolution" do
  # Returns only the type half of the first event yielded for +chunk+.
  def first_type_in(chunk)
    kind, _payload = events_from(chunk).first
    kind
  end

  it "uses explicit event: line when present" do
    # The event: line and the data "type" disagree on purpose.
    chunk = "event: content_block_stop\ndata: {\"type\":\"something_else\"}\n\n"
    expect(first_type_in(chunk)).to eq("content_block_stop")
  end

  it "falls back to data hash 'type' when no event: line" do
    chunk = "data: {\"type\":\"message_delta\"}\n\n"
    expect(first_type_in(chunk)).to eq("message_delta")
  end

  it "yields nil type when neither event: nor type in data" do
    chunk = "data: {\"foo\":\"bar\"}\n\n"
    expect(first_type_in(chunk)).to be_nil
  end
end
# ── CRLF line endings ─────────────────────────────────────────────────────
describe "#feed — CRLF line endings" do
  it "handles \\r\\n line endings" do
    # Same shape as a normal frame, but every line break is \r\n.
    yielded = events_from("event: message_start\r\ndata: {\"type\":\"message_start\"}\r\n\r\n")

    expect(yielded.length).to eq(1)
    kind, _payload = yielded.first
    expect(kind).to eq("message_start")
  end
end
# ── flush ─────────────────────────────────────────────────────────────────
describe "#flush" do
  it "does not raise when buffer is empty" do
    expect { parser.flush }.not_to raise_error
  end

  it "does not raise when buffer contains only whitespace" do
    parser.feed(" \n ")
    expect { parser.flush }.not_to raise_error
  end

  it "raises RequestError when there is a dangling incomplete frame" do
    # The frame is fed without its blank-line terminator.
    unterminated = "event: content_block_delta\ndata: {\"type\":\"content_block_delta\"}"
    parser.feed(unterminated)

    expect { parser.flush }
      .to raise_error(Dispatch::Adapter::RequestError, /incomplete frame/i)
  end

  it "clears the buffer after a flush error" do
    parser.feed("partial data without terminator")

    # Swallow the error from the first flush, if any; the assertion below
    # is about the state the parser is left in afterwards.
    begin
      parser.flush
    rescue Dispatch::Adapter::RequestError
      nil
    end

    expect { parser.flush }.not_to raise_error
  end
end
# ── Real Anthropic stream fixture ─────────────────────────────────────────
describe "realistic Anthropic stream" do
  # A full message stream as one string: eight frames, one of which is a
  # ping. Each frame is followed by a blank line, including the last one.
  # The blank lines are the frame terminators; without them no frame in
  # this fixture would ever be complete.
  let(:stream) do
    <<~SSE
      event: message_start
      data: {"type":"message_start","message":{"id":"msg_01","type":"message","role":"assistant","content":[],"model":"claude-sonnet-4-6","stop_reason":null,"usage":{"input_tokens":10,"output_tokens":0}}}

      event: content_block_start
      data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

      event: ping
      data: {}

      event: content_block_delta
      data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

      event: content_block_delta
      data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" World"}}

      event: content_block_stop
      data: {"type":"content_block_stop","index":0}

      event: message_delta
      data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":5}}

      event: message_stop
      data: {"type":"message_stop"}

    SSE
  end

  it "yields events for all non-ping frames" do
    evts = events_from(stream)
    types = evts.map(&:first)
    expect(types).to include("message_start", "content_block_start",
                             "content_block_delta", "content_block_stop",
                             "message_delta", "message_stop")
  end

  it "does not yield a ping event" do
    types = events_from(stream).map(&:first)
    expect(types).not_to include("ping")
  end

  it "yields the correct number of non-ping events (7 expected)" do
    # message_start + content_block_start + 2×content_block_delta +
    # content_block_stop + message_delta + message_stop = 7
    expect(events_from(stream).size).to eq(7)
  end

  it "text deltas contain the right text" do
    evts = events_from(stream)
    # Select on the data hash's "type", then pull each delta's text in order.
    deltas = evts.select { |_t, d| d["type"] == "content_block_delta" }
    texts = deltas.map { |_t, d| d.dig("delta", "text") }
    expect(texts).to eq(["Hello", " World"])
  end

  it "flush is a no-op after a complete stream" do
    # Relies on the fixture's final frame being terminated by a blank line.
    events_from(stream)
    expect { parser.flush }.not_to raise_error
  end
end
end
|