From e7bb99baab5e6968ce0351d6ad219ed21ceec4df Mon Sep 17 00:00:00 2001 From: Kujtim Hoxha Date: Mon, 21 Apr 2025 13:33:51 +0200 Subject: fix the memory bug --- cmd/root.go | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) (limited to 'cmd') diff --git a/cmd/root.go b/cmd/root.go index f506e9940..54280ecaa 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -79,7 +79,7 @@ var rootCmd = &cobra.Command{ initMCPTools(ctx, app) // Setup the subscriptions, this will send services events to the TUI - ch, cancelSubs := setupSubscriptions(app) + ch, cancelSubs := setupSubscriptions(app, ctx) // Create a context for the TUI message handler tuiCtx, tuiCancel := context.WithCancel(ctx) @@ -174,21 +174,21 @@ func setupSubscriber[T any]( defer wg.Done() defer logging.RecoverPanic(fmt.Sprintf("subscription-%s", name), nil) + subCh := subscriber(ctx) + for { select { - case event, ok := <-subscriber(ctx): + case event, ok := <-subCh: if !ok { logging.Info("%s subscription channel closed", name) return } - // Convert generic event to tea.Msg if needed var msg tea.Msg = event - // Non-blocking send with timeout to prevent deadlocks select { case outputCh <- msg: - case <-time.After(500 * time.Millisecond): + case <-time.After(2 * time.Second): logging.Warn("%s message dropped due to slow consumer", name) case <-ctx.Done(): logging.Info("%s subscription cancelled", name) @@ -202,23 +202,21 @@ func setupSubscriber[T any]( }() } -func setupSubscriptions(app *app.App) (chan tea.Msg, func()) { +func setupSubscriptions(app *app.App, parentCtx context.Context) (chan tea.Msg, func()) { ch := make(chan tea.Msg, 100) - // Add a buffer to prevent blocking + wg := sync.WaitGroup{} - ctx, cancel := context.WithCancel(context.Background()) - // Setup each subscription using the helper + ctx, cancel := context.WithCancel(parentCtx) // Inherit from parent context + setupSubscriber(ctx, &wg, "logging", logging.Subscribe, ch) setupSubscriber(ctx, &wg, "sessions", app.Sessions.Subscribe, ch) setupSubscriber(ctx, &wg, "messages", app.Messages.Subscribe, ch) setupSubscriber(ctx, &wg, "permissions", app.Permissions.Subscribe, ch) - // Return channel and a cleanup function cleanupFunc := func() { logging.Info("Cancelling all subscriptions") cancel() // Signal all goroutines to stop - // Wait with a timeout for all goroutines to complete waitCh := make(chan struct{}) go func() { defer logging.RecoverPanic("subscription-cleanup", nil) @@ -229,11 +227,11 @@ func setupSubscriptions(app *app.App) (chan tea.Msg, func()) { select { case <-waitCh: logging.Info("All subscription goroutines completed successfully") + close(ch) // Only close after all writers are confirmed done case <-time.After(5 * time.Second): logging.Warn("Timed out waiting for some subscription goroutines to complete") + close(ch) } - - close(ch) // Safe to close after all writers are done or timed out } return ch, cleanupFunc } -- cgit v1.2.3