diff options
| author | Kujtim Hoxha <[email protected]> | 2025-04-09 19:19:45 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-04-09 19:19:45 +0200 |
| commit | 2af1bbb82852ebebb59ef431e5362c0f3993e5a0 (patch) | |
| tree | cf20e499c14fb4fd6c1e56c99adc35626faead9c /internal/llm | |
| parent | b12ca55594c4a4c5cc0e81971df719a382f1f344 (diff) | |
| parent | 635324d386d52e117efea6fcbe9dbf306ec75653 (diff) | |
| download | opencode-2af1bbb82852ebebb59ef431e5362c0f3993e5a0.tar.gz opencode-2af1bbb82852ebebb59ef431e5362c0f3993e5a0.zip | |
Merge pull request #25 from kujtimiihoxha/cleanup-logs-status
Cleanup Logs and Status
Diffstat (limited to 'internal/llm')
| -rw-r--r-- | internal/llm/agent/agent.go | 20 | ||||
| -rw-r--r-- | internal/llm/agent/mcp-tools.go | 12 | ||||
| -rw-r--r-- | internal/llm/provider/anthropic.go | 116 | ||||
| -rw-r--r-- | internal/llm/provider/gemini.go | 6 | ||||
| -rw-r--r-- | internal/llm/tools/fetch.go | 6 |
5 files changed, 55 insertions, 105 deletions
diff --git a/internal/llm/agent/agent.go b/internal/llm/agent/agent.go index 78062d060..43ba0fc26 100644 --- a/internal/llm/agent/agent.go +++ b/internal/llm/agent/agent.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "log" "strings" "sync" @@ -15,8 +14,6 @@ import ( "github.com/kujtimiihoxha/termai/internal/llm/provider" "github.com/kujtimiihoxha/termai/internal/llm/tools" "github.com/kujtimiihoxha/termai/internal/message" - "github.com/kujtimiihoxha/termai/internal/pubsub" - "github.com/kujtimiihoxha/termai/internal/tui/util" ) type Agent interface { @@ -94,24 +91,13 @@ func (c *agent) processEvent( assistantMsg.AppendContent(event.Content) return c.Messages.Update(*assistantMsg) case provider.EventError: - // TODO: remove when realease - log.Println("error", event.Error) - c.App.Status.Publish(pubsub.UpdatedEvent, util.InfoMsg{ - Type: util.InfoTypeError, - Msg: event.Error.Error(), - }) + c.App.Logger.PersistError(event.Error.Error()) return event.Error case provider.EventWarning: - c.App.Status.Publish(pubsub.UpdatedEvent, util.InfoMsg{ - Type: util.InfoTypeWarn, - Msg: event.Info, - }) + c.App.Logger.PersistWarn(event.Info) return nil case provider.EventInfo: - c.App.Status.Publish(pubsub.UpdatedEvent, util.InfoMsg{ - Type: util.InfoTypeInfo, - Msg: event.Info, - }) + c.App.Logger.PersistInfo(event.Info) case provider.EventComplete: assistantMsg.SetToolCalls(event.Response.ToolCalls) assistantMsg.AddFinish(event.Response.FinishReason) diff --git a/internal/llm/agent/mcp-tools.go b/internal/llm/agent/mcp-tools.go index 64b5f639b..dcf880fe7 100644 --- a/internal/llm/agent/mcp-tools.go +++ b/internal/llm/agent/mcp-tools.go @@ -4,10 +4,10 @@ import ( "context" "encoding/json" "fmt" - "log" "github.com/kujtimiihoxha/termai/internal/config" "github.com/kujtimiihoxha/termai/internal/llm/tools" + "github.com/kujtimiihoxha/termai/internal/logging" "github.com/kujtimiihoxha/termai/internal/permission" "github.com/kujtimiihoxha/termai/internal/version" @@ -22,6 +22,8 @@ type mcpTool struct { permissions permission.Service } +var logger = logging.Get() + type MCPClient interface { Initialize( ctx context.Context, @@ -141,13 +143,13 @@ func getTools(ctx context.Context, name string, m config.MCPServer, permissions _, err := c.Initialize(ctx, initRequest) if err != nil { - log.Printf("error initializing mcp client: %s", err) + logger.Error("error initializing mcp client", "error", err) return stdioTools } toolsRequest := mcp.ListToolsRequest{} tools, err := c.ListTools(ctx, toolsRequest) if err != nil { - log.Printf("error listing tools: %s", err) + logger.Error("error listing tools", "error", err) return stdioTools } for _, t := range tools.Tools { @@ -170,7 +172,7 @@ func GetMcpTools(ctx context.Context, permissions permission.Service) []tools.Ba m.Args..., ) if err != nil { - log.Printf("error creating mcp client: %s", err) + logger.Error("error creating mcp client", "error", err) continue } @@ -181,7 +183,7 @@ func GetMcpTools(ctx context.Context, permissions permission.Service) []tools.Ba client.WithHeaders(m.Headers), ) if err != nil { - log.Printf("error creating mcp client: %s", err) + logger.Error("error creating mcp client", "error", err) continue } mcpTools = append(mcpTools, getTools(ctx, name, m, permissions, c)...) diff --git a/internal/llm/provider/anthropic.go b/internal/llm/provider/anthropic.go index 625976a95..fbecbd43c 100644 --- a/internal/llm/provider/anthropic.go +++ b/internal/llm/provider/anthropic.go @@ -159,40 +159,9 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa attempts := 0 for { - // If this isn't the first attempt, we're retrying - if attempts > 0 { - if attempts > maxRetries { - eventChan <- ProviderEvent{ - Type: EventError, - Error: errors.New("maximum retry attempts reached for rate limit (429)"), - } - return - } - - // Inform user we're retrying with attempt number - eventChan <- ProviderEvent{ - Type: EventWarning, - Info: fmt.Sprintf("[Retrying due to rate limit... attempt %d of %d]", attempts, maxRetries), - } - - // Calculate backoff with exponential backoff and jitter - backoffMs := 2000 * (1 << (attempts - 1)) // 2s, 4s, 8s, 16s, 32s - jitterMs := int(float64(backoffMs) * 0.2) - totalBackoffMs := backoffMs + jitterMs - - // Sleep with backoff, respecting context cancellation - select { - case <-ctx.Done(): - eventChan <- ProviderEvent{Type: EventError, Error: ctx.Err()} - return - case <-time.After(time.Duration(totalBackoffMs) * time.Millisecond): - // Continue with retry - } - } attempts++ - // Create new streaming request stream := a.client.Messages.NewStreaming( ctx, anthropic.MessageNewParams{ @@ -213,11 +182,8 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa }, ) - // Process stream events accumulatedMessage := anthropic.Message{} - streamSuccess := false - // Process the stream until completion or error for stream.Next() { event := stream.Current() err := accumulatedMessage.Accumulate(event) @@ -247,7 +213,6 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa eventChan <- ProviderEvent{Type: EventContentStop} case anthropic.MessageStopEvent: - streamSuccess = true content := "" for _, block := range accumulatedMessage.Content { if text, ok := block.AsAny().(anthropic.TextBlock); ok { @@ -270,51 +235,59 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa } } - // If the stream completed successfully, we're done - if streamSuccess { + err := stream.Err() + if err == nil { return } - // Check for stream errors - err := stream.Err() - if err != nil { - var apierr *anthropic.Error - if errors.As(err, &apierr) { - if apierr.StatusCode == 429 || apierr.StatusCode == 529 { - // Check for Retry-After header - if retryAfterValues := apierr.Response.Header.Values("Retry-After"); len(retryAfterValues) > 0 { - // Parse the retry after value (seconds) - var retryAfterSec int - if _, err := fmt.Sscanf(retryAfterValues[0], "%d", &retryAfterSec); err == nil { - retryMs := retryAfterSec * 1000 - - // Inform user of retry with specific wait time - eventChan <- ProviderEvent{ - Type: EventWarning, - Info: fmt.Sprintf("[Rate limited: waiting %d seconds as specified by API]", retryAfterSec), - } - - // Sleep respecting context cancellation - select { - case <-ctx.Done(): - eventChan <- ProviderEvent{Type: EventError, Error: ctx.Err()} - return - case <-time.After(time.Duration(retryMs) * time.Millisecond): - // Continue with retry after specified delay - continue - } - } - } + var apierr *anthropic.Error + if !errors.As(err, &apierr) { + eventChan <- ProviderEvent{Type: EventError, Error: err} + return + } - // Fall back to exponential backoff if Retry-After parsing failed - continue + if apierr.StatusCode != 429 && apierr.StatusCode != 529 { + eventChan <- ProviderEvent{Type: EventError, Error: err} + return + } + + if attempts > maxRetries { + eventChan <- ProviderEvent{ + Type: EventError, + Error: errors.New("maximum retry attempts reached for rate limit (429)"), + } + return + } + + retryMs := 0 + retryAfterValues := apierr.Response.Header.Values("Retry-After") + if len(retryAfterValues) > 0 { + var retryAfterSec int + if _, err := fmt.Sscanf(retryAfterValues[0], "%d", &retryAfterSec); err == nil { + retryMs = retryAfterSec * 1000 + eventChan <- ProviderEvent{ + Type: EventWarning, + Info: fmt.Sprintf("[Rate limited: waiting %d seconds as specified by API]", retryAfterSec), } } + } else { + eventChan <- ProviderEvent{ + Type: EventWarning, + Info: fmt.Sprintf("[Retrying due to rate limit... attempt %d of %d]", attempts, maxRetries), + } - // For non-rate limit errors, report and exit - eventChan <- ProviderEvent{Type: EventError, Error: err} + backoffMs := 2000 * (1 << (attempts - 1)) + jitterMs := int(float64(backoffMs) * 0.2) + retryMs = backoffMs + jitterMs + } + select { + case <-ctx.Done(): + eventChan <- ProviderEvent{Type: EventError, Error: ctx.Err()} return + case <-time.After(time.Duration(retryMs) * time.Millisecond): + continue } + } }() @@ -412,7 +385,6 @@ func (a *anthropicProvider) convertToAnthropicMessages(messages []message.Messag blocks = append(blocks, anthropic.ContentBlockParamOfRequestToolUseBlock(toolCall.ID, inputMap, toolCall.Name)) } - // Skip empty assistant messages completely if len(blocks) > 0 { anthropicMessages = append(anthropicMessages, anthropic.NewAssistantMessage(blocks...)) } diff --git a/internal/llm/provider/gemini.go b/internal/llm/provider/gemini.go index 53ffa154e..34bfc5283 100644 --- a/internal/llm/provider/gemini.go +++ b/internal/llm/provider/gemini.go @@ -4,14 +4,12 @@ import ( "context" "encoding/json" "errors" - "log" "github.com/google/generative-ai-go/genai" "github.com/google/uuid" "github.com/kujtimiihoxha/termai/internal/llm/models" "github.com/kujtimiihoxha/termai/internal/llm/tools" "github.com/kujtimiihoxha/termai/internal/message" - "google.golang.org/api/googleapi" "google.golang.org/api/iterator" "google.golang.org/api/option" ) @@ -242,10 +240,6 @@ func (p *geminiProvider) StreamResponse(ctx context.Context, messages []message. break } if err != nil { - var apiErr *googleapi.Error - if errors.As(err, &apiErr) { - log.Printf("%s", apiErr.Body) - } eventChan <- ProviderEvent{ Type: EventError, Error: err, diff --git a/internal/llm/tools/fetch.go b/internal/llm/tools/fetch.go index 5ea0c7633..19e644281 100644 --- a/internal/llm/tools/fetch.go +++ b/internal/llm/tools/fetch.go @@ -121,11 +121,7 @@ func (t *fetchTool) Run(ctx context.Context, call ToolCall) (ToolResponse, error ToolName: FetchToolName, Action: "fetch", Description: fmt.Sprintf("Fetch content from URL: %s", params.URL), - Params: FetchPermissionsParams{ - URL: params.URL, - Format: params.Format, - Timeout: params.Timeout, - }, + Params: FetchPermissionsParams(params), }, ) |
