1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-18 07:17:32 -05:00
v2fly/common/signal/pubsub/pubsub.go

106 lines
1.8 KiB
Go
Raw Normal View History

2018-07-01 06:38:40 -04:00
package pubsub
import (
"errors"
2018-07-01 06:38:40 -04:00
"sync"
"time"
2021-02-16 15:31:50 -05:00
"github.com/v2fly/v2ray-core/v4/common"
"github.com/v2fly/v2ray-core/v4/common/signal/done"
"github.com/v2fly/v2ray-core/v4/common/task"
2018-07-01 06:38:40 -04:00
)
type Subscriber struct {
2018-07-01 13:30:05 -04:00
buffer chan interface{}
done *done.Instance
2018-07-01 06:38:40 -04:00
}
func (s *Subscriber) push(msg interface{}) {
select {
case s.buffer <- msg:
default:
}
}
func (s *Subscriber) Wait() <-chan interface{} {
return s.buffer
}
2018-07-01 13:30:05 -04:00
func (s *Subscriber) Close() error {
return s.done.Close()
2018-07-01 06:38:40 -04:00
}
func (s *Subscriber) IsClosed() bool {
2018-07-01 13:30:05 -04:00
return s.done.Done()
2018-07-01 06:38:40 -04:00
}
type Service struct {
sync.RWMutex
2018-07-01 13:30:05 -04:00
subs map[string][]*Subscriber
2018-07-01 06:38:40 -04:00
ctask *task.Periodic
}
func NewService() *Service {
2018-07-01 13:30:05 -04:00
s := &Service{
subs: make(map[string][]*Subscriber),
}
2018-07-01 06:38:40 -04:00
s.ctask = &task.Periodic{
2018-07-01 13:30:05 -04:00
Execute: s.Cleanup,
2018-07-01 06:38:40 -04:00
Interval: time.Second * 30,
}
return s
}
2018-07-01 13:30:05 -04:00
// Cleanup cleans up internal caches of subscribers.
// Visible for testing only.
func (s *Service) Cleanup() error {
2018-07-01 06:38:40 -04:00
s.Lock()
defer s.Unlock()
if len(s.subs) == 0 {
return errors.New("nothing to do")
}
2018-07-01 13:30:05 -04:00
for name, subs := range s.subs {
newSub := make([]*Subscriber, 0, len(s.subs))
for _, sub := range subs {
if !sub.IsClosed() {
newSub = append(newSub, sub)
}
}
if len(newSub) == 0 {
delete(s.subs, name)
} else {
s.subs[name] = newSub
2018-07-01 06:38:40 -04:00
}
}
2018-07-01 13:30:05 -04:00
if len(s.subs) == 0 {
s.subs = make(map[string][]*Subscriber)
}
2018-07-01 06:38:40 -04:00
return nil
}
func (s *Service) Subscribe(name string) *Subscriber {
sub := &Subscriber{
2018-07-01 13:30:05 -04:00
buffer: make(chan interface{}, 16),
done: done.New(),
2018-07-01 06:38:40 -04:00
}
s.Lock()
2021-05-10 20:36:06 -04:00
s.subs[name] = append(s.subs[name], sub)
2018-07-01 06:38:40 -04:00
s.Unlock()
common.Must(s.ctask.Start())
2018-07-01 06:38:40 -04:00
return sub
}
func (s *Service) Publish(name string, message interface{}) {
s.RLock()
defer s.RUnlock()
2018-07-01 13:30:05 -04:00
for _, sub := range s.subs[name] {
if !sub.IsClosed() {
2018-07-01 06:38:40 -04:00
sub.push(message)
}
}
}