diff options
| author | Adam Malczewski <[email protected]> | 2026-04-30 18:06:07 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-04-30 18:06:07 +0900 |
| commit | 9be8821368deff024eafedeea55a614f9a9468cf (patch) | |
| tree | 43d70e2e8d6ac31e288f8f99b71555c051db0b19 /lib | |
| parent | 5c9b8f5142198bdf230d500b5101322a22235670 (diff) | |
| download | dispatch-adapter-copilot-main.tar.gz dispatch-adapter-copilot-main.zip | |
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/dispatch/adapter/copilot.rb | 386 |
1 files changed, 367 insertions, 19 deletions
# Sends a chat request and returns whatever the selected transport helper
# returns.
#
# An explicit max_tokens wins over @default_max_tokens, and
# `thinking: :default` resolves to @default_thinking; the resolved level is
# validated before any request body is built. When uses_responses_api? is
# true the call is routed to the /v1/responses helpers, otherwise a Chat
# Completions body is assembled here. A given block is forwarded to the
# streaming helpers.
def chat(messages, system: nil, tools: [], stream: false, max_tokens: nil, thinking: :default, &)
  ensure_authenticated!

  token_limit = max_tokens || @default_max_tokens
  effort = thinking == :default ? @default_thinking : thinking
  validate_thinking_level!(effort)

  if uses_responses_api?
    return chat_streaming_responses(messages, system, tools, token_limit, effort, &) if stream

    return chat_non_streaming_responses(messages, system, tools, token_limit, effort)
  end

  wire_messages = build_wire_messages(messages, system)
  wire_tools = build_wire_tools(tools)
  # The token limit travels under one of two keys, chosen per model.
  token_key = uses_max_completion_tokens? ? :max_completion_tokens : :max_tokens

  body = { model: @model, messages: wire_messages, stream: stream, token_key => token_limit }
  body[:tools] = wire_tools unless wire_tools.empty?
  body[:reasoning_effort] = effort if effort

  stream ? chat_streaming(body, &) : chat_non_streaming(body)
end
# True when the selected model has to be served through /v1/responses, which
# here means any model whose name starts with "gpt-5". According to the
# author's note these models reject tool calls on /v1/chat/completions and
# answer with a 400 RequestError that points callers at /v1/responses.
def uses_responses_api?
  @model.start_with?("gpt-5")
end

# Location of the token file below the current user's home directory.
def default_token_path
  File.join(Dir.home, *%w[.config dispatch copilot_github_token])
end

# Builds the flat `input` array sent to POST /v1/responses.
#
# A truthy system prompt becomes the first item, with role "system"; every
# message then contributes the items produced by
# convert_message_to_responses_input, in order.
def build_responses_api_input(messages, system)
  leading = system ? [{ role: "system", content: system }] : []
  leading + messages.flat_map { |msg| convert_message_to_responses_input(msg) }
end

# Turns one canonical message into Responses API input items. The result is
# always an Array so callers can concatenate results from several messages.
#
# Array content is delegated to the block-wise converter; String content is
# passed through unchanged; anything else is stringified with to_s.
def convert_message_to_responses_input(msg)
  payload = msg.content
  return convert_content_blocks_to_responses_input(msg) if payload.is_a?(Array)

  text = payload.is_a?(String) ? payload : payload.to_s
  [{ role: msg.role, content: text }]
end
# Converts the content blocks of a single message into Responses API input
# items.
#
# How each block type is mapped:
# - TextBlock: text is buffered and emitted joined with "\n". Text buffered
#   before a ToolUseBlock is always emitted as an assistant `output_text`
#   item; text left over at the end is emitted as an assistant `output_text`
#   item when msg.role == "assistant", otherwise as a plain
#   {role:, content:} item.
# - ToolUseBlock: a top-level {type: "function_call"} item whose arguments
#   are serialised with JSON.generate.
# - ToolResultBlock: a top-level {type: "function_call_output"} item. These
#   do not flush buffered text, so leftover text lands after them.
# - ImageBlock: raises NotImplementedError.
# Blocks of any other class are ignored.
def convert_content_blocks_to_responses_input(msg)
  assistant_text = lambda do |lines|
    { role: "assistant", content: [{ type: "output_text", text: lines.join("\n") }] }
  end

  converted = []
  pending_text = []

  msg.content.each do |block|
    case block
    when TextBlock
      pending_text << block.text
    when ImageBlock
      raise NotImplementedError, "ImageBlock is not yet supported by the Copilot adapter"
    when ToolUseBlock
      # Text seen so far must precede the call it introduces.
      if !pending_text.empty?
        converted << assistant_text.call(pending_text)
        pending_text = []
      end
      converted << {
        type: "function_call",
        call_id: block.id,
        name: block.name,
        arguments: JSON.generate(block.arguments)
      }
    when ToolResultBlock
      converted << {
        type: "function_call_output",
        call_id: block.tool_use_id,
        output: tool_result_content(block)
      }
    end
  end

  return converted if pending_text.empty?

  trailing =
    if msg.role == "assistant"
      assistant_text.call(pending_text)
    else
      { role: msg.role, content: pending_text.join("\n") }
    end
  converted << trailing
