# frozen_string_literal: true

module Dispatch
  module Adapter
    class Claude < Base
      # Accumulates streaming event state for an Anthropic SSE response.
      #
      # The collector is passed to each stream event handler so that state
      # builds up incrementally. Once the stream is complete the
      # content_blocks list holds fully-built block hashes ready for
      # ResponseBuilder or direct inspection.
      #
      # State shape:
      #   {
      #     response_id:       nil | String,
      #     model:             String,       # seed from caller, updated on message_start
      #     content_blocks:    [],           # Array in arrival order; each has :index, :kind, …
      #     finish_reason:     nil | String, # stop_reason from message_delta
      #     usage:             { input: 0, output: 0, cache_read: 0, cache_creation: 0 },
      #     saw_message_start: false | true,
      #     saw_terminal:      false | true, # set by message_stop or message_delta
      #   }
      #
      # Content block Hash shapes (after content_block_stop):
      #
      #   text block:
      #     { index:, kind: "text", text: String }
      #
      #   thinking block:
      #     { index:, kind: "thinking", thinking: String, signature: String|nil }
      #
      #   redacted_thinking block:
      #     { index:, kind: "redacted_thinking", data: String }
      #
      #   tool_use block:
      #     { index:, kind: "tool_use", id: String, name: String,
      #       partial_json: String, arguments: Hash }
      #
      # Accumulated strings (text, thinking, signature, partial_json) are grown
      # in place with String#<<, so total work stays linear in the streamed
      # output size. A reference to one of them taken mid-stream keeps growing
      # as further deltas arrive.
      class StreamCollector
        # SSE event types that are valid BEFORE message_start
        ALLOWED_PRE_MESSAGE_START = %w[ping].freeze

        # SSE event types that signal the stream is done
        TERMINAL_EVENT_TYPES = %w[message_stop message_delta].freeze

        # Anthropic usage field name → key in state[:usage].
        USAGE_FIELDS = {
          "input_tokens" => :input,
          "output_tokens" => :output,
          "cache_read_input_tokens" => :cache_read,
          "cache_creation_input_tokens" => :cache_creation
        }.freeze
        private_constant :USAGE_FIELDS

        attr_reader :state

        # @param model_id [String] seed model ID; overridden by message_start
        # @param is_oauth [Boolean] when true, strip proxy_ prefix from tool names
        def initialize(model_id, is_oauth: false)
          @is_oauth = is_oauth
          @state = {
            response_id: nil,
            model: model_id.to_s,
            content_blocks: [],
            finish_reason: nil,
            usage: { input: 0, output: 0, cache_read: 0, cache_creation: 0 },
            saw_message_start: false,
            saw_terminal: false
          }
        end

        # ── Event dispatchers ─────────────────────────────────────────────────

        # Process one parsed SSE event.
        #
        # Before message_start, only blank event types and those listed in
        # ALLOWED_PRE_MESSAGE_START are tolerated (and ignored); any other event
        # means the stream envelope is malformed. After message_start, event
        # types not handled below are silently ignored.
        #
        # @param event_type [String, nil]
        # @param data [Hash]
        # @yield [StreamDelta] (optional) caller block receives streaming deltas
        # @raise [RequestError] if a non-preamble event precedes message_start
        def handle(event_type, data, &)
          type = event_type.to_s

          # ── Pre-message_start guard ──────────────────────────────────────
          unless @state[:saw_message_start]
            # ping / blank preamble → ignore
            return if type.empty? || ALLOWED_PRE_MESSAGE_START.include?(type)

            if type != "message_start"
              raise RequestError.new(
                "stream envelope: received #{type.inspect} before message_start",
                provider: ClaudeErrors::PROVIDER
              )
            end
          end

          case type
          when "message_start"       then handle_message_start(data)
          when "content_block_start" then handle_content_block_start(data, &)
          when "content_block_delta" then handle_content_block_delta(data, &)
          when "content_block_stop"  then handle_content_block_stop(data, &)
          when "message_delta"       then handle_message_delta(data)
          when "message_stop"        then handle_message_stop
          when "ping"                then nil # always safe to drop
          end
        end

        # ── State accessors ───────────────────────────────────────────────────

        def saw_message_start?
          @state[:saw_message_start]
        end

        def response_id
          @state[:response_id]
        end

        def model
          @state[:model]
        end

        def usage
          @state[:usage]
        end

        # Return the content_blocks list (in arrival order).
        def content_blocks
          @state[:content_blocks]
        end

        # The raw Anthropic stop_reason string captured from message_delta.
        def finish_reason
          @state[:finish_reason]
        end

        # True once message_stop (or message_delta) has been received.
        def saw_terminal?
          @state[:saw_terminal]
        end

        # True if any text or tool_use JSON content has been accumulated, i.e.
        # the matching deltas were yielded to the consumer block. Used by the
        # retry logic to determine whether the stream is safe to replay from
        # scratch.
        #
        # NOTE(review): thinking deltas and the *_start / *_end markers are also
        # yielded to the consumer but are not counted here — confirm that
        # replaying those is acceptable to callers.
        def consumer_output?
          @state[:content_blocks].any? do |blk|
            case blk[:kind]
            when "text"     then !blk[:text].to_s.empty?
            when "tool_use" then !blk[:partial_json].to_s.empty?
            else false
            end
          end
        end
        alias has_consumer_output? consumer_output?

        private

        # ── message_start ─────────────────────────────────────────────────────

        # Extract response_id, model, and initial usage from message_start.
        # Missing usage fields are recorded as 0.
        # No StreamDelta is yielded — this is an envelope-only event.
        def handle_message_start(data)
          message = data["message"] || {}
          @state[:saw_message_start] = true
          @state[:response_id] = message["id"]
          @state[:model] = message["model"] if message["model"]

          usage_data = message["usage"] || {}
          USAGE_FIELDS.each do |field, key|
            @state[:usage][key] = usage_data[field].to_i
          end
        end

        # ── message_delta ─────────────────────────────────────────────────────

        # Capture the final stop_reason and update usage with the counts the
        # API sends at the end of the stream. Usage fields absent from this
        # event keep their message_start values. Marks the stream terminal.
        # No StreamDelta is yielded — this is an envelope-only event.
        def handle_message_delta(data)
          delta = data["delta"] || {}
          stop_reason = delta["stop_reason"]
          @state[:finish_reason] = stop_reason if stop_reason

          usage_data = data["usage"] || {}
          USAGE_FIELDS.each do |field, key|
            value = usage_data[field]
            # Coerce like message_start so usage values are always Integers.
            @state[:usage][key] = value.to_i unless value.nil?
          end

          @state[:saw_terminal] = true
        end

        # ── message_stop ──────────────────────────────────────────────────────

        # Mark the stream as done. No state other than saw_terminal is mutated.
        def handle_message_stop
          @state[:saw_terminal] = true
        end

        # ── content_block_start ───────────────────────────────────────────────

        # Append a new in-progress block to content_blocks and yield the
        # appropriate opening StreamDelta. Unrecognised block types are not
        # tracked, so their later delta/stop events find no block and are
        # skipped.
        def handle_content_block_start(data, &block)
          index = data["index"].to_i
          cb = data["content_block"] || {}
          kind = cb["type"].to_s

          case kind
          when "text"
            # +"" yields a mutable buffer despite frozen_string_literal.
            @state[:content_blocks] << { index: index, kind: kind, text: +"" }
            block&.call(StreamDelta.new(type: :text_start))
          when "thinking"
            @state[:content_blocks] << { index: index, kind: kind, thinking: +"", signature: nil }
            block&.call(StreamDelta.new(type: :thinking_start))
          when "tool_use"
            raw_name = cb["name"].to_s
            name = @is_oauth ? Cloaking.strip_prefix(raw_name) : raw_name
            tool_id = cb["id"].to_s
            @state[:content_blocks] << {
              index: index, kind: kind, id: tool_id, name: name,
              partial_json: +"", arguments: nil
            }
            block&.call(StreamDelta.new(
              type: :tool_use_start,
              tool_call_id: tool_id,
              tool_name: name
            ))
          when "redacted_thinking"
            # Payload stored verbatim; no StreamDelta for redacted_thinking.
            @state[:content_blocks] << { index: index, kind: kind, data: cb["data"].to_s }
          end
        end

        # ── content_block_delta ───────────────────────────────────────────────

        # Append text/thinking/signature/json fragments to the block at the
        # event's index and yield the matching StreamDelta (none for
        # signature_delta). Deltas for an unknown index or of an unknown type
        # are ignored.
        def handle_content_block_delta(data, &block)
          blk = find_block(data["index"].to_i)
          return unless blk # unknown index → skip

          delta = data["delta"] || {}
          case delta["type"].to_s
          when "text_delta"
            text = delta["text"].to_s
            append_fragment(blk, :text, text)
            block&.call(StreamDelta.new(type: :text_delta, text: text))
          when "thinking_delta"
            thinking = delta["thinking"].to_s
            append_fragment(blk, :thinking, thinking)
            block&.call(StreamDelta.new(type: :thinking_delta, text: thinking))
          when "signature_delta"
            # Accumulate signature; no StreamDelta emitted
            append_fragment(blk, :signature, delta["signature"].to_s)
          when "input_json_delta"
            json_str = delta["partial_json"].to_s
            append_fragment(blk, :partial_json, json_str)
            block&.call(StreamDelta.new(type: :tool_use_delta, argument_delta: json_str))
          end
        end

        # ── content_block_stop ────────────────────────────────────────────────

        # Finalise the block and yield the closing StreamDelta.
        def handle_content_block_stop(data, &block)
          blk = find_block(data["index"].to_i)
          return unless blk

          case blk[:kind]
          when "text"
            block&.call(StreamDelta.new(type: :text_end))
          when "thinking"
            block&.call(StreamDelta.new(type: :thinking_end))
          when "tool_use"
            blk[:arguments] = parse_tool_arguments(blk[:partial_json])
            block&.call(StreamDelta.new(type: :tool_use_end))
          when "redacted_thinking"
            nil # No StreamDelta for redacted_thinking
          end
        end

        # ── Helpers ───────────────────────────────────────────────────────────

        # Look up an active block by its SSE index.
        def find_block(index)
          @state[:content_blocks].find { |b| b[:index] == index }
        end

        # Append +fragment+ to the String stored under +key+ in +blk+, in place.
        # A missing (nil) or frozen value is first replaced by a mutable copy,
        # so String#<< never raises FrozenError.
        def append_fragment(blk, key, fragment)
          buffer = blk[key]
          buffer = blk[key] = buffer.to_s.dup if buffer.nil? || buffer.frozen?
          buffer << fragment
        end

        # Parse accumulated tool-call JSON into an arguments Hash.
        # Blank input (no input_json_delta received), malformed or truncated
        # JSON, and JSON whose top level is not an object all yield {}, so the
        # documented `arguments: Hash` shape always holds.
        def parse_tool_arguments(json)
          source = json.to_s
          return {} if source.strip.empty?

          parsed = JSON.parse(source)
          parsed.is_a?(Hash) ? parsed : {}
        rescue JSON::ParserError
          {}
        end
      end
    end
  end
end