# frozen_string_literal: true

require "net/http"
require "uri"
require "json"
require "securerandom"
require "fileutils"
require "digest"
require "base64"

require "dispatch/adapter/interface"

require_relative "claude/version"
require_relative "claude/errors"
require_relative "claude/token_store"
require_relative "claude/pkce"
require_relative "claude/oauth"
require_relative "claude/headers"
require_relative "claude/cloaking"
require_relative "claude/pricing_table"
require_relative "claude/model_catalog"
require_relative "claude/request_builder"
require_relative "claude/response_builder"
require_relative "claude/stream_collector"
require_relative "claude/sse_parser"
require_relative "claude/http_client"
require_relative "claude/usage_client"
require_relative "claude/rate_limit_headers"

module Dispatch
  module Adapter
    # Adapter that talks to the Anthropic Messages API (chat, streaming chat,
    # token counting, model listing, subscription usage) using either an
    # explicit API key or OAuth credentials held in a TokenStore.
    class Claude < Base
      VERSION = ClaudeVersion::VERSION

      DEFAULT_MODEL = "claude-sonnet-4-5-20250929"
      MESSAGES_PATH = "/v1/messages"
      COUNT_TOKENS_PATH = "/v1/messages/count_tokens"
      MODELS_PATH = "/v1/models"
      DEFAULT_BASE_URL = "https://api.anthropic.com"

      # list_models cache TTL (1 hour in milliseconds)
      MODELS_CACHE_TTL_MS = 3_600_000

      # Keys that must be stripped before POSTing to count_tokens — the
      # endpoint rejects them with a 400 error.
      COUNT_TOKENS_STRIP_KEYS = %i[stream max_tokens metadata output_config].freeze

      # Default minimum interval between outbound requests (seconds).
      DEFAULT_MIN_REQUEST_INTERVAL = 1.0

      # @param model [String] Anthropic model ID, e.g. "claude-sonnet-4-5-20250929"
      # @param api_key [String, nil] Raw API key or OAuth token; nil → load from
      #   the TokenStore at initialization time.
      # @param token_path [String, nil] Custom path for the OAuth token store file.
      #   Ignored when token_store: is also provided.
      # @param base_url [String] API base URL (default: https://api.anthropic.com)
      # @param max_tokens [Integer, nil] Instance-level default max_tokens for chat.
      # @param thinking [String, Hash, nil, false] Instance-level default thinking config.
      #   Defaults to "high" so adaptive/extended thinking is enabled wherever
      #   the model supports it. Pass nil or false to disable, or pass a Hash
      #   like {type: :enabled, budget_tokens: 8000} for explicit budget mode.
      #   Per-call `thinking:` kwarg on chat() overrides this.
      # @param cache_retention [Symbol, nil] Instance-level default cache retention.
      # @param user_agent_override [String, nil] Custom User-Agent header value.
      # @param is_oauth [Boolean, nil] Override OAuth detection; nil = auto-detect
      #   from token prefix.
      # @param token_store [TokenStore, nil] Custom credential store (for testing).
      # @param min_request_interval [Numeric] Minimum interval between outbound
      #   requests in seconds; forwarded to the RateLimiter
      #   (default: DEFAULT_MIN_REQUEST_INTERVAL).
      # @param rate_limit [Object, nil] Forwarded unchanged to RateLimiter.new as
      #   `rate_limit:`; its semantics are defined by RateLimiter.
      # @param extra_betas [Array] Additional beta header values.
      def initialize(
        model: DEFAULT_MODEL,
        api_key: nil,
        token_path: nil,
        base_url: DEFAULT_BASE_URL,
        max_tokens: nil,
        thinking: "high",
        cache_retention: nil,
        user_agent_override: nil,
        is_oauth: nil,
        token_store: nil,
        min_request_interval: DEFAULT_MIN_REQUEST_INTERVAL,
        rate_limit: nil,
        extra_betas: []
      )
        super()
        @model = model.to_s
        @base_url = base_url.to_s.chomp("/")
        @max_tokens = max_tokens
        @thinking = thinking
        @cache_retention = cache_retention
        @user_agent_override = user_agent_override
        @extra_betas = Array(extra_betas)
        @token_store = token_store || (token_path ? TokenStore.new(path: token_path) : TokenStore.new)
        @is_oauth_override = is_oauth # nil = auto-detect

        # Track whether an explicit API key was supplied by the caller.
        # This distinguishes "has an explicit key" from "loaded from token store".
        @explicit_api_key = api_key && !api_key.to_s.strip.empty? ? api_key.to_s : nil

        # Resolve credentials at construction time so that later calls can
        # reliably inspect is_oauth? without triggering further I/O.
        @api_key = resolve_api_key(api_key)
        @is_oauth = resolve_is_oauth(@api_key, is_oauth)

        # When true, all subsequent requests skip strict tool-schema validation.
        # Set automatically when a "compiled grammar too large" 400 is received.
        @strict_disabled = false

        # Build the rate limiter using the same directory as the token store.
        # NOTE(review): RateLimiter is not required by this file — presumably
        # it is loaded elsewhere; confirm.
        rate_limit_path = File.join(File.dirname(@token_store.path), "claude_rate_limit")
        @rate_limiter = RateLimiter.new(
          rate_limit_path: rate_limit_path,
          min_request_interval: min_request_interval,
          rate_limit: rate_limit
        )

        # Most recent parsed rate-limit headers and the full header snapshot of
        # the most recent response that went through capture_rate_limit_headers.
        @rate_limit_info = nil
        @last_response_headers = nil

        # list_models cache (populated lazily by #list_models).
        @models_cache = nil
        @models_cache_at = nil

        # Path for the JSONL debug log of rate-limit header snapshots.
        # Lives in the same directory as the token store file.
        @rate_limit_log_path = File.join(File.dirname(@token_store.path), "claude_ratelimit.jsonl")
      end

      # ── Adapter interface ───────────────────────────────────────────────────

      # @return [String] the model ID this adapter was constructed with
      def model_name
        @model
      end

      # @return [String] human-readable provider name
      def provider_name
        "Anthropic (Claude)"
      end

      # Returns the context window size for the current model, or nil if unknown.
      #
      # @return [Integer, nil]
      def max_context_tokens
        PricingTable.context_window(@model)
      end

      # Returns the most recently captured rate-limit header info, or nil if
      # no API call has been made yet.
      #
      # @return [RateLimitHeaders::Info, nil]
      attr_reader :rate_limit_info

      # Returns ALL response headers from the most recent API call whose
      # response was captured, as a Hash. Useful for diagnosing which headers
      # Anthropic actually sends.
      #
      # @return [Hash{String => String}, nil]
      attr_reader :last_response_headers

      # ── Auth lifecycle ──────────────────────────────────────────────────────

      # Ensure valid credentials are present, performing interactive login or
      # token refresh as needed.
      #
      # Returns:
      #   :api_key   — an explicit API key was supplied; nothing to do.
      #   :cached    — a non-expired OAuth token is already stored.
      #   :refreshed — the stored token was expired; it was refreshed.
      #   :logged_in — no credentials existed; interactive OAuth flow completed.
      #
      # @return [Symbol]
      def authenticate!
        return :api_key if explicit_api_key_present?

        creds = @token_store.load

        if creds && !expired?(creds)
          @api_key = creds["access_token"]
          @is_oauth = resolve_is_oauth(@api_key, @is_oauth_override)
          return :cached
        end

        if creds && creds["refresh_token"]
          refreshed = OAuth.refresh!(creds["refresh_token"])
          @token_store.save(creds.merge(refreshed))
          @api_key = refreshed["access_token"]
          @is_oauth = resolve_is_oauth(@api_key, @is_oauth_override)
          return :refreshed
        end

        fresh = OAuth.login(token_store: @token_store)
        @token_store.save(fresh)
        @api_key = fresh["access_token"]
        @is_oauth = resolve_is_oauth(@api_key, @is_oauth_override)
        :logged_in
      end

      # True iff the adapter has a usable credential (explicit key or stored token).
      #
      # @return [Boolean]
      def authenticated?
        return true if explicit_api_key_present?

        creds = @token_store.load
        !!(creds && creds["access_token"])
      end

      # Remove stored OAuth credentials and clear the in-memory token.
      # Has no effect when using an explicit API key: the caller-supplied key
      # is left in place so the adapter stays usable and consistent with
      # #authenticated?.
      #
      # @return [nil]
      def logout!
        return nil if explicit_api_key_present?

        @token_store.delete
        @api_key = nil
        @is_oauth = false
        nil
      end

      # Returns the list of models, merging the runtime list from the API with
      # the bundled list. The result is cached in memory for
      # MODELS_CACHE_TTL_MS; a cache hit does not go through the rate limiter.
      #
      # @return [Array] merged model entries (see #merge_model_lists)
      def list_models
        now = current_time_ms
        return @models_cache if @models_cache && (now - @models_cache_at) < MODELS_CACHE_TTL_MS

        with_rate_limit do
          runtime_list = fetch_runtime_models
          models = merge_model_lists(runtime_list)
          @models_cache = models
          @models_cache_at = current_time_ms
          models
        end
      end

      # Count the input tokens for a hypothetical chat request without
      # generating a response. Uses Anthropic's dedicated endpoint
      # `POST /v1/messages/count_tokens`.
      #
      # @param messages [Array]
      # @param system [String, Array, nil]
      # @param tools [Array, nil]
      # @return [Integer] token count, or -1 on any error
      def count_tokens(messages, system: nil, tools: [])
        with_rate_limit do
          params = RequestBuilder.build(
            model_id: @model,
            messages: Array(messages),
            system: system,
            tools: Array(tools),
            is_oauth: @is_oauth,
            base_url: @base_url,
            stream: false,
            max_tokens: nil,
            thinking: nil,
            tool_choice: nil,
            cache_retention: nil,
            metadata: nil,
            disable_strict_tools: false
          )

          # Strip fields that count_tokens rejects.
          COUNT_TOKENS_STRIP_KEYS.each { |k| params.delete(k) }

          json = http_client(stream: false).post_json(COUNT_TOKENS_PATH, params)
          json["input_tokens"].to_i
        end
      rescue StandardError
        # Deliberate best-effort: any failure is reported as -1.
        -1
      end

      # Fetch subscription usage (OAuth-only).
      #
      # @return [UsageReport, nil]
      def usage_report
        with_rate_limit do
          with_auth_recovery do
            ensure_token!
            UsageClient.fetch(
              http_client_claude_code,
              is_oauth: @is_oauth,
              on_rate_limit: method(:rotate_token_for_usage).to_proc
            )
          end
        end
      end

      # Send a chat request.
      #
      # When `stream: false` (default) the full response is buffered and a
      # Response is returned. When `stream: true` the SSE events drive the
      # StreamCollector; deltas are yielded to the block.
      #
      # `thinking: :default` and `cache_retention: :default` resolve to
      # the instance-level defaults set at construction time.
      #
      # On a 400 "compiled grammar too large" / "schema too complex" error,
      # the request is automatically retried once with strict tool schemas
      # disabled, and `@strict_disabled` is set for all subsequent calls.
      def chat(
        messages,
        system: nil,
        tools: [],
        stream: false,
        max_tokens: nil,
        thinking: :default,
        tool_choice: nil,
        cache_retention: :default,
        metadata: nil,
        betas: nil, # rubocop:disable Lint/UnusedMethodArgument
        &block
      )
        with_rate_limit do
          with_auth_recovery do
            ensure_token!

            # Resolve :default sentinels to instance-level defaults.
            effective_thinking = thinking == :default ? @thinking : thinking
            effective_cache_retention = cache_retention == :default ? @cache_retention : cache_retention
            effective_max_tokens = max_tokens || @max_tokens

            params = build_chat_params(
              messages: messages,
              system: system,
              tools: tools,
              stream: stream,
              max_tokens: effective_max_tokens,
              thinking: effective_thinking,
              tool_choice: tool_choice,
              cache_retention: effective_cache_retention,
              metadata: metadata,
              disable_strict_tools: @strict_disabled
            )

            if stream
              chat_streaming_with_strict_fallback(params, messages, system, tools, stream,
                                                  effective_max_tokens, effective_thinking,
                                                  tool_choice, effective_cache_retention,
                                                  metadata, &block)
            else
              chat_non_streaming_with_strict_fallback(params, messages, system, tools, stream,
                                                      effective_max_tokens, effective_thinking,
                                                      tool_choice, effective_cache_retention,
                                                      metadata)
            end
          end
        end
      end

      # ── Streaming constants ───────────────────────────────────────────────

      # Maximum time (ms) to wait for the first SSE message_start event.
      # Reads the env var at load time; tests may override via stubbing
      # `stream_first_event_timeout_ms`.
      STREAM_FIRST_EVENT_TIMEOUT_MS = ENV.fetch("STREAM_FIRST_EVENT_TIMEOUT_MS", 60_000).to_i.freeze

      # Retry policy constants.
      STREAM_MAX_RETRIES = 3
      STREAM_BASE_DELAY_MS = 2_000

      private

      # ── Strict-fallback helpers ───────────────────────────────────────────

      # Returns true iff the error is a 400 RequestError whose message
      # indicates the compiled grammar was too large or the schema was too
      # complex to compile.
      def strict_grammar_error?(err)
        return false unless err.is_a?(RequestError) && err.status_code == 400

        msg = err.message.to_s
        !!(
          (msg.match?(/compiled grammar/i) && msg.match?(/too large/i)) ||
          (msg.match?(/schema/i) && msg.match?(/too complex/i) && msg.match?(/compil/i))
        )
      end

      # Build request params, extracting common logic used in both the initial
      # attempt and the strict-fallback retry.
      def build_chat_params(messages:, system:, tools:, stream:, max_tokens:, thinking:,
                            tool_choice:, cache_retention:, metadata:, disable_strict_tools:)
        RequestBuilder.build(
          model_id: @model,
          messages: Array(messages),
          system: system,
          tools: Array(tools),
          is_oauth: @is_oauth,
          base_url: @base_url,
          stream: stream ? true : false,
          max_tokens: max_tokens,
          thinking: thinking,
          tool_choice: tool_choice,
          cache_retention: cache_retention,
          metadata: metadata,
          disable_strict_tools: disable_strict_tools
        )
      end

      # Non-streaming chat with automatic strict-tool fallback on grammar errors.
      def chat_non_streaming_with_strict_fallback(params, messages, system, tools, stream,
                                                  max_tokens, thinking, tool_choice,
                                                  cache_retention, metadata)
        capture_cb = method(:capture_rate_limit_headers)
        json = http_client(stream: false).post_json(MESSAGES_PATH, params, on_response: capture_cb)
        model_info = ModelCatalog.build(@model)
        ResponseBuilder.build(json, model_info: model_info, is_oauth: @is_oauth)
      rescue RequestError => e
        raise unless strict_grammar_error?(e)

        # Disable strict tools permanently for this adapter instance.
        @strict_disabled = true

        # Rebuild params without strict tool schemas and retry once.
        fallback_params = build_chat_params(
          messages: messages,
          system: system,
          tools: tools,
          stream: stream,
          max_tokens: max_tokens,
          thinking: thinking,
          tool_choice: tool_choice,
          cache_retention: cache_retention,
          metadata: metadata,
          disable_strict_tools: true
        )
        json = http_client(stream: false).post_json(MESSAGES_PATH, fallback_params, on_response: capture_cb)
        model_info = ModelCatalog.build(@model)
        ResponseBuilder.build(json, model_info: model_info, is_oauth: @is_oauth)
      end

      # Streaming chat with automatic strict-tool fallback on grammar errors.
      def chat_streaming_with_strict_fallback(params, messages, system, tools, stream,
                                              max_tokens, thinking, tool_choice,
                                              cache_retention, metadata, &)
        chat_streaming(params, &)
      rescue RequestError => e
        raise unless strict_grammar_error?(e)

        # Disable strict tools permanently for this adapter instance.
        @strict_disabled = true

        # Rebuild params without strict tool schemas and retry once.
        fallback_params = build_chat_params(
          messages: messages,
          system: system,
          tools: tools,
          stream: stream,
          max_tokens: max_tokens,
          thinking: thinking,
          tool_choice: tool_choice,
          cache_retention: cache_retention,
          metadata: metadata,
          disable_strict_tools: true
        )
        chat_streaming(fallback_params, &)
      end

      # ── Internal helpers ──────────────────────────────────────────────────

      # Streaming chat path. Yields StreamDelta events to the caller block
      # and returns a Response when the stream is complete.
      #
      # The SSE parser drives the StreamCollector. Transient failures are
      # retried up to STREAM_MAX_RETRIES times with exponential back-off
      # (STREAM_BASE_DELAY_MS, doubling per attempt). Network/parse errors and
      # transient HTTP errors are only retried when the collector reports no
      # consumer output yet. When retries are exhausted, a Response with
      # stop_reason :error and zeroed usage is returned instead of raising.
      #
      # NOTE(review): the RetriableStreamError path (first-event timeout,
      # missing message_start, missing terminal event) does not consult
      # collector.consumer_output? before retrying — a stream that emitted
      # deltas but never reached a terminal event may be replayed to the
      # block. Confirm whether that is intended.
      def chat_streaming(params, &block)
        attempt = 0

        loop do
          # Fresh collector/parser per attempt so a retry starts clean.
          collector = StreamCollector.new(@model, is_oauth: @is_oauth)
          parser = SseParser.new
          retry_reason = nil
          last_error = nil

          begin
            deadline_ms = stream_first_event_timeout_ms
            request_started_at = current_time_ms

            http_client(stream: true).stream(MESSAGES_PATH, params) do |response|
              ClaudeErrors.handle_response!(response) unless response.is_a?(Net::HTTPSuccess)
              capture_rate_limit_headers(response)

              response.read_body do |chunk|
                # Watchdog: abort if message_start hasn't arrived in time.
                if !collector.saw_message_start? && (current_time_ms - request_started_at) > deadline_ms
                  retry_reason = :first_event_timeout
                  raise RetriableStreamError, "first-event timeout (#{deadline_ms}ms)"
                end

                parser.feed(chunk) do |event_type, data|
                  collector.handle(event_type, data, &block)
                end
              end

              parser.flush
            end

            # ── Post-stream integrity checks ──────────────────────────────
            unless collector.saw_message_start?
              retry_reason = :no_message_start
              raise RetriableStreamError, "stream ended before message_start"
            end

            unless collector.saw_terminal?
              retry_reason = :no_terminal
              raise RetriableStreamError, "stream ended before message_stop/message_delta"
            end
          rescue RetriableStreamError => e
            last_error = e
            # fall through to retry decision below (retry_reason already set)
          rescue RequestError, ConnectionError => e
            # An HTTP-level RequestError (status_code present) must surface
            # immediately — it may be a strict-grammar 400 that the caller
            # wants to catch, or a genuine 4xx that should not be retried.
            raise if e.is_a?(RequestError) && e.status_code

            # A parse error or network error — only retry if no output yet.
            last_error = e
            retry_reason = if collector.consumer_output?
                             nil # not safe to retry
                           else
                             :parse_error
                           end
          rescue RateLimitError, ServerError, OverloadedError => e
            # Transient HTTP errors — retry if no consumer output yet.
            last_error = e
            retry_reason = if collector.consumer_output?
                             nil
                           else
                             :transient_http
                           end
          rescue StandardError => e
            last_error = e
            retry_reason = nil # non-retriable
          end

          # ── Retry decision ────────────────────────────────────────────────
          if retry_reason && attempt < STREAM_MAX_RETRIES
            attempt += 1
            sleep_ms = STREAM_BASE_DELAY_MS * (2**(attempt - 1))
            sleep(sleep_ms / 1000.0)
            next
          end

          # ── Success or give-up ────────────────────────────────────────────
          if last_error && retry_reason
            # Give up after exhausting retries: build an error response
            model_info = ModelCatalog.build(@model)
            usage = Usage.new(input_tokens: 0, output_tokens: 0)
            usage.cost = Pricing.calculate(usage, model_info)
            return Response.new(
              model: @model,
              stop_reason: :error,
              usage: usage
            )
          end

          if last_error && !retry_reason
            # Non-retriable or consumer output present — surface the error
            raise last_error
          end

          # ── Build successful Response ─────────────────────────────────────
          model_info = ModelCatalog.build(collector.model)
          u = collector.usage
          usage = Usage.new(
            input_tokens: u[:input],
            output_tokens: u[:output],
            cache_read_tokens: u[:cache_read],
            cache_creation_tokens: u[:cache_creation]
          )
          usage.cost = Pricing.calculate(usage, model_info)

          content_blocks, tool_calls = build_response_content(collector.content_blocks)

          # Unknown finish reasons fall back to :end_turn.
          stop_reason = ResponseBuilder::STOP_REASON_MAP.fetch(
            collector.finish_reason.to_s,
            :end_turn
          )

          return Response.new(
            model: collector.model,
            stop_reason: stop_reason,
            content: content_blocks,
            tool_calls: tool_calls,
            usage: usage
          )
        end
      end

      # Build content blocks and tool_calls arrays from the StreamCollector's
      # accumulated content_blocks. Mirrors the logic in ResponseBuilder but
      # operates on the collector's internal Hash format rather than raw JSON.
      # Empty text and empty redacted-thinking blocks are dropped; blocks of
      # any other kind are ignored.
      #
      # @param blocks [Array] collector.content_blocks
      # @return [Array(Array, Array)] [content_blocks, tool_calls]
      def build_response_content(blocks)
        content = []
        tool_calls = []

        blocks.each do |blk|
          case blk[:kind]
          when "text"
            text = blk[:text].to_s
            content << TextBlock.new(text: text) unless text.empty?
          when "thinking"
            thinking = blk[:thinking].to_s
            signature = blk[:signature]
            content << ThinkingBlock.new(thinking: thinking, signature: signature)
          when "redacted_thinking"
            data = blk[:data].to_s
            content << RedactedThinkingBlock.new(data: data) unless data.empty?
          when "tool_use"
            tool_calls << ToolUseBlock.new(
              id: blk[:id].to_s,
              name: blk[:name].to_s,
              arguments: blk[:arguments] || {}
            )
          end
        end

        [content, tool_calls]
      end

      # Simple marker error class for internally-flagged retry scenarios.
      class RetriableStreamError < StandardError; end
      private_constant :RetriableStreamError

      # Returns current monotonic time in milliseconds.
      # Extracted as a method so specs can stub it.
      def current_time_ms
        Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
      end

      # Returns the configured first-event timeout in milliseconds.
      # Extracted as a method so specs can stub it.
      def stream_first_event_timeout_ms
        STREAM_FIRST_EVENT_TIMEOUT_MS
      end

      # ── list_models helpers ───────────────────────────────────────────────

      # Fetch the runtime model list from GET /v1/models.
      # Returns an Array (the "data" entries) or [] on any error.
      def fetch_runtime_models
        json = http_client(stream: false).get_json("#{MODELS_PATH}?limit=200")
        Array(json["data"])
      rescue StandardError
        []
      end

      # Merge runtime entries with the bundled list.
      # Runtime entries come first (order preserved, entries with an empty id
      # skipped); bundled models missing from the runtime response are
      # appended afterwards. With no runtime entries, only the bundled list
      # is returned.
      def merge_model_lists(runtime_entries)
        return PricingTable.known_ids.map { |id| ModelCatalog.build(id) } if runtime_entries.empty?

        seen_ids = {}
        result = []

        # Build from runtime entries (order preserved)
        runtime_entries.each do |entry|
          id = entry["id"].to_s
          next if id.empty?

          seen_ids[id] = true
          result << ModelCatalog.build_from_api(entry)
        end

        # Append any bundled models not present in the runtime response
        PricingTable.known_ids.each do |id|
          next if seen_ids[id]

          result << ModelCatalog.build(id)
        end

        result
      end

      # Build a new HttpClient. The `stream:` argument is currently unused.
      # The headers_proc is evaluated fresh on every request so that OAuth
      # token refreshes are picked up without restarting.
      def http_client(stream: false) # rubocop:disable Lint/UnusedMethodArgument
        HttpClient.new(
          base_url: @base_url,
          headers_proc: build_headers_proc
        )
      end

      # Build an HttpClient that emits the real-Claude-Code header set
      # (no anthropic-version, no x-stainless-*). Required for /api/oauth/*
      # endpoints which reject SDK-style requests with
      # "OAuth authentication is currently not supported."
      def http_client_claude_code
        HttpClient.new(
          base_url: @base_url,
          headers_proc: build_headers_proc(claude_code_only: true)
        )
      end

      # Returns a lambda that, when called with `stream: true/false`, builds
      # the correct headers. The credential ivars are read at call time (not
      # captured), so a token refresh is automatically picked up.
      def build_headers_proc(claude_code_only: false)
        adapter = self
        base_url = @base_url
        extra_betas = @extra_betas
        cc_only = claude_code_only

        lambda do |stream: false|
          Headers.build(
            api_key: adapter.instance_variable_get(:@api_key),
            is_oauth: adapter.instance_variable_get(:@is_oauth),
            stream: stream,
            base_url: base_url,
            extra_betas: extra_betas,
            claude_code_only: cc_only
          )
        end
      end

      # Resolve the API key: prefer the explicitly-supplied value, then fall
      # back to the TokenStore (OAuth creds).
      def resolve_api_key(explicit_key)
        return explicit_key.to_s if explicit_key && !explicit_key.to_s.strip.empty?

        creds = @token_store.load
        creds&.fetch("access_token", nil)
      end

      # Decide if this is an OAuth session: an explicit (non-nil) override
      # wins; otherwise detect from the "sk-ant-oat" token prefix.
      def resolve_is_oauth(api_key, override)
        return override unless override.nil?

        api_key.to_s.start_with?("sk-ant-oat")
      end

      # ── Auth helpers ──────────────────────────────────────────────────────

      # True iff an explicit API key was supplied at construction time.
      def explicit_api_key_present?
        !@explicit_api_key.nil?
      end

      # Throttle an outbound API call through the RateLimiter.
      def with_rate_limit
        @rate_limiter&.wait!
        yield
      end

      # ── Rate-limit header capture ─────────────────────────────────────────

      # Snapshot all response headers (exposed via #last_response_headers),
      # parse the rate-limit headers from a Net::HTTP response, store the
      # result in @rate_limit_info, and append a JSON entry to the debug log.
      #
      # Errors are silently swallowed — header capture must never affect the
      # main request flow.
      def capture_rate_limit_headers(response)
        all_headers = {}
        response.each_header { |k, v| all_headers[k] = v } if response.respond_to?(:each_header)
        @last_response_headers = all_headers

        info = RateLimitHeaders.parse(response)
        return unless info

        @rate_limit_info = info
        log_rate_limit_info(info)
      rescue StandardError
        # Never raise — this is best-effort instrumentation only.
      end

      # Append a single JSON line to the rate-limit JSONL debug log.
      def log_rate_limit_info(info)
        FileUtils.mkdir_p(File.dirname(@rate_limit_log_path))
        entry = info.to_log_hash.merge(model: @model)
        File.open(@rate_limit_log_path, "a") { |f| f.puts(JSON.generate(entry)) }
      rescue StandardError
        # Best-effort.
      end

      # True iff the stored credentials have an expired (or absent) expiry.
      # Treats missing/zero expires_at_ms as expired.
      def expired?(creds)
        expires_at_ms = creds["expires_at_ms"].to_i
        return true if expires_at_ms.zero?

        # Wall-clock time: expires_at_ms is compared as an epoch timestamp.
        now_ms = (Time.now.to_f * 1000).to_i
        now_ms >= expires_at_ms
      end

      # Wrap a block such that an AuthenticationError (server-revoked
      # access token, refresh-token rejection, etc.) triggers a single
      # automatic recovery cycle:
      #   1. Wipe the stored credentials.
      #   2. Run the full interactive OAuth login flow (opens browser).
      #   3. Retry the block once with the fresh token.
      #
      # Disabled for explicit-API-key callers (no OAuth fallback exists).
      # Disabled when AUTH_RECOVERY env var is set to "0" (for tests / CI).
      def with_auth_recovery
        return yield if explicit_api_key_present?
        return yield if ENV["AUTH_RECOVERY"] == "0"

        attempts = 0
        begin
          attempts += 1
          yield
        rescue AuthenticationError
          # Bounded retry: a second failure propagates to the caller.
          raise if attempts > 1

          warn "[claude] authentication failed (token may have been revoked); " \
               "re-running OAuth login..."
          @token_store.delete
          fresh = OAuth.login(token_store: @token_store)
          @token_store.save(fresh)
          @api_key = fresh["access_token"]
          @is_oauth = resolve_is_oauth(@api_key, @is_oauth_override)
          retry
        end
      end

      # Force-rotate the OAuth access token by performing a refresh-token
      # exchange. Used as the on_rate_limit callback for /api/oauth/usage,
      # which enforces a per-access-token quota: each fresh token gets
      # ~5 calls before persistent 429. Rotating gets us a fresh window.
      #
      # Returns true on success, false otherwise.
      def rotate_token_for_usage
        return false if explicit_api_key_present?

        creds = @token_store.load
        refresh_token = creds && creds["refresh_token"]
        return false unless refresh_token

        refreshed = OAuth.refresh!(refresh_token)
        @token_store.save(creds.merge(refreshed))
        @api_key = refreshed["access_token"]
        @is_oauth = resolve_is_oauth(@api_key, @is_oauth_override)
        true
      rescue StandardError
        false
      end

      # Refreshes the OAuth token when the stored credentials are expired
      # (as decided by #expired?, which applies no safety margin). No-op when
      # using an explicit API key, when no credentials or refresh token are
      # stored, or when the token is still fresh.
      def ensure_token!
        return if explicit_api_key_present?

        creds = @token_store.load
        return unless creds
        return unless expired?(creds)
        return unless creds["refresh_token"]

        refreshed = OAuth.refresh!(creds["refresh_token"])
        @token_store.save(creds.merge(refreshed))
        @api_key = refreshed["access_token"]
        @is_oauth = resolve_is_oauth(@api_key, @is_oauth_override)
      rescue StandardError
        # Silently swallow errors in ensure_token! — the request will
        # likely fail with an auth error, which is more informative.
        nil
      end
    end
  end
end