# frozen_string_literal: true

module Dispatch
  module Adapter
    class Claude < Base
      # Stateful SSE (Server-Sent Events) parser for the Anthropic streaming API.
      #
      # Usage:
      #   parser = SseParser.new
      #   response.read_body do |chunk|
      #     parser.feed(chunk) do |event_type, data_hash|
      #       # called once per fully-parsed event
      #     end
      #   end
      #   parser.flush # raises RequestError if dangling non-whitespace data remains
      #
      # Wire format (Anthropic):
      #   event: <event-type>\n
      #   data: <JSON object>\n
      #   \n
      #
      # Special cases handled:
      #   - `data: [DONE]`                 → silently ignored
      #   - `event: ping`                  → silently dropped
      #   - frames with no `data:` line    → ignored
      #   - `id:` / `retry:` / `:` comment / blank lines → ignored
      #   - invalid JSON, or JSON whose top level is not an object → RequestError
      #     raised immediately from #feed. A frame is only parsed once its
      #     terminating blank line has arrived, so a parse failure can never be
      #     caused by a chunk split.
      #
      # Input is buffered as raw bytes, so chunk boundaries may fall anywhere,
      # including inside a multibyte UTF-8 character. LF and CRLF line endings
      # are supported; bare-CR line endings (allowed by the SSE spec) are not.
      class SseParser
        # A blank line: two line breaks in a row, each optionally preceded by \r.
        FRAME_TERMINATOR = /\r?\n\r?\n/
        # Maximum number of bytes of offending input quoted in error messages.
        PREVIEW_BYTES = 200
        private_constant :FRAME_TERMINATOR, :PREVIEW_BYTES

        def initialize
          @buffer = new_buffer
        end

        # Feed a chunk of raw SSE bytes.
        #
        # Appends +chunk+ to the internal buffer, then parses and yields every
        # complete frame (one terminated by a blank line) now present. A trailing
        # partial frame stays buffered until a later call completes it.
        #
        # @param chunk [String, nil] raw bytes from the HTTP response (nil is treated as "")
        # @yield [event_type, data_hash] once per surfaced event
        # @yieldparam event_type [String, nil] the `event:` field, or else the payload's "type"
        # @yieldparam data_hash [Hash] the decoded JSON payload
        # @raise [RequestError] when a complete frame carries invalid or non-object JSON
        def feed(chunk, &)
          # Copy the chunk as binary. Regex matching on a UTF-8-tagged String
          # holding an incomplete multibyte sequence raises ArgumentError, and
          # appending binary bytes to a non-ASCII UTF-8 buffer raises
          # Encoding::CompatibilityError; a binary buffer avoids both.
          @buffer << chunk.to_s.b

          while (frame_end = find_frame_end(@buffer))
            # Removes the frame together with its first terminating blank line.
            frame = @buffer.slice!(0, frame_end)
            # Drop any additional blank lines / whitespace before the next frame.
            @buffer.lstrip!
            parse_frame(frame, &)
          end
        end

        # Call once after the stream ends.
        #
        # Always clears the buffer so the parser can be reused. Whitespace-only
        # leftovers are ignored; anything else means a frame arrived without its
        # terminating blank line.
        #
        # @return [void]
        # @raise [RequestError] when dangling non-whitespace data remains
        def flush
          remaining = @buffer.strip
          @buffer = new_buffer
          return if remaining.empty?

          raise RequestError.new(
            "SSE stream ended with incomplete frame: #{preview(remaining)}",
            provider: ClaudeErrors::PROVIDER
          )
        end

        private

        # @return [String] an empty, mutable, binary (ASCII-8BIT) buffer
        def new_buffer
          String.new(encoding: Encoding::BINARY)
        end

        # Index just past the first blank line in +buf+ (i.e. the end of the
        # first complete frame), or nil if no complete frame is buffered yet.
        #
        # @param buf [String]
        # @return [Integer, nil]
        def find_frame_end(buf)
          FRAME_TERMINATOR.match(buf)&.end(0)
        end

        # Parse one complete frame and yield `(event_type, data_hash)` if it
        # carries a payload the caller should see.
        #
        # Recognised lines: "event:<name>" sets the event type; "data:<text>"
        # lines are collected and joined with "\n". Every other line (id:,
        # retry:, ":" comments, blanks) is ignored.
        #
        # @param frame [String] binary frame text, including its terminator
        # @yield [event_type, data_hash] (see #feed)
        # @raise [RequestError] when the payload is invalid or non-object JSON
        def parse_frame(frame)
          event_type = nil
          data_lines = []

          frame.each_line do |raw_line|
            line = raw_line.chomp
            if line.start_with?("event:")
              # The buffer is binary; hand callers an ordinary UTF-8 string.
              event_type = line[6..].strip.force_encoding(Encoding::UTF_8)
            elsif line.start_with?("data:")
              data_lines << line[5..].strip
            end
          end

          return if data_lines.empty?

          data_str = data_lines.join("\n")
          return if data_str == "[DONE]" # end-of-stream sentinel
          return if event_type == "ping" # keep-alive

          data_hash = parse_json_object(data_str)
          # Prefer the explicit `event:` name; otherwise fall back to the
          # payload's "type" field (Anthropic convention).
          yield(event_type || data_hash["type"], data_hash) if block_given?
        end

        # Decode +data_str+, which must be a JSON object.
        #
        # A parse failure here means the payload is genuinely malformed: #feed
        # only hands over frames whose terminating blank line has arrived, so
        # cross-chunk splits never reach this point. Raising (rather than putting
        # the frame back into the buffer) also prevents the same frame from being
        # re-found and re-failing forever.
        #
        # @param data_str [String]
        # @return [Hash]
        # @raise [RequestError] on invalid JSON or a non-object top-level value
        def parse_json_object(data_str)
          parsed = JSON.parse(data_str)
          # Arrays, strings, numbers and null would otherwise escape as a raw
          # TypeError/NoMethodError from `data_hash["type"]`, or be yielded in
          # violation of the Hash contract.
          return parsed if parsed.is_a?(Hash)

          raise RequestError.new(
            "SSE frame data was not a JSON object (got #{parsed.class}): #{preview(data_str)}",
            provider: ClaudeErrors::PROVIDER
          )
        rescue JSON::ParserError => e
          raise RequestError.new(
            "SSE frame contained invalid JSON: #{e.message} " \
            "(payload preview: #{preview(data_str)})",
            provider: ClaudeErrors::PROVIDER
          )
        end

        # Quote at most PREVIEW_BYTES of +str+ for an error message, decoding it
        # as UTF-8 and replacing any character cut off by the byte limit.
        #
        # @param str [String]
        # @return [String] an inspect-ed, double-quoted preview
        def preview(str)
          str.byteslice(0, PREVIEW_BYTES).force_encoding(Encoding::UTF_8).scrub.inspect
        end
      end
    end
  end
end