# frozen_string_literal: true RSpec.describe Dispatch::Adapter::Claude::SseParser do subject(:parser) { described_class.new } # Helper: feed the entire string at once and collect yielded events def events_from(input) collected = [] parser.feed(input) { |type, data| collected << [type, data] } collected end # Helper: feed in multiple chunks and collect events from each def events_from_chunks(*chunks) collected = [] chunks.each { |c| parser.feed(c) { |type, data| collected << [type, data] } } collected end # ── Single-chunk stream ─────────────────────────────────────────────────── describe "#feed — single chunk" do let(:single_frame) do "event: message_start\n" \ "data: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_01\"}}\n" \ "\n" end it "yields one event" do expect(events_from(single_frame).size).to eq(1) end it "yields the correct event type" do type, _data = events_from(single_frame).first expect(type).to eq("message_start") end it "yields the parsed data hash" do _type, data = events_from(single_frame).first expect(data).to eq({ "type" => "message_start", "message" => { "id" => "msg_01" } }) end context "with multiple frames in a single chunk" do let(:two_frames) do "event: message_start\n" \ "data: {\"type\":\"message_start\"}\n" \ "\n" \ "event: content_block_start\n" \ "data: {\"type\":\"content_block_start\",\"index\":0}\n" \ "\n" end it "yields both events in order" do evts = events_from(two_frames) expect(evts.size).to eq(2) expect(evts.map(&:first)).to eq(%w[message_start content_block_start]) end end end # ── [DONE] sentinel ──────────────────────────────────────────────────────── describe "#feed — [DONE] is ignored" do it "does not yield an event for a [DONE] data line" do chunk = "data: [DONE]\n\n" expect(events_from(chunk)).to be_empty end it "still yields events that precede [DONE]" do chunk = "data: {\"type\":\"message_stop\"}\n\ndata: [DONE]\n\n" evts = events_from(chunk) expect(evts.size).to eq(1) end end # ── ping events ──────────────────────────────────────────────────────────── describe "#feed — ping events are silently dropped" do it "does not yield a ping event" do chunk = "event: ping\ndata: {}\n\n" expect(events_from(chunk)).to be_empty end it "still yields non-ping events surrounding a ping" do chunk = "event: message_start\n" \ "data: {\"type\":\"message_start\"}\n" \ "\n" \ "event: ping\n" \ "data: {}\n" \ "\n" \ "event: content_block_start\n" \ "data: {\"type\":\"content_block_start\",\"index\":0}\n" \ "\n" evts = events_from(chunk) expect(evts.size).to eq(2) expect(evts.map(&:first)).to eq(%w[message_start content_block_start]) end end # ── Split chunks ────────────────────────────────────────────────────────── describe "#feed — stream split mid-JSON resumes correctly" do it "buffers a partial frame and emits it when the rest arrives" do first_half = "event: content_block_delta\ndata: {\"type\":\"content_block_de" second_half = "lta\",\"delta\":{\"type\":\"text_delta\",\"text\":\"hi\"}}\n\n" evts = events_from_chunks(first_half, second_half) expect(evts.size).to eq(1) type, data = evts.first expect(type).to eq("content_block_delta") expect(data["delta"]["text"]).to eq("hi") end it "buffers a frame split at the blank-line boundary" do first_half = "event: message_start\ndata: {\"type\":\"message_start\"}\n" second_half = "\n" evts = events_from_chunks(first_half, second_half) expect(evts.size).to eq(1) expect(evts.first.first).to eq("message_start") end it "handles three-chunk splits correctly" do chunks = [ "event: content_block_delta\n", "data: {\"type\":\"content_block_delta\",", "\"index\":0}\n\n" ] evts = events_from_chunks(*chunks) expect(evts.size).to eq(1) expect(evts.first.last["type"]).to eq("content_block_delta") end end # ── event type fallback ──────────────────────────────────────────────────── describe "event type resolution" do it "uses explicit event: line when present" do chunk = "event: content_block_stop\ndata: {\"type\":\"something_else\"}\n\n" type, _data = events_from(chunk).first expect(type).to eq("content_block_stop") end it "falls back to data hash 'type' when no event: line" do chunk = "data: {\"type\":\"message_delta\"}\n\n" type, _data = events_from(chunk).first expect(type).to eq("message_delta") end it "yields nil type when neither event: nor type in data" do chunk = "data: {\"foo\":\"bar\"}\n\n" type, _data = events_from(chunk).first expect(type).to be_nil end end # ── CRLF line endings ───────────────────────────────────────────────────── describe "#feed — CRLF line endings" do it "handles \\r\\n line endings" do chunk = "event: message_start\r\ndata: {\"type\":\"message_start\"}\r\n\r\n" evts = events_from(chunk) expect(evts.size).to eq(1) expect(evts.first.first).to eq("message_start") end end # ── flush ───────────────────────────────────────────────────────────────── describe "#flush" do it "does not raise when buffer is empty" do expect { parser.flush }.not_to raise_error end it "does not raise when buffer contains only whitespace" do parser.feed(" \n ") expect { parser.flush }.not_to raise_error end it "raises RequestError when there is a dangling incomplete frame" do # Feed a frame without the blank-line terminator parser.feed("event: content_block_delta\ndata: {\"type\":\"content_block_delta\"}") expect { parser.flush }.to raise_error(Dispatch::Adapter::RequestError, /incomplete frame/i) end it "clears the buffer after a flush error" do parser.feed("partial data without terminator") begin parser.flush rescue Dispatch::Adapter::RequestError nil end expect { parser.flush }.not_to raise_error end end # ── Real Anthropic stream fixture ───────────────────────────────────────── describe "realistic Anthropic stream" do let(:stream) do <<~SSE event: message_start data: {"type":"message_start","message":{"id":"msg_01","type":"message","role":"assistant","content":[],"model":"claude-sonnet-4-6","stop_reason":null,"usage":{"input_tokens":10,"output_tokens":0}}} event: content_block_start data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}} event: ping data: {} event: content_block_delta data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}} event: content_block_delta data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" World"}} event: content_block_stop data: {"type":"content_block_stop","index":0} event: message_delta data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":5}} event: message_stop data: {"type":"message_stop"} SSE end it "yields events for all non-ping frames" do evts = events_from(stream) types = evts.map(&:first) expect(types).to include("message_start", "content_block_start", "content_block_delta", "content_block_stop", "message_delta", "message_stop") end it "does not yield a ping event" do types = events_from(stream).map(&:first) expect(types).not_to include("ping") end it "yields the correct number of non-ping events (7 expected)" do # message_start + content_block_start + 2×content_block_delta + # content_block_stop + message_delta + message_stop = 7 expect(events_from(stream).size).to eq(7) end it "text deltas contain the right text" do evts = events_from(stream) deltas = evts.select { |_t, d| d["type"] == "content_block_delta" } texts = deltas.map { |_t, d| d.dig("delta", "text") } expect(texts).to eq(["Hello", " World"]) end it "flush is a no-op after a complete stream" do events_from(stream) expect { parser.flush }.not_to raise_error end end end