# frozen_string_literal: true

require "time"

module Dispatch
  module Adapter
    class Claude < Base
      # Fetches and maps the Anthropic OAuth usage endpoint into a UsageReport.
      #
      # Endpoint: GET https://api.anthropic.com/api/oauth/usage
      # (OAuth-only; returns nil immediately for API-key mode)
      module UsageClient
        USAGE_PATH = "/api/oauth/usage"
        PROFILE_PATH = "/api/oauth/profile"

        # Maps raw bucket keys to configuration metadata.
        # duration_ms: 18_000_000 ms is 5 hours, 604_800_000 ms is 7 days.
        BUCKET_CONFIG = {
          "five_hour" => { window_id: "5h", duration_ms: 18_000_000, tier: nil, shared: true },
          "seven_day" => { window_id: "7d", duration_ms: 604_800_000, tier: nil, shared: true },
          "seven_day_opus" => { window_id: "7d", duration_ms: 604_800_000, tier: "opus", shared: false },
          "seven_day_sonnet" => { window_id: "7d", duration_ms: 604_800_000, tier: "sonnet", shared: false }
        }.freeze

        # Window labels by window_id
        WINDOW_LABELS = {
          "5h" => "5 Hour",
          "7d" => "7 Day"
        }.freeze

        # Retry policy: a request is attempted at most MAX_RETRIES times on
        # transient errors, sleeping BASE_DELAY_MS * 2**(attempt - 1) ms
        # between attempts (500 ms, then 1000 ms).
        MAX_RETRIES = 3
        BASE_DELAY_MS = 500

        module_function

        # Fetch the usage report.
        #
        # @param http_client [HttpClient] a non-streaming HttpClient
        # @param is_oauth [Boolean] must be true for this to do anything
        # @param on_rate_limit [Proc, nil] called with no args when a 429 is
        #   received from the usage endpoint. The endpoint enforces a
        #   per-access-token quota of ~5 calls; rotating the token via
        #   OAuth refresh resets the window. The callback should refresh
        #   the adapter's access token and return truthy on success; this
        #   method will then retry the request once with the fresh token.
        # @return [UsageReport, nil] nil when not in OAuth mode, when the
        #   request fails, when the response is not a JSON object, or when
        #   no recognised usage bucket is present
        def fetch(http_client, is_oauth:, on_rate_limit: nil)
          return nil unless is_oauth

          payload = fetch_with_retry(http_client, USAGE_PATH, on_rate_limit: on_rate_limit)
          # A non-Hash body (nil on failure, or e.g. a JSON array) cannot be
          # indexed by bucket key, so treat it as "no report".
          return nil unless payload.is_a?(Hash)

          limits = parse_limits(payload)
          return nil if limits.empty?

          metadata = resolve_metadata(http_client, payload)

          UsageReport.new(
            provider: ClaudeErrors::PROVIDER,
            limits: limits,
            fetched_at: Time.now,
            metadata: metadata,
            raw: payload
          )
        end

        # Build the report metadata, falling back to the profile endpoint
        # for whichever of email / account_id the usage payload lacks.
        # The profile lookup is best-effort: any failure leaves the missing
        # fields as nil.
        #
        # @param http_client [HttpClient]
        # @param payload [Hash] parsed usage response
        # @return [Hash] with :email and :account_id keys
        def resolve_metadata(http_client, payload)
          metadata = extract_metadata(payload)
          needs_profile = metadata[:email].nil? || metadata[:account_id].nil?
          return metadata unless needs_profile

          profile = begin
            fetch_with_retry(http_client, PROFILE_PATH)
          rescue StandardError
            nil
          end
          return metadata unless profile.is_a?(Hash)

          metadata[:email] ||= profile["email"]
          metadata[:account_id] ||= profile["account_id"] || profile["id"]
          metadata
        end

        # GET the given path with up to MAX_RETRIES on transient errors.
        # Returns the parsed JSON hash, or nil on failure.
        #
        # When a 429 RateLimitError is received and on_rate_limit is
        # provided, the callback is invoked once (typically to rotate the
        # OAuth access token) and the request is retried once with what is
        # presumed to be a fresh token. This works around Anthropic's
        # /api/oauth/usage per-token quota of ~5 calls per access token.
        #
        # @param http_client [HttpClient] must respond to get_json(path)
        # @param path [String] request path
        # @param on_rate_limit [Proc, nil] token-refresh callback, see #fetch
        # @return [Object, nil] whatever get_json returns, or nil on failure
        def fetch_with_retry(http_client, path, on_rate_limit: nil)
          attempt = 0
          rate_limit_recovery_used = false

          begin
            http_client.get_json(path)
          rescue RateLimitError
            # The token-refresh retry is free: it does not consume one of
            # the MAX_RETRIES attempts, but it is only tried once.
            if on_rate_limit && !rate_limit_recovery_used
              rate_limit_recovery_used = true
              retry if on_rate_limit.call
            end

            attempt += 1
            retry if backoff(attempt)
            nil
          rescue ServerError, OverloadedError, ConnectionError
            attempt += 1
            retry if backoff(attempt)
            nil
          rescue StandardError
            # Non-transient failure: give up without retrying.
            nil
          end
        end

        # Sleep for the exponential backoff delay of the given attempt.
        #
        # @param attempt [Integer] 1-based count of failed attempts so far
        # @return [Boolean] true if the caller should retry (after having
        #   slept), false if the retry budget is exhausted (no sleep)
        def backoff(attempt)
          return false unless attempt < MAX_RETRIES

          sleep((BASE_DELAY_MS * (2**(attempt - 1))) / 1000.0)
          true
        end

        # Parse all recognised usage buckets from the payload.
        # Buckets that are absent, not a Hash, or malformed are skipped.
        #
        # @param payload [Hash] parsed usage response
        # @return [Array<UsageLimitEntry>]
        def parse_limits(payload)
          BUCKET_CONFIG.each_with_object([]) do |(bucket_key, config), limits|
            bucket = payload[bucket_key]
            next unless bucket.is_a?(Hash)

            entry = build_limit_entry(bucket, config)
            limits << entry if entry
          end
        end

        # Build a single UsageLimitEntry from a bucket hash and its config.
        #
        # The bucket's "utilization" is read as a percentage. It is clamped
        # to 0..100 once, and every amount field (used, remaining and both
        # fractions) is derived from that clamped value so they always agree.
        #
        # @param bucket [Hash] raw bucket with "utilization" and "resets_at"
        # @param config [Hash] the matching BUCKET_CONFIG value
        # @return [UsageLimitEntry, nil] nil when "utilization" is present
        #   but cannot be converted to a number (e.g. a Hash or boolean);
        #   a missing/nil utilization is treated as 0
        def build_limit_entry(bucket, config)
          raw_utilization = bucket["utilization"]
          return nil unless raw_utilization.nil? || raw_utilization.respond_to?(:to_f)

          # Float bounds keep the result a Float at the edges as well.
          used = raw_utilization.to_f.clamp(0.0, 100.0)
          used_fraction = used / 100.0

          window_id = config[:window_id]
          tier = config[:tier]
          window_label = WINDOW_LABELS[window_id] || window_id

          entry_label = tier ? "Claude #{window_label} (#{tier.capitalize})" : "Claude #{window_label}"
          entry_id = ["anthropic", window_id, tier].compact.join(":")

          UsageLimitEntry.new(
            id: entry_id,
            label: entry_label,
            scope: {
              provider: ClaudeErrors::PROVIDER,
              tier: tier,
              shared: config[:shared],
              window_id: window_id
            },
            window: UsageWindow.new(
              id: window_id,
              label: window_label,
              duration_ms: config[:duration_ms],
              resets_at: parse_iso(bucket["resets_at"])
            ),
            amount: UsageAmount.new(
              used: used,
              limit: 100,
              remaining: 100.0 - used,
              used_fraction: used_fraction,
              remaining_fraction: 1.0 - used_fraction,
              unit: :percent
            ),
            status: derive_status(used)
          )
        end

        # Derive a status symbol from a utilization percentage.
        #
        # @param utilization [Numeric] percentage used
        # @return [Symbol] :exhausted at 100 or more, :warning at 90 or
        #   more, otherwise :ok
        def derive_status(utilization)
          if utilization >= 100
            :exhausted
          elsif utilization >= 90
            :warning
          else
            :ok
          end
        end

        # Parse an ISO 8601 timestamp string to a Time object, or nil.
        #
        # @param str [String, nil] timestamp text
        # @return [Time, nil] nil when str is not a String, is empty, or
        #   cannot be parsed
        def parse_iso(str)
          # Non-String values (nil, numbers, ...) have no #empty? and are
          # not timestamps we know how to read.
          return nil unless str.is_a?(String) && !str.empty?

          Time.parse(str)
        rescue ArgumentError, TypeError
          nil
        end

        # Extract metadata fields from the top-level payload.
        #
        # @param payload [Hash] parsed usage response
        # @return [Hash] with :email and :account_id (either may be nil)
        def extract_metadata(payload)
          {
            email: payload["email"],
            account_id: payload["account_id"]
          }
        end
      end
    end
  end
end