diff options
| author | Kujtim Hoxha <[email protected]> | 2025-04-08 20:32:57 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-04-08 20:32:57 +0200 |
| commit | fde04bbf85ea641a33a282b354d63f227f9945fb (patch) | |
| tree | 8b71df5743546e937f5c89977f0f8e0a1814bf56 /internal/llm | |
| parent | 124bd57c507fdcbb56ab27137cbe892f12e1b48f (diff) | |
| parent | 4385fb321903f335097119349aa1ebf9edb3f71a (diff) | |
| download | opencode-fde04bbf85ea641a33a282b354d63f227f9945fb.tar.gz opencode-fde04bbf85ea641a33a282b354d63f227f9945fb.zip | |
Merge pull request #22 from adamdottv/adam/retries
fix(anthropic): better 429/529 handling
Diffstat (limited to 'internal/llm')
| -rw-r--r-- | internal/llm/agent/agent.go | 19 | ||||
| -rw-r--r-- | internal/llm/provider/anthropic.go | 234 | ||||
| -rw-r--r-- | internal/llm/provider/provider.go | 5 |
3 files changed, 187 insertions, 71 deletions
diff --git a/internal/llm/agent/agent.go b/internal/llm/agent/agent.go index cb123e78c..baf78be65 100644 --- a/internal/llm/agent/agent.go +++ b/internal/llm/agent/agent.go @@ -15,6 +15,8 @@ import ( "github.com/kujtimiihoxha/termai/internal/llm/provider" "github.com/kujtimiihoxha/termai/internal/llm/tools" "github.com/kujtimiihoxha/termai/internal/message" + "github.com/kujtimiihoxha/termai/internal/pubsub" + "github.com/kujtimiihoxha/termai/internal/tui/util" ) type Agent interface { @@ -92,9 +94,24 @@ func (c *agent) processEvent( assistantMsg.AppendContent(event.Content) return c.Messages.Update(*assistantMsg) case provider.EventError: + // TODO: remove when realease log.Println("error", event.Error) + c.App.Status.Publish(pubsub.UpdatedEvent, util.InfoMsg{ + Type: util.InfoTypeError, + Msg: event.Error.Error(), + }) return event.Error - + case provider.EventWarning: + c.App.Status.Publish(pubsub.UpdatedEvent, util.InfoMsg{ + Type: util.InfoTypeWarn, + Msg: event.Info, + }) + return nil + case provider.EventInfo: + c.App.Status.Publish(pubsub.UpdatedEvent, util.InfoMsg{ + Type: util.InfoTypeInfo, + Msg: event.Info, + }) case provider.EventComplete: assistantMsg.SetToolCalls(event.Response.ToolCalls) assistantMsg.AddFinish(event.Response.FinishReason) diff --git a/internal/llm/provider/anthropic.go b/internal/llm/provider/anthropic.go index 9e3775d33..02bd572f1 100644 --- a/internal/llm/provider/anthropic.go +++ b/internal/llm/provider/anthropic.go @@ -4,7 +4,9 @@ import ( "context" "encoding/json" "errors" + "fmt" "strings" + "time" "github.com/anthropics/anthropic-sdk-go" "github.com/anthropics/anthropic-sdk-go/option" @@ -68,21 +70,24 @@ func (a *anthropicProvider) SendMessages(ctx context.Context, messages []message anthropicMessages := a.convertToAnthropicMessages(messages) anthropicTools := a.convertToAnthropicTools(tools) - response, err := a.client.Messages.New(ctx, anthropic.MessageNewParams{ - Model: anthropic.Model(a.model.APIModel), - MaxTokens: a.maxTokens, - Temperature: anthropic.Float(0), - Messages: anthropicMessages, - Tools: anthropicTools, - System: []anthropic.TextBlockParam{ - { - Text: a.systemMessage, - CacheControl: anthropic.CacheControlEphemeralParam{ - Type: "ephemeral", + response, err := a.client.Messages.New( + ctx, + anthropic.MessageNewParams{ + Model: anthropic.Model(a.model.APIModel), + MaxTokens: a.maxTokens, + Temperature: anthropic.Float(0), + Messages: anthropicMessages, + Tools: anthropicTools, + System: []anthropic.TextBlockParam{ + { + Text: a.systemMessage, + CacheControl: anthropic.CacheControlEphemeralParam{ + Type: "ephemeral", + }, }, }, }, - }) + ) if err != nil { return nil, err } @@ -121,83 +126,171 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa temperature = anthropic.Float(1) } - stream := a.client.Messages.NewStreaming(ctx, anthropic.MessageNewParams{ - Model: anthropic.Model(a.model.APIModel), - MaxTokens: a.maxTokens, - Temperature: temperature, - Messages: anthropicMessages, - Tools: anthropicTools, - Thinking: thinkingParam, - System: []anthropic.TextBlockParam{ - { - Text: a.systemMessage, - CacheControl: anthropic.CacheControlEphemeralParam{ - Type: "ephemeral", - }, - }, - }, - }) - eventChan := make(chan ProviderEvent) go func() { defer close(eventChan) - accumulatedMessage := anthropic.Message{} + const maxRetries = 8 + attempts := 0 - for stream.Next() { - event := stream.Current() - err := accumulatedMessage.Accumulate(event) - if err != nil { - eventChan <- ProviderEvent{Type: EventError, Error: err} - return + for { + // If this isn't the first attempt, we're retrying + if attempts > 0 { + if attempts > maxRetries { + eventChan <- ProviderEvent{ + Type: EventError, + Error: errors.New("maximum retry attempts reached for rate limit (429)"), + } + return + } + + // Inform user we're retrying with attempt number + eventChan <- ProviderEvent{ + Type: EventWarning, + Info: fmt.Sprintf("[Retrying due to rate limit... attempt %d of %d]", attempts, maxRetries), + } + + // Calculate backoff with exponential backoff and jitter + backoffMs := 2000 * (1 << (attempts - 1)) // 2s, 4s, 8s, 16s, 32s + jitterMs := int(float64(backoffMs) * 0.2) + totalBackoffMs := backoffMs + jitterMs + + // Sleep with backoff, respecting context cancellation + select { + case <-ctx.Done(): + eventChan <- ProviderEvent{Type: EventError, Error: ctx.Err()} + return + case <-time.After(time.Duration(totalBackoffMs) * time.Millisecond): + // Continue with retry + } } - switch event := event.AsAny().(type) { - case anthropic.ContentBlockStartEvent: - eventChan <- ProviderEvent{Type: EventContentStart} + attempts++ + + // Create new streaming request + stream := a.client.Messages.NewStreaming( + ctx, + anthropic.MessageNewParams{ + Model: anthropic.Model(a.model.APIModel), + MaxTokens: a.maxTokens, + Temperature: temperature, + Messages: anthropicMessages, + Tools: anthropicTools, + Thinking: thinkingParam, + System: []anthropic.TextBlockParam{ + { + Text: a.systemMessage, + CacheControl: anthropic.CacheControlEphemeralParam{ + Type: "ephemeral", + }, + }, + }, + }, + ) - case anthropic.ContentBlockDeltaEvent: - if event.Delta.Type == "thinking_delta" && event.Delta.Thinking != "" { - eventChan <- ProviderEvent{ - Type: EventThinkingDelta, - Thinking: event.Delta.Thinking, + // Process stream events + accumulatedMessage := anthropic.Message{} + streamSuccess := false + + // Process the stream until completion or error + for stream.Next() { + event := stream.Current() + err := accumulatedMessage.Accumulate(event) + if err != nil { + eventChan <- ProviderEvent{Type: EventError, Error: err} + return // Don't retry on accumulation errors + } + + switch event := event.AsAny().(type) { + case anthropic.ContentBlockStartEvent: + eventChan <- ProviderEvent{Type: EventContentStart} + + case anthropic.ContentBlockDeltaEvent: + if event.Delta.Type == "thinking_delta" && event.Delta.Thinking != "" { + eventChan <- ProviderEvent{ + Type: EventThinkingDelta, + Thinking: event.Delta.Thinking, + } + } else if event.Delta.Type == "text_delta" && event.Delta.Text != "" { + eventChan <- ProviderEvent{ + Type: EventContentDelta, + Content: event.Delta.Text, + } } - } else if event.Delta.Type == "text_delta" && event.Delta.Text != "" { - eventChan <- ProviderEvent{ - Type: EventContentDelta, - Content: event.Delta.Text, + + case anthropic.ContentBlockStopEvent: + eventChan <- ProviderEvent{Type: EventContentStop} + + case anthropic.MessageStopEvent: + streamSuccess = true + content := "" + for _, block := range accumulatedMessage.Content { + if text, ok := block.AsAny().(anthropic.TextBlock); ok { + content += text.Text + } } - } - case anthropic.ContentBlockStopEvent: - eventChan <- ProviderEvent{Type: EventContentStop} + toolCalls := a.extractToolCalls(accumulatedMessage.Content) + tokenUsage := a.extractTokenUsage(accumulatedMessage.Usage) - case anthropic.MessageStopEvent: - content := "" - for _, block := range accumulatedMessage.Content { - if text, ok := block.AsAny().(anthropic.TextBlock); ok { - content += text.Text + eventChan <- ProviderEvent{ + Type: EventComplete, + Response: &ProviderResponse{ + Content: content, + ToolCalls: toolCalls, + Usage: tokenUsage, + FinishReason: string(accumulatedMessage.StopReason), + }, } } + } - toolCalls := a.extractToolCalls(accumulatedMessage.Content) - tokenUsage := a.extractTokenUsage(accumulatedMessage.Usage) + // If the stream completed successfully, we're done + if streamSuccess { + return + } - eventChan <- ProviderEvent{ - Type: EventComplete, - Response: &ProviderResponse{ - Content: content, - ToolCalls: toolCalls, - Usage: tokenUsage, - FinishReason: string(accumulatedMessage.StopReason), - }, + // Check for stream errors + err := stream.Err() + if err != nil { + var apierr *anthropic.Error + if errors.As(err, &apierr) { + if apierr.StatusCode == 429 || apierr.StatusCode == 529 { + // Check for Retry-After header + if retryAfterValues := apierr.Response.Header.Values("Retry-After"); len(retryAfterValues) > 0 { + // Parse the retry after value (seconds) + var retryAfterSec int + if _, err := fmt.Sscanf(retryAfterValues[0], "%d", &retryAfterSec); err == nil { + retryMs := retryAfterSec * 1000 + + // Inform user of retry with specific wait time + eventChan <- ProviderEvent{ + Type: EventWarning, + Info: fmt.Sprintf("[Rate limited: waiting %d seconds as specified by API]", retryAfterSec), + } + + // Sleep respecting context cancellation + select { + case <-ctx.Done(): + eventChan <- ProviderEvent{Type: EventError, Error: ctx.Err()} + return + case <-time.After(time.Duration(retryMs) * time.Millisecond): + // Continue with retry after specified delay + continue + } + } + } + + // Fall back to exponential backoff if Retry-After parsing failed + continue + } } - } - } - if stream.Err() != nil { - eventChan <- ProviderEvent{Type: EventError, Error: stream.Err()} + // For non-rate limit errors, report and exit + eventChan <- ProviderEvent{Type: EventError, Error: err} + return + } } }() @@ -311,3 +404,4 @@ func (a *anthropicProvider) convertToAnthropicMessages(messages []message.Messag return anthropicMessages } + diff --git a/internal/llm/provider/provider.go b/internal/llm/provider/provider.go index f40429738..cd3a7aa05 100644 --- a/internal/llm/provider/provider.go +++ b/internal/llm/provider/provider.go @@ -17,6 +17,8 @@ const ( EventContentStop EventType = "content_stop" EventComplete EventType = "complete" EventError EventType = "error" + EventWarning EventType = "warning" + EventInfo EventType = "info" ) type TokenUsage struct { @@ -40,6 +42,9 @@ type ProviderEvent struct { ToolCall *message.ToolCall Error error Response *ProviderResponse + + // Used for giving users info on e.x retry + Info string } type Provider interface { |
