summaryrefslogtreecommitdiffhomepage
path: root/lib/dispatch/adapter/claude/sse_parser.rb
blob: b4b65be15068541b8524e623aa92153ceb320b78 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# frozen_string_literal: true

module Dispatch
  module Adapter
    class Claude < Base
      # Stateful SSE (Server-Sent Events) parser for the Anthropic streaming API.
      #
      # Usage:
      #   parser = SseParser.new
      #   response.read_body do |chunk|
      #     parser.feed(chunk) do |event_type, data_hash|
      #       # called once per fully-parsed event
      #     end
      #   end
      #   parser.flush   # raises RequestError if an unterminated frame remains
      #
      # Wire format (Anthropic):
      #   event: <event_type>\n
      #   data: <json_payload>\n
      #   \n
      #
      # Special cases handled:
      #   - `data: [DONE]`          → silently ignored
      #   - `event: ping`           → silently dropped
      #   - Frame split across chunks (even in the middle of a multi-byte
      #     character) → buffered until its blank-line terminator arrives
      #   - Invalid JSON, or a JSON value that is not an object, inside a
      #     complete frame → RequestError raised from #feed
      #   - Unterminated data left over at end of stream → RequestError raised
      #     from #flush
      #   - Blank / comment lines   → ignored
      class SseParser
        # A blank line ends a frame: two consecutive line breaks, each of which
        # may be LF or CRLF (`\n\n`, `\r\n\r\n`, `\n\r\n`, `\r\n\n`).
        FRAME_TERMINATOR = /\r?\n\r?\n/

        # Maximum number of characters of offending data quoted in error messages.
        PREVIEW_LENGTH = 200

        private_constant :FRAME_TERMINATOR, :PREVIEW_LENGTH

        def initialize
          # The buffer is deliberately binary. Chunk boundaries are arbitrary
          # byte offsets, so a multi-byte UTF-8 character may be cut in half;
          # a UTF-8-tagged buffer would then be invalidly encoded and regexp
          # matching against it would raise. Framing is done on bytes and the
          # text is tagged as UTF-8 only once a whole frame has been extracted.
          @buffer = String.new(encoding: Encoding::BINARY)
        end

        # Feed a chunk of raw SSE bytes.
        #
        # Appends the chunk to the internal buffer, then extracts every complete
        # frame (terminated by a blank line), parsing each one and yielding
        # `(event_type, data_hash)` for frames that carry a payload.
        # Incomplete trailing data stays buffered for the next call.
        #
        # @param chunk [String, nil] raw bytes in any encoding; nil is treated as ""
        # @yield [event_type [String, nil], data_hash [Hash]]
        # @return [nil]
        # @raise [RequestError] when a complete frame carries invalid JSON or a
        #   JSON value that is not an object
        def feed(chunk, &block)
          # `.b` returns a binary copy, so appending never hits an encoding
          # compatibility error regardless of how the HTTP client tagged it.
          @buffer << chunk.to_s.b

          while (frame_end = find_frame_end(@buffer))
            frame = @buffer.slice!(0, frame_end)
            # The terminator itself was removed with the frame; this drops any
            # additional blank lines / whitespace preceding the next frame.
            @buffer.lstrip!

            parse_frame(frame, &block)
          end
        end

        # Call after the stream ends.
        #
        # If the buffer still contains non-whitespace data, a frame arrived
        # without its terminating blank line: the buffer is reset and
        # RequestError is raised. Silent if the buffer is empty or
        # whitespace-only.
        #
        # @return [nil]
        # @raise [RequestError] when dangling unterminated data remains
        def flush
          remaining = @buffer.strip
          return if remaining.empty?

          @buffer = String.new(encoding: Encoding::BINARY)
          raise RequestError.new(
            "SSE stream ended with incomplete frame: #{preview(remaining)}",
            provider: ClaudeErrors::PROVIDER
          )
        end

        private

        # Return the index just past the blank line that terminates the first
        # complete SSE frame in buf, or nil if no complete frame is present.
        def find_frame_end(buf)
          buf.match(FRAME_TERMINATOR)&.end(0)
        end

        # Parse a single SSE frame and yield `(event_type, data_hash)` if the
        # frame carries a data payload that should be surfaced to the caller.
        #
        # Lines inside a frame:
        #   "event: <name>"     — sets event_type
        #   "data: <payload>"   — appended to the payload (multiple lines are
        #                         joined with "\n")
        #   anything else       — ignored (id:, retry:, comments, blank lines)
        #
        # When no `event:` line is present the event type falls back to the
        # payload's "type" field.
        #
        # @param frame [String] one complete frame, as raw bytes
        # @raise [RequestError] see #parse_payload
        def parse_frame(frame, &block)
          event_type = nil
          data_lines = []

          frame.each_line do |raw_line|
            line = raw_line.chomp

            if line.start_with?("event:")
              event_type = utf8(line[6..].strip)
            elsif line.start_with?("data:")
              data_lines << line[5..].strip
            end
          end

          return if data_lines.empty?

          data_str = utf8(data_lines.join("\n"))

          # End-of-stream sentinel and keep-alive pings carry no information.
          return if data_str == "[DONE]"
          return if event_type == "ping"

          data_hash = parse_payload(data_str)
          block&.call(event_type || data_hash["type"], data_hash)
        end

        # Decode a frame's JSON payload, insisting on a JSON object.
        #
        # The frame is already complete (blank-line terminated) when this runs,
        # so a parse failure means the payload is genuinely malformed rather
        # than split across chunks; it is reported immediately instead of being
        # re-buffered.
        #
        # @param data_str [String] the joined `data:` lines
        # @return [Hash]
        # @raise [RequestError] on invalid JSON or a non-object JSON value
        def parse_payload(data_str)
          payload = JSON.parse(data_str)
          return payload if payload.is_a?(Hash)

          raise RequestError.new(
            "SSE frame payload was not a JSON object " \
            "(payload preview: #{preview(data_str)})",
            provider: ClaudeErrors::PROVIDER
          )
        rescue JSON::ParserError => e
          raise RequestError.new(
            "SSE frame contained invalid JSON: #{e.message} " \
            "(payload preview: #{preview(data_str)})",
            provider: ClaudeErrors::PROVIDER
          )
        end

        # Tag a freshly extracted (unfrozen) byte string as UTF-8 in place.
        def utf8(bytes)
          bytes.force_encoding(Encoding::UTF_8)
        end

        # Short, quoted excerpt of str for use in error messages.
        def preview(str)
          utf8(str)[0, PREVIEW_LENGTH].inspect
        end
      end
    end
  end
end