end
# Assembles the request body for POST /v1/responses.
#
# Compared with the Chat Completions body this one:
# - carries the conversation under `input` instead of `messages`;
# - limits output with `max_output_tokens`;
# - expresses the thinking level as `reasoning: {effort:}`;
# - lists tools without the `function` wrapper.
#
# `tools` is only included when at least one tool is defined and `reasoning`
# only when `thinking` is truthy.
def build_responses_api_body(messages, system, tools, stream, max_tokens, thinking)
  input = build_responses_api_input(messages, system)
  wire_tools = build_responses_api_tools(tools)

  body = {
    model: @model,
    input: input,
    stream: stream,
    max_output_tokens: max_tokens
  }

  body[:tools] = wire_tools unless wire_tools.empty?
  body[:reasoning] = { effort: thinking } if thinking

  body
end

# Converts tool definitions to the Responses API tool format, where name,
# description and parameters are direct keys of the tool object. Each
# attribute is read through tool_attr.
def build_responses_api_tools(tools)
  tools.map do |td|
    {
      type: "function",
      name: tool_attr(td, :name),
      description: tool_attr(td, :description),
      parameters: tool_attr(td, :parameters)
    }
  end
end

# Non-streaming chat against "#{API_BASE}/responses".
#
# Waits on the rate limiter, posts the JSON body and converts the parsed
# reply with build_response_from_responses_api.
def chat_non_streaming_responses(messages, system, tools, max_tokens, thinking)
  @rate_limiter.wait!
  body = build_responses_api_body(messages, system, tools, false, max_tokens, thinking)

  uri = URI("#{API_BASE}/responses")
  request = Net::HTTP::Post.new(uri)
  # The body already holds the converted conversation, so the X-Initiator
  # decision reuses it instead of converting every message a second time.
  apply_headers!(request, initiator: x_initiator_for_responses(body[:input]))
  request.body = JSON.generate(deep_utf8(body))

  response = execute_request(uri, request)
  data = parse_response!(response)
  build_response_from_responses_api(data)
end
# Builds a canonical Response from a parsed non-streaming /v1/responses body.
#
# Text comes from the `output_text` parts of "message" items (concatenated
# without a separator); each "function_call" item becomes a ToolUseBlock
# whose id is the item's call_id, falling back to its id. stop_reason is
# :tool_use when any tool call is present, otherwise :end_turn. Missing usage
# counters default to 0 and a missing model falls back to @model.
def build_response_from_responses_api(data)
  text_parts = []
  tool_calls = []

  (data["output"] || []).each do |item|
    case item["type"]
    when "message"
      (item["content"] || []).each do |part|
        text_parts << part["text"] if part["type"] == "output_text" && part["text"]
      end
    when "function_call"
      tool_calls << ToolUseBlock.new(
        id: item["call_id"] || item["id"],
        name: item["name"],
        arguments: parse_tool_arguments(item["arguments"])
      )
    end
  end

  usage_data = data["usage"] || {}

  Response.new(
    content: text_parts.empty? ? nil : text_parts.join,
    tool_calls: tool_calls,
    model: data["model"] || @model,
    stop_reason: tool_calls.any? ? :tool_use : :end_turn,
    usage: Usage.new(
      input_tokens: usage_data["input_tokens"] || 0,
      output_tokens: usage_data["output_tokens"] || 0
    )
  )
end

# Chooses the X-Initiator value for a Responses API call from the already
# built `input` array: "agent" as soon as any item has role "assistant" or is
# a function_call / function_call_output item, otherwise "user".
def x_initiator_for_responses(input_items)
  agent_turn = input_items.any? do |item|
    item[:role].to_s == "assistant" ||
      %w[function_call function_call_output].include?(item[:type].to_s)
  end

  agent_turn ? "agent" : "user"
end

# Streaming chat against "#{API_BASE}/responses".
#
# Response chunks are appended to a buffer and parsed line by line as they
# arrive; every text or tool-call fragment is passed to the optional block as
# a StreamDelta. Returns the Response assembled from the collected events.
def chat_streaming_responses(messages, system, tools, max_tokens, thinking, &block)
  @rate_limiter.wait!
  body = build_responses_api_body(messages, system, tools, true, max_tokens, thinking)

  uri = URI("#{API_BASE}/responses")
  request = Net::HTTP::Post.new(uri)
  # Reuse the input already converted for the body rather than converting the
  # whole conversation again just to pick the X-Initiator header.
  apply_headers!(request, initiator: x_initiator_for_responses(body[:input]))
  request.body = JSON.generate(deep_utf8(body))

  collector = new_responses_stream_collector

  execute_streaming_request(uri, request) do |response|
    buffer = +""
    response.read_body do |chunk|
      buffer << chunk
      process_responses_sse_buffer(buffer, collector, &block)
    end

    # A final event that was not newline-terminated would otherwise stay in
    # the buffer unparsed; terminate it and drain once more.
    unless buffer.strip.empty?
      buffer << "\n"
      process_responses_sse_buffer(buffer, collector, &block)
    end
  end

  build_streaming_response_from_responses(collector)
