diff options
| author | Dax Raad <[email protected]> | 2025-05-30 20:47:56 -0400 |
|---|---|---|
| committer | Dax Raad <[email protected]> | 2025-05-30 20:48:36 -0400 |
| commit | f3da73553c45f17e04b1e77cb13eb0fca714d1bd (patch) | |
| tree | a24317a19e1ab2a89da50db669dc6894f15d00d1 /internal/pubsub | |
| parent | 9a26b3058ffc1023e5c7e54b6d571c903d15888e (diff) | |
| download | opencode-f3da73553c45f17e04b1e77cb13eb0fca714d1bd.tar.gz opencode-f3da73553c45f17e04b1e77cb13eb0fca714d1bd.zip | |
sync
Diffstat (limited to 'internal/pubsub')
| -rw-r--r-- | internal/pubsub/broker.go | 113 | ||||
| -rw-r--r-- | internal/pubsub/broker_test.go | 144 | ||||
| -rw-r--r-- | internal/pubsub/events.go | 24 |
3 files changed, 0 insertions, 281 deletions
diff --git a/internal/pubsub/broker.go b/internal/pubsub/broker.go deleted file mode 100644 index 05a4476c8..000000000 --- a/internal/pubsub/broker.go +++ /dev/null @@ -1,113 +0,0 @@ -package pubsub - -import ( - "context" - "fmt" - "log/slog" - "sync" - "time" -) - -const defaultChannelBufferSize = 100 - -type Broker[T any] struct { - subs map[chan Event[T]]context.CancelFunc - mu sync.RWMutex - isClosed bool -} - -func NewBroker[T any]() *Broker[T] { - return &Broker[T]{ - subs: make(map[chan Event[T]]context.CancelFunc), - } -} - -func (b *Broker[T]) Shutdown() { - b.mu.Lock() - if b.isClosed { - b.mu.Unlock() - return - } - b.isClosed = true - - for ch, cancel := range b.subs { - cancel() - close(ch) - delete(b.subs, ch) - } - b.mu.Unlock() - slog.Debug("PubSub broker shut down", "type", fmt.Sprintf("%T", *new(T))) -} - -func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] { - b.mu.Lock() - defer b.mu.Unlock() - - if b.isClosed { - closedCh := make(chan Event[T]) - close(closedCh) - return closedCh - } - - subCtx, subCancel := context.WithCancel(ctx) - subscriberChannel := make(chan Event[T], defaultChannelBufferSize) - b.subs[subscriberChannel] = subCancel - - go func() { - <-subCtx.Done() - b.mu.Lock() - defer b.mu.Unlock() - if _, ok := b.subs[subscriberChannel]; ok { - close(subscriberChannel) - delete(b.subs, subscriberChannel) - } - }() - - return subscriberChannel -} - -func (b *Broker[T]) Publish(eventType EventType, payload T) { - b.mu.RLock() - defer b.mu.RUnlock() - - if b.isClosed { - slog.Warn("Attempted to publish on a closed pubsub broker", "type", eventType, "payload_type", fmt.Sprintf("%T", payload)) - return - } - - event := Event[T]{Type: eventType, Payload: payload} - - for ch := range b.subs { - // Non-blocking send with a fallback to a goroutine to prevent slow subscribers - // from blocking the publisher. - select { - case ch <- event: - // Successfully sent - default: - // Subscriber channel is full or receiver is slow. - // Send in a new goroutine to avoid blocking the publisher. - // This might lead to out-of-order delivery for this specific slow subscriber. - go func(sChan chan Event[T], ev Event[T]) { - // Re-check if broker is closed before attempting send in goroutine - b.mu.RLock() - isBrokerClosed := b.isClosed - b.mu.RUnlock() - if isBrokerClosed { - return - } - - select { - case sChan <- ev: - case <-time.After(2 * time.Second): // Timeout for slow subscriber - slog.Warn("PubSub: Dropped event for slow subscriber after timeout", "type", ev.Type) - } - }(ch, event) - } - } -} - -func (b *Broker[T]) GetSubscriberCount() int { - b.mu.RLock() - defer b.mu.RUnlock() - return len(b.subs) -} diff --git a/internal/pubsub/broker_test.go b/internal/pubsub/broker_test.go deleted file mode 100644 index b4caa98f3..000000000 --- a/internal/pubsub/broker_test.go +++ /dev/null @@ -1,144 +0,0 @@ -package pubsub - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestBrokerSubscribe(t *testing.T) { - t.Parallel() - - t.Run("with cancellable context", func(t *testing.T) { - t.Parallel() - broker := NewBroker[string]() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ch := broker.Subscribe(ctx) - assert.NotNil(t, ch) - assert.Equal(t, 1, broker.GetSubscriberCount()) - - // Cancel the context should remove the subscription - cancel() - time.Sleep(10 * time.Millisecond) // Give time for goroutine to process - assert.Equal(t, 0, broker.GetSubscriberCount()) - }) - - t.Run("with background context", func(t *testing.T) { - t.Parallel() - broker := NewBroker[string]() - - // Using context.Background() should not leak goroutines - ch := broker.Subscribe(context.Background()) - assert.NotNil(t, ch) - assert.Equal(t, 1, broker.GetSubscriberCount()) - - // Shutdown should clean up all subscriptions - broker.Shutdown() - assert.Equal(t, 0, broker.GetSubscriberCount()) - }) -} - -func TestBrokerPublish(t *testing.T) { - t.Parallel() - broker := NewBroker[string]() - ctx := t.Context() - - ch := broker.Subscribe(ctx) - - // Publish a message - broker.Publish(EventTypeCreated, "test message") - - // Verify message is received - select { - case event := <-ch: - assert.Equal(t, EventTypeCreated, event.Type) - assert.Equal(t, "test message", event.Payload) - case <-time.After(100 * time.Millisecond): - t.Fatal("timeout waiting for message") - } -} - -func TestBrokerShutdown(t *testing.T) { - t.Parallel() - broker := NewBroker[string]() - - // Create multiple subscribers - ch1 := broker.Subscribe(context.Background()) - ch2 := broker.Subscribe(context.Background()) - - assert.Equal(t, 2, broker.GetSubscriberCount()) - - // Shutdown should close all channels and clean up - broker.Shutdown() - - // Verify channels are closed - _, ok1 := <-ch1 - _, ok2 := <-ch2 - assert.False(t, ok1, "channel 1 should be closed") - assert.False(t, ok2, "channel 2 should be closed") - - // Verify subscriber count is reset - assert.Equal(t, 0, broker.GetSubscriberCount()) -} - -func TestBrokerConcurrency(t *testing.T) { - t.Parallel() - broker := NewBroker[int]() - - // Create a large number of subscribers - const numSubscribers = 100 - var wg sync.WaitGroup - wg.Add(numSubscribers) - - // Create a channel to collect received events - receivedEvents := make(chan int, numSubscribers) - - for i := range numSubscribers { - go func(id int) { - defer wg.Done() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ch := broker.Subscribe(ctx) - - // Receive one message then cancel - select { - case event := <-ch: - receivedEvents <- event.Payload - case <-time.After(1 * time.Second): - t.Errorf("timeout waiting for message %d", id) - } - cancel() - }(i) - } - - // Give subscribers time to set up - time.Sleep(10 * time.Millisecond) - - // Publish messages to all subscribers - for i := range numSubscribers { - broker.Publish(EventTypeCreated, i) - } - - // Wait for all subscribers to finish - wg.Wait() - close(receivedEvents) - - // Give time for cleanup goroutines to run - time.Sleep(10 * time.Millisecond) - - // Verify all subscribers are cleaned up - assert.Equal(t, 0, broker.GetSubscriberCount()) - - // Verify we received the expected number of events - count := 0 - for range receivedEvents { - count++ - } - assert.Equal(t, numSubscribers, count) -} diff --git a/internal/pubsub/events.go b/internal/pubsub/events.go deleted file mode 100644 index e3910f9f5..000000000 --- a/internal/pubsub/events.go +++ /dev/null @@ -1,24 +0,0 @@ -package pubsub - -import "context" - -type EventType string - -const ( - EventTypeCreated EventType = "created" - EventTypeUpdated EventType = "updated" - EventTypeDeleted EventType = "deleted" -) - -type Event[T any] struct { - Type EventType - Payload T -} - -type Subscriber[T any] interface { - Subscribe(ctx context.Context) <-chan Event[T] -} - -type Publisher[T any] interface { - Publish(eventType EventType, payload T) -} |
