diff --git a/common/signal/pubsub/pubsub.go b/common/signal/pubsub/pubsub.go index d5ef8f184..7306ef2f9 100644 --- a/common/signal/pubsub/pubsub.go +++ b/common/signal/pubsub/pubsub.go @@ -5,13 +5,13 @@ import ( "time" "v2ray.com/core/common" + "v2ray.com/core/common/signal/done" "v2ray.com/core/common/task" ) type Subscriber struct { - name string - buffer chan interface{} - removed chan struct{} + buffer chan interface{} + done *done.Instance } func (s *Subscriber) push(msg interface{}) { @@ -25,62 +25,66 @@ func (s *Subscriber) Wait() <-chan interface{} { return s.buffer } -func (s *Subscriber) Close() { - close(s.removed) +func (s *Subscriber) Close() error { + return s.done.Close() } func (s *Subscriber) IsClosed() bool { - select { - case <-s.removed: - return true - default: - return false - } + return s.done.Done() } type Service struct { sync.RWMutex - subs []*Subscriber + subs map[string][]*Subscriber ctask *task.Periodic } func NewService() *Service { - s := &Service{} + s := &Service{ + subs: make(map[string][]*Subscriber), + } s.ctask = &task.Periodic{ - Execute: s.cleanup, + Execute: s.Cleanup, Interval: time.Second * 30, } common.Must(s.ctask.Start()) return s } -func (s *Service) cleanup() error { +// Cleanup cleans up internal caches of subscribers. +// Visible for testing only. +func (s *Service) Cleanup() error { s.Lock() defer s.Unlock() - if len(s.subs) < 16 { - return nil - } - - newSub := make([]*Subscriber, 0, len(s.subs)) - for _, sub := range s.subs { - if !sub.IsClosed() { - newSub = append(newSub, sub) + for name, subs := range s.subs { + newSub := make([]*Subscriber, 0, len(s.subs)) + for _, sub := range subs { + if !sub.IsClosed() { + newSub = append(newSub, sub) + } + } + if len(newSub) == 0 { + delete(s.subs, name) + } else { + s.subs[name] = newSub } } - s.subs = newSub + if len(s.subs) == 0 { + s.subs = make(map[string][]*Subscriber) + } return nil } func (s *Service) Subscribe(name string) *Subscriber { sub := &Subscriber{ - name: name, - buffer: make(chan interface{}, 16), - removed: make(chan struct{}), + buffer: make(chan interface{}, 16), + done: done.New(), } s.Lock() - s.subs = append(s.subs, sub) + subs := append(s.subs[name], sub) + s.subs[name] = subs s.Unlock() return sub } @@ -89,8 +93,8 @@ func (s *Service) Publish(name string, message interface{}) { s.RLock() defer s.RUnlock() - for _, sub := range s.subs { - if sub.name == name && !sub.IsClosed() { + for _, sub := range s.subs[name] { + if !sub.IsClosed() { sub.push(message) } } diff --git a/common/signal/pubsub/pubsub_test.go b/common/signal/pubsub/pubsub_test.go index 88a884093..99bf4e681 100644 --- a/common/signal/pubsub/pubsub_test.go +++ b/common/signal/pubsub/pubsub_test.go @@ -30,4 +30,6 @@ func TestPubsub(t *testing.T) { t.Fail() default: } + + service.Cleanup() }