end

# Fresh accumulator for one streamed response.
def new_responses_stream_collector
  {
    # output_index => accumulated text for that output item
    text_parts: Hash.new { |h, k| h[k] = +"" },
    # item_id => { call_id:, name:, arguments: }
    tool_calls: {},
    # [:text, output_index] / [:tool, item_id] pairs in arrival order.
    # Recorded by the event handlers; the response builder below does not
    # read it back.
    order: [],
    model: @model,
    input_tokens: 0,
    output_tokens: 0
  }
end

# Consumes every complete line currently in `buffer` (mutating it) and
# dispatches the events they carry. An incomplete trailing line is left in
# the buffer for the next chunk. Lines that are not parseable `data:` events
# are skipped individually, so one malformed line cannot hold back or drop
# the events that follow it. Returns nil.
def process_responses_sse_buffer(buffer, collector, &block)
  while (line_end = buffer.index("\n"))
    line = buffer.slice!(0..line_end).strip
    event = parse_responses_sse_line(line)
    process_responses_stream_event(event, collector, &block) if event
  end
end

# Extracts the JSON object from a single stripped SSE line.
#
# Returns nil for blank lines, non-`data` fields, the "[DONE]" sentinel,
# payloads that are not valid JSON, and JSON values that are not objects.
# The space after "data:" is optional in the event-stream format, so both
# "data: {...}" and "data:{...}" are accepted.
def parse_responses_sse_line(line)
  return unless line.start_with?("data:")

  payload = line.delete_prefix("data:").lstrip
  return if payload.empty? || payload == "[DONE]"

  parsed = JSON.parse(payload)
  parsed if parsed.is_a?(Hash)
rescue JSON::ParserError
  nil
end

# Applies one decoded stream event to the collector. Event types other than
# the four handled here are ignored. The block is optional.
def process_responses_stream_event(data, collector, &block)
  case data["type"]
  when "response.output_item.added"
    handle_responses_output_item_added(data, collector, &block)
  when "response.output_text.delta"
    output_index = data["output_index"] || 0
    fragment = data["delta"].to_s
    collector[:text_parts][output_index] << fragment
    block&.call(StreamDelta.new(type: :text_delta, text: fragment))
  when "response.function_call_arguments.delta"
    handle_responses_arguments_delta(data, collector, &block)
  when "response.completed"
    usage = data.dig("response", "usage") || {}
    collector[:input_tokens] = usage["input_tokens"] || collector[:input_tokens]
    collector[:output_tokens] = usage["output_tokens"] || collector[:output_tokens]
    model = data.dig("response", "model")
    collector[:model] = model if model
  end
end

# Registers a newly announced output item. A function call gets an empty
# argument buffer keyed by the item's id and is announced to the block as
# :tool_use_start; a message only records its output_index (once) in the
# arrival order.
def handle_responses_output_item_added(data, collector, &block)
  item = data["item"] || {}

  case item["type"]
  when "function_call"
    item_id = item["id"]
    call_id = item["call_id"] || item_id
    name = item["name"] || ""

    collector[:tool_calls][item_id] = { call_id: call_id, name: name, arguments: +"" }
    collector[:order] << [:tool, item_id]
    block&.call(StreamDelta.new(type: :tool_use_start, tool_call_id: call_id, tool_name: name))
  when "message"
    entry = [:text, data["output_index"] || 0]
    collector[:order] << entry unless collector[:order].include?(entry)
  end
end

# Appends an argument fragment to the tool call named by the event's item_id
# and reports it to the block as :tool_use_delta. Fragments for an item that
# was never announced are dropped.
def handle_responses_arguments_delta(data, collector, &block)
  tc = collector[:tool_calls][data["item_id"]]
  return unless tc

  fragment = data["delta"].to_s
  tc[:arguments] << fragment
  block&.call(StreamDelta.new(type: :tool_use_delta, tool_call_id: tc[:call_id], argument_delta: fragment))
end

# Builds the final Response from a filled collector. Text is concatenated in
# ascending output_index order; accumulated argument strings are parsed with
# parse_tool_arguments. stop_reason is :tool_use when any tool call was
# collected, otherwise :end_turn.
def build_streaming_response_from_responses(collector)
  tool_calls = collector[:tool_calls].values.map do |tc|
    ToolUseBlock.new(
      id: tc[:call_id],
      name: tc[:name],
      arguments: parse_tool_arguments(tc[:arguments])
    )
  end

  all_text = collector[:text_parts].sort_by(&:first).map(&:last).join

  Response.new(
    content: all_text.empty? ? nil : all_text,
    tool_calls: tool_calls,
    model: collector[:model],
    stop_reason: tool_calls.any? ? :tool_use : :end_turn,
    usage: Usage.new(
      input_tokens: collector[:input_tokens],
      output_tokens: collector[:output_tokens]
    )
  )
end
