summaryrefslogtreecommitdiffhomepage
path: root/internal/pubsub
diff options
context:
space:
mode:
authorDax Raad <[email protected]>2025-05-30 20:47:56 -0400
committerDax Raad <[email protected]>2025-05-30 20:48:36 -0400
commitf3da73553c45f17e04b1e77cb13eb0fca714d1bd (patch)
treea24317a19e1ab2a89da50db669dc6894f15d00d1 /internal/pubsub
parent9a26b3058ffc1023e5c7e54b6d571c903d15888e (diff)
downloadopencode-f3da73553c45f17e04b1e77cb13eb0fca714d1bd.tar.gz
opencode-f3da73553c45f17e04b1e77cb13eb0fca714d1bd.zip
sync
Diffstat (limited to 'internal/pubsub')
-rw-r--r--internal/pubsub/broker.go113
-rw-r--r--internal/pubsub/broker_test.go144
-rw-r--r--internal/pubsub/events.go24
3 files changed, 0 insertions, 281 deletions
diff --git a/internal/pubsub/broker.go b/internal/pubsub/broker.go
deleted file mode 100644
index 05a4476c8..000000000
--- a/internal/pubsub/broker.go
+++ /dev/null
@@ -1,113 +0,0 @@
-package pubsub
-
-import (
- "context"
- "fmt"
- "log/slog"
- "sync"
- "time"
-)
-
-const defaultChannelBufferSize = 100
-
-type Broker[T any] struct {
- subs map[chan Event[T]]context.CancelFunc
- mu sync.RWMutex
- isClosed bool
-}
-
-func NewBroker[T any]() *Broker[T] {
- return &Broker[T]{
- subs: make(map[chan Event[T]]context.CancelFunc),
- }
-}
-
-func (b *Broker[T]) Shutdown() {
- b.mu.Lock()
- if b.isClosed {
- b.mu.Unlock()
- return
- }
- b.isClosed = true
-
- for ch, cancel := range b.subs {
- cancel()
- close(ch)
- delete(b.subs, ch)
- }
- b.mu.Unlock()
- slog.Debug("PubSub broker shut down", "type", fmt.Sprintf("%T", *new(T)))
-}
-
-func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.isClosed {
- closedCh := make(chan Event[T])
- close(closedCh)
- return closedCh
- }
-
- subCtx, subCancel := context.WithCancel(ctx)
- subscriberChannel := make(chan Event[T], defaultChannelBufferSize)
- b.subs[subscriberChannel] = subCancel
-
- go func() {
- <-subCtx.Done()
- b.mu.Lock()
- defer b.mu.Unlock()
- if _, ok := b.subs[subscriberChannel]; ok {
- close(subscriberChannel)
- delete(b.subs, subscriberChannel)
- }
- }()
-
- return subscriberChannel
-}
-
-func (b *Broker[T]) Publish(eventType EventType, payload T) {
- b.mu.RLock()
- defer b.mu.RUnlock()
-
- if b.isClosed {
- slog.Warn("Attempted to publish on a closed pubsub broker", "type", eventType, "payload_type", fmt.Sprintf("%T", payload))
- return
- }
-
- event := Event[T]{Type: eventType, Payload: payload}
-
- for ch := range b.subs {
- // Non-blocking send with a fallback to a goroutine to prevent slow subscribers
- // from blocking the publisher.
- select {
- case ch <- event:
- // Successfully sent
- default:
- // Subscriber channel is full or receiver is slow.
- // Send in a new goroutine to avoid blocking the publisher.
- // This might lead to out-of-order delivery for this specific slow subscriber.
- go func(sChan chan Event[T], ev Event[T]) {
- // Re-check if broker is closed before attempting send in goroutine
- b.mu.RLock()
- isBrokerClosed := b.isClosed
- b.mu.RUnlock()
- if isBrokerClosed {
- return
- }
-
- select {
- case sChan <- ev:
- case <-time.After(2 * time.Second): // Timeout for slow subscriber
- slog.Warn("PubSub: Dropped event for slow subscriber after timeout", "type", ev.Type)
- }
- }(ch, event)
- }
- }
-}
-
-func (b *Broker[T]) GetSubscriberCount() int {
- b.mu.RLock()
- defer b.mu.RUnlock()
- return len(b.subs)
-}
diff --git a/internal/pubsub/broker_test.go b/internal/pubsub/broker_test.go
deleted file mode 100644
index b4caa98f3..000000000
--- a/internal/pubsub/broker_test.go
+++ /dev/null
@@ -1,144 +0,0 @@
-package pubsub
-
-import (
- "context"
- "sync"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestBrokerSubscribe(t *testing.T) {
- t.Parallel()
-
- t.Run("with cancellable context", func(t *testing.T) {
- t.Parallel()
- broker := NewBroker[string]()
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- ch := broker.Subscribe(ctx)
- assert.NotNil(t, ch)
- assert.Equal(t, 1, broker.GetSubscriberCount())
-
- // Cancel the context should remove the subscription
- cancel()
- time.Sleep(10 * time.Millisecond) // Give time for goroutine to process
- assert.Equal(t, 0, broker.GetSubscriberCount())
- })
-
- t.Run("with background context", func(t *testing.T) {
- t.Parallel()
- broker := NewBroker[string]()
-
- // Using context.Background() should not leak goroutines
- ch := broker.Subscribe(context.Background())
- assert.NotNil(t, ch)
- assert.Equal(t, 1, broker.GetSubscriberCount())
-
- // Shutdown should clean up all subscriptions
- broker.Shutdown()
- assert.Equal(t, 0, broker.GetSubscriberCount())
- })
-}
-
-func TestBrokerPublish(t *testing.T) {
- t.Parallel()
- broker := NewBroker[string]()
- ctx := t.Context()
-
- ch := broker.Subscribe(ctx)
-
- // Publish a message
- broker.Publish(EventTypeCreated, "test message")
-
- // Verify message is received
- select {
- case event := <-ch:
- assert.Equal(t, EventTypeCreated, event.Type)
- assert.Equal(t, "test message", event.Payload)
- case <-time.After(100 * time.Millisecond):
- t.Fatal("timeout waiting for message")
- }
-}
-
-func TestBrokerShutdown(t *testing.T) {
- t.Parallel()
- broker := NewBroker[string]()
-
- // Create multiple subscribers
- ch1 := broker.Subscribe(context.Background())
- ch2 := broker.Subscribe(context.Background())
-
- assert.Equal(t, 2, broker.GetSubscriberCount())
-
- // Shutdown should close all channels and clean up
- broker.Shutdown()
-
- // Verify channels are closed
- _, ok1 := <-ch1
- _, ok2 := <-ch2
- assert.False(t, ok1, "channel 1 should be closed")
- assert.False(t, ok2, "channel 2 should be closed")
-
- // Verify subscriber count is reset
- assert.Equal(t, 0, broker.GetSubscriberCount())
-}
-
-func TestBrokerConcurrency(t *testing.T) {
- t.Parallel()
- broker := NewBroker[int]()
-
- // Create a large number of subscribers
- const numSubscribers = 100
- var wg sync.WaitGroup
- wg.Add(numSubscribers)
-
- // Create a channel to collect received events
- receivedEvents := make(chan int, numSubscribers)
-
- for i := range numSubscribers {
- go func(id int) {
- defer wg.Done()
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- ch := broker.Subscribe(ctx)
-
- // Receive one message then cancel
- select {
- case event := <-ch:
- receivedEvents <- event.Payload
- case <-time.After(1 * time.Second):
- t.Errorf("timeout waiting for message %d", id)
- }
- cancel()
- }(i)
- }
-
- // Give subscribers time to set up
- time.Sleep(10 * time.Millisecond)
-
- // Publish messages to all subscribers
- for i := range numSubscribers {
- broker.Publish(EventTypeCreated, i)
- }
-
- // Wait for all subscribers to finish
- wg.Wait()
- close(receivedEvents)
-
- // Give time for cleanup goroutines to run
- time.Sleep(10 * time.Millisecond)
-
- // Verify all subscribers are cleaned up
- assert.Equal(t, 0, broker.GetSubscriberCount())
-
- // Verify we received the expected number of events
- count := 0
- for range receivedEvents {
- count++
- }
- assert.Equal(t, numSubscribers, count)
-}
diff --git a/internal/pubsub/events.go b/internal/pubsub/events.go
deleted file mode 100644
index e3910f9f5..000000000
--- a/internal/pubsub/events.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package pubsub
-
-import "context"
-
-type EventType string
-
-const (
- EventTypeCreated EventType = "created"
- EventTypeUpdated EventType = "updated"
- EventTypeDeleted EventType = "deleted"
-)
-
-type Event[T any] struct {
- Type EventType
- Payload T
-}
-
-type Subscriber[T any] interface {
- Subscribe(ctx context.Context) <-chan Event[T]
-}
-
-type Publisher[T any] interface {
- Publish(eventType EventType, payload T)
-}