diff --git a/app/stats/channel.go b/app/stats/channel.go new file mode 100644 index 000000000..478994ce2 --- /dev/null +++ b/app/stats/channel.go @@ -0,0 +1,127 @@ +// +build !confonly + +package stats + +import ( + "sync" + "time" +) + +// Channel is an implementation of stats.Channel. +type Channel struct { + access sync.RWMutex + closed chan struct{} + + channel chan interface{} + subscribers []chan interface{} +} + +// Channel returns the underlying go channel. +func (c *Channel) Channel() chan interface{} { + c.access.RLock() + defer c.access.RUnlock() + return c.channel +} + +// Subscribers implements stats.Channel. +func (c *Channel) Subscribers() []chan interface{} { + c.access.RLock() + defer c.access.RUnlock() + return c.subscribers +} + +// Subscribe implements stats.Channel. +func (c *Channel) Subscribe() chan interface{} { + c.access.Lock() + defer c.access.Unlock() + subscriber := make(chan interface{}) + c.subscribers = append(c.subscribers, subscriber) + return subscriber +} + +// Unsubscribe implements stats.Channel. +func (c *Channel) Unsubscribe(subscriber chan interface{}) { + c.access.Lock() + defer c.access.Unlock() + for i, s := range c.subscribers { + if s == subscriber { + // Copy to new memory block to prevent modifying original data + subscribers := make([]chan interface{}, len(c.subscribers)-1) + copy(subscribers[:i], c.subscribers[:i]) + copy(subscribers[i:], c.subscribers[i+1:]) + c.subscribers = subscribers + return + } + } +} + +// Publish implements stats.Channel. +func (c *Channel) Publish(message interface{}) { + select { // Early exit if channel closed + case <-c.closed: + return + default: + } + select { // Drop message if not successfully sent + case c.channel <- message: + default: + return + } +} + +// Running returns whether the channel is running. +func (c *Channel) Running() bool { + select { + case <-c.closed: // Channel closed + default: // Channel running or not initialized + if c.closed != nil { // Channel initialized + return true + } + } + return false +} + +// Start implements common.Runnable. +func (c *Channel) Start() error { + c.access.Lock() + defer c.access.Unlock() + if c.Running() { + return nil + } + if c.channel == nil { // Initialize publisher channel + c.channel = make(chan interface{}, 16) + } + c.closed = make(chan struct{}) // Reset close signal + go func() { + for { + select { + case message := <-c.channel: // Broadcast message + for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retreivement + select { + case sub <- message: // Successfully sent message + case <-time.After(100 * time.Millisecond): + c.Unsubscribe(sub) // Remove timeout subscriber + close(sub) // Actively close subscriber as notification + } + } + case <-c.closed: // Channel closed + for _, sub := range c.Subscribers() { // Remove all subscribers + c.Unsubscribe(sub) + close(sub) + } + return + } + } + }() + return nil +} + +// Close implements common.Closable. +func (c *Channel) Close() error { + c.access.Lock() + defer c.access.Unlock() + if c.Running() { + close(c.closed) // Send closed signal + } + return nil +} diff --git a/app/stats/channel_test.go b/app/stats/channel_test.go new file mode 100644 index 000000000..8e78ddb7a --- /dev/null +++ b/app/stats/channel_test.go @@ -0,0 +1,334 @@ +package stats_test + +import ( + "context" + "fmt" + "testing" + "time" + + . "v2ray.com/core/app/stats" + "v2ray.com/core/common" + "v2ray.com/core/features/stats" +) + +func TestStatsChannel(t *testing.T) { + raw, err := common.CreateObject(context.Background(), &Config{}) + common.Must(err) + + m := raw.(stats.Manager) + c, err := m.RegisterChannel("test.channel") + common.Must(err) + common.Must(m.Start()) + defer m.Close() + + source := c.(*Channel).Channel() + a := c.Subscribe() + b := c.Subscribe() + defer c.Unsubscribe(a) + defer c.Unsubscribe(b) + + stopCh := make(chan struct{}) + errCh := make(chan string) + + go func() { + source <- 1 + source <- 2 + source <- "3" + source <- []int{4} + source <- nil // Dummy messsage with no subscriber receiving, will block reading goroutine + for i := 0; i < cap(source); i++ { + source <- nil // Fill source channel's buffer + } + select { + case source <- nil: // Source writing should be blocked here, for last message was not cleared and buffer was full + errCh <- fmt.Sprint("unexpected non-blocked source channel") + default: + close(stopCh) + } + }() + + go func() { + if v, ok := (<-a).(int); !ok || v != 1 { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) + } + if v, ok := (<-a).(int); !ok || v != 2 { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2) + } + if v, ok := (<-a).(string); !ok || v != "3" { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3") + } + if v, ok := (<-a).([]int); !ok || v[0] != 4 { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4}) + } + }() + + go func() { + if v, ok := (<-b).(int); !ok || v != 1 { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) + } + if v, ok := (<-b).(int); !ok || v != 2 { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2) + } + if v, ok := (<-b).(string); !ok || v != "3" { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3") + } + if v, ok := (<-b).([]int); !ok || v[0] != 4 { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4}) + } + }() + + select { + case <-time.After(2 * time.Second): + t.Fatal("Test timeout after 2s") + case e := <-errCh: + t.Fatal(e) + case <-stopCh: + } +} + +func TestStatsChannelUnsubcribe(t *testing.T) { + raw, err := common.CreateObject(context.Background(), &Config{}) + common.Must(err) + + m := raw.(stats.Manager) + c, err := m.RegisterChannel("test.channel") + common.Must(err) + common.Must(m.Start()) + defer m.Close() + + a := c.Subscribe() + b := c.Subscribe() + defer c.Unsubscribe(a) + + pauseCh := make(chan struct{}) + stopCh := make(chan struct{}) + errCh := make(chan string) + + { + var aSet, bSet bool + for _, s := range c.Subscribers() { + if s == a { + aSet = true + } + if s == b { + bSet = true + } + } + if !(aSet && bSet) { + t.Fatal("unexpected subscribers: ", c.Subscribers()) + } + } + + go func() { + c.Publish(1) + <-pauseCh // Wait for `b` goroutine to resume sending message + c.Publish(2) + }() + + go func() { + if v, ok := (<-a).(int); !ok || v != 1 { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) + } + if v, ok := (<-a).(int); !ok || v != 2 { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2) + } + }() + + go func() { + if v, ok := (<-b).(int); !ok || v != 1 { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) + } + // Unsubscribe `b` while `source`'s messaging is paused + c.Unsubscribe(b) + { // Test `b` is not in subscribers + var aSet, bSet bool + for _, s := range c.Subscribers() { + if s == a { + aSet = true + } + if s == b { + bSet = true + } + } + if !(aSet && !bSet) { + errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers()) + } + } + // Resume `source`'s progress + close(pauseCh) + // Test `b` is neither closed nor able to receive any data + select { + case v, ok := <-b: + if ok { + errCh <- fmt.Sprint("unexpected data received: ", v) + } else { + errCh <- fmt.Sprint("unexpected closed channel: ", b) + } + default: + } + close(stopCh) + }() + + select { + case <-time.After(2 * time.Second): + t.Fatal("Test timeout after 2s") + case e := <-errCh: + t.Fatal(e) + case <-stopCh: + } +} + +func TestStatsChannelTimeout(t *testing.T) { + raw, err := common.CreateObject(context.Background(), &Config{}) + common.Must(err) + + m := raw.(stats.Manager) + c, err := m.RegisterChannel("test.channel") + common.Must(err) + common.Must(m.Start()) + defer m.Close() + + a := c.Subscribe() + b := c.Subscribe() + defer c.Unsubscribe(a) + defer c.Unsubscribe(b) + + stopCh := make(chan struct{}) + errCh := make(chan string) + + go func() { + c.Publish(1) + c.Publish(2) + }() + + go func() { + if v, ok := (<-a).(int); !ok || v != 1 { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) + } + if v, ok := (<-a).(int); !ok || v != 2 { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2) + } + { // Test `b` is still in subscribers yet (because `a` receives 2 first) + var aSet, bSet bool + for _, s := range c.Subscribers() { + if s == a { + aSet = true + } + if s == b { + bSet = true + } + } + if !(aSet && bSet) { + errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers()) + } + } + }() + + go func() { + if v, ok := (<-b).(int); !ok || v != 1 { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) + } + // Block `b` channel for a time longer than `source`'s timeout + <-time.After(150 * time.Millisecond) + { // Test `b` has been unsubscribed by source + var aSet, bSet bool + for _, s := range c.Subscribers() { + if s == a { + aSet = true + } + if s == b { + bSet = true + } + } + if !(aSet && !bSet) { + errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers()) + } + } + select { // Test `b` has been closed by source + case v, ok := <-b: + if ok { + errCh <- fmt.Sprint("unexpected data received: ", v) + } + default: + } + close(stopCh) + }() + + select { + case <-time.After(2 * time.Second): + t.Fatal("Test timeout after 2s") + case e := <-errCh: + t.Fatal(e) + case <-stopCh: + } +} + +func TestStatsChannelConcurrency(t *testing.T) { + raw, err := common.CreateObject(context.Background(), &Config{}) + common.Must(err) + + m := raw.(stats.Manager) + c, err := m.RegisterChannel("test.channel") + common.Must(err) + common.Must(m.Start()) + defer m.Close() + + a := c.Subscribe() + b := c.Subscribe() + defer c.Unsubscribe(a) + + stopCh := make(chan struct{}) + errCh := make(chan string) + + go func() { + c.Publish(1) + c.Publish(2) + }() + + go func() { + if v, ok := (<-a).(int); !ok || v != 1 { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) + } + if v, ok := (<-a).(int); !ok || v != 2 { + errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2) + } + }() + + go func() { + // Block `b` for a time shorter than `source`'s timeout + // So as to ensure source channel is trying to send message to `b`. + <-time.After(25 * time.Millisecond) + // This causes concurrency scenario: unsubscribe `b` while trying to send message to it + c.Unsubscribe(b) + // Test `b` is not closed and can still receive data 1: + // Because unsubscribe won't affect the ongoing process of sending message. + select { + case v, ok := <-b: + if v1, ok1 := v.(int); !(ok && ok1 && v1 == 1) { + errCh <- fmt.Sprint("unexpected failure in receiving data: ", 1) + } + default: + errCh <- fmt.Sprint("unexpected block from receiving data: ", 1) + } + // Test `b` is not closed but cannot receive data 2: + // Becuase in a new round of messaging, `b` has been unsubscribed. + select { + case v, ok := <-b: + if ok { + errCh <- fmt.Sprint("unexpected receving: ", v) + } else { + errCh <- fmt.Sprint("unexpected closing of channel") + } + default: + } + close(stopCh) + }() + + select { + case <-time.After(2 * time.Second): + t.Fatal("Test timeout after 2s") + case e := <-errCh: + t.Fatal(e) + case <-stopCh: + } +} diff --git a/app/stats/counter.go b/app/stats/counter.go new file mode 100644 index 000000000..c4e120133 --- /dev/null +++ b/app/stats/counter.go @@ -0,0 +1,25 @@ +// +build !confonly + +package stats + +import "sync/atomic" + +// Counter is an implementation of stats.Counter. +type Counter struct { + value int64 +} + +// Value implements stats.Counter. +func (c *Counter) Value() int64 { + return atomic.LoadInt64(&c.value) +} + +// Set implements stats.Counter. +func (c *Counter) Set(newValue int64) int64 { + return atomic.SwapInt64(&c.value, newValue) +} + +// Add implements stats.Counter. +func (c *Counter) Add(delta int64) int64 { + return atomic.AddInt64(&c.value, delta) +} diff --git a/app/stats/counter_test.go b/app/stats/counter_test.go new file mode 100644 index 000000000..f2594e1ee --- /dev/null +++ b/app/stats/counter_test.go @@ -0,0 +1,31 @@ +package stats_test + +import ( + "context" + "testing" + + . "v2ray.com/core/app/stats" + "v2ray.com/core/common" + "v2ray.com/core/features/stats" +) + +func TestStatsCounter(t *testing.T) { + raw, err := common.CreateObject(context.Background(), &Config{}) + common.Must(err) + + m := raw.(stats.Manager) + c, err := m.RegisterCounter("test.counter") + common.Must(err) + + if v := c.Add(1); v != 1 { + t.Fatal("unpexcted Add(1) return: ", v, ", wanted ", 1) + } + + if v := c.Set(0); v != 1 { + t.Fatal("unexpected Set(0) return: ", v, ", wanted ", 1) + } + + if v := c.Value(); v != 0 { + t.Fatal("unexpected Value() return: ", v, ", wanted ", 0) + } +} diff --git a/app/stats/stats.go b/app/stats/stats.go index 7e2122df7..0dd91ea43 100644 --- a/app/stats/stats.go +++ b/app/stats/stats.go @@ -7,98 +7,19 @@ package stats import ( "context" "sync" - "sync/atomic" - "time" "v2ray.com/core/features/stats" ) -// Counter is an implementation of stats.Counter. -type Counter struct { - value int64 -} - -// Value implements stats.Counter. -func (c *Counter) Value() int64 { - return atomic.LoadInt64(&c.value) -} - -// Set implements stats.Counter. -func (c *Counter) Set(newValue int64) int64 { - return atomic.SwapInt64(&c.value, newValue) -} - -// Add implements stats.Counter. -func (c *Counter) Add(delta int64) int64 { - return atomic.AddInt64(&c.value, delta) -} - -// Channel is an implementation of stats.Channel -type Channel struct { - channel chan interface{} - subscribers []chan interface{} - access sync.RWMutex -} - -// Channel implements stats.Channel -func (c *Channel) Channel() chan interface{} { - return c.channel -} - -// Subscribers implements stats.Channel -func (c *Channel) Subscribers() []chan interface{} { - c.access.RLock() - defer c.access.RUnlock() - return c.subscribers -} - -// Subscribe implements stats.Channel -func (c *Channel) Subscribe() chan interface{} { - c.access.Lock() - defer c.access.Unlock() - ch := make(chan interface{}) - c.subscribers = append(c.subscribers, ch) - return ch -} - -// Unsubscribe implements stats.Channel -func (c *Channel) Unsubscribe(ch chan interface{}) { - c.access.Lock() - defer c.access.Unlock() - for i, s := range c.subscribers { - if s == ch { - // Copy to new memory block to prevent modifying original data - subscribers := make([]chan interface{}, len(c.subscribers)-1) - copy(subscribers[:i], c.subscribers[:i]) - copy(subscribers[i:], c.subscribers[i+1:]) - c.subscribers = subscribers - return - } - } -} - -// Start starts the channel for listening to messsages -func (c *Channel) Start() { - for message := range c.Channel() { - subscribers := c.Subscribers() // Store a copy of slice value for concurrency safety - for _, sub := range subscribers { - select { - case sub <- message: // Successfully sent message - case <-time.After(100 * time.Millisecond): - c.Unsubscribe(sub) // Remove timeout subscriber - close(sub) // Actively close subscriber as notification - } - } - } -} - // Manager is an implementation of stats.Manager. type Manager struct { access sync.RWMutex counters map[string]*Counter channels map[string]*Channel + running bool } +// NewManager creates an instance of Statistics Manager. func NewManager(ctx context.Context, config *Config) (*Manager, error) { m := &Manager{ counters: make(map[string]*Counter), @@ -108,6 +29,7 @@ func NewManager(ctx context.Context, config *Config) (*Manager, error) { return m, nil } +// Type implements common.HasType. func (*Manager) Type() interface{} { return stats.ManagerType() } @@ -170,9 +92,11 @@ func (m *Manager) RegisterChannel(name string) (stats.Channel, error) { return nil, newError("Channel ", name, " already registered.") } newError("create new channel ", name).AtDebug().WriteToLog() - c := &Channel{channel: make(chan interface{})} + c := new(Channel) m.channels[name] = c - go c.Start() + if m.running { + c.Start() + } return c, nil } @@ -181,9 +105,10 @@ func (m *Manager) UnregisterChannel(name string) error { m.access.Lock() defer m.access.Unlock() - if _, found := m.channels[name]; found { + if c, found := m.channels[name]; found { newError("remove channel ", name).AtDebug().WriteToLog() delete(m.channels, name) + c.Close() } return nil } @@ -201,10 +126,24 @@ func (m *Manager) GetChannel(name string) stats.Channel { // Start implements common.Runnable. func (m *Manager) Start() error { + m.access.Lock() + defer m.access.Unlock() + m.running = true + for _, channel := range m.channels { + channel.Start() + } return nil } // Close implement common.Closable. func (m *Manager) Close() error { + m.access.Lock() + defer m.access.Unlock() + m.running = false + for name, channel := range m.channels { + newError("remove channel ", name).AtDebug().WriteToLog() + delete(m.channels, name) + channel.Close() + } return nil } diff --git a/app/stats/stats_test.go b/app/stats/stats_test.go index 0c724257b..f4079b802 100644 --- a/app/stats/stats_test.go +++ b/app/stats/stats_test.go @@ -2,7 +2,6 @@ package stats_test import ( "context" - "fmt" "testing" "time" @@ -15,337 +14,72 @@ func TestInterface(t *testing.T) { _ = (stats.Manager)(new(Manager)) } -func TestStatsCounter(t *testing.T) { +func TestStatsChannelRunnable(t *testing.T) { raw, err := common.CreateObject(context.Background(), &Config{}) common.Must(err) m := raw.(stats.Manager) - c, err := m.RegisterCounter("test.counter") + + ch1, err := m.RegisterChannel("test.channel.1") + c1 := ch1.(*Channel) common.Must(err) - if v := c.Add(1); v != 1 { - t.Fatal("unpexcted Add(1) return: ", v, ", wanted ", 1) + if c1.Running() { + t.Fatalf("unexpected running channel: test.channel.%d", 1) } - if v := c.Set(0); v != 1 { - t.Fatal("unexpected Set(0) return: ", v, ", wanted ", 1) + common.Must(m.Start()) + + if !c1.Running() { + t.Fatalf("unexpected non-running channel: test.channel.%d", 1) } - if v := c.Value(); v != 0 { - t.Fatal("unexpected Value() return: ", v, ", wanted ", 0) - } -} - -func TestStatsChannel(t *testing.T) { - raw, err := common.CreateObject(context.Background(), &Config{}) - common.Must(err) - - m := raw.(stats.Manager) - c, err := m.RegisterChannel("test.channel") - common.Must(err) - - source := c.Channel() - a := c.Subscribe() - b := c.Subscribe() - defer c.Unsubscribe(a) - defer c.Unsubscribe(b) - - stopCh := make(chan struct{}) - errCh := make(chan string) - - go func() { - source <- 1 - source <- 2 - source <- "3" - source <- []int{4} - source <- nil // Dummy messsage with no subscriber receiving - select { - case source <- nil: // Source should be blocked here, for last message was not cleared - errCh <- fmt.Sprint("unexpected non-blocked source") - default: - close(stopCh) - } - }() - - go func() { - if v, ok := (<-a).(int); !ok || v != 1 { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) - } - if v, ok := (<-a).(int); !ok || v != 2 { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2) - } - if v, ok := (<-a).(string); !ok || v != "3" { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3") - } - if v, ok := (<-a).([]int); !ok || v[0] != 4 { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4}) - } - }() - - go func() { - if v, ok := (<-b).(int); !ok || v != 1 { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) - } - if v, ok := (<-b).(int); !ok || v != 2 { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2) - } - if v, ok := (<-b).(string); !ok || v != "3" { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3") - } - if v, ok := (<-b).([]int); !ok || v[0] != 4 { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4}) - } - }() - - select { - case <-time.After(2 * time.Second): - t.Fatal("Test timeout after 2s") - case e := <-errCh: - t.Fatal(e) - case <-stopCh: - } -} - -func TestStatsChannelUnsubcribe(t *testing.T) { - raw, err := common.CreateObject(context.Background(), &Config{}) - common.Must(err) - - m := raw.(stats.Manager) - c, err := m.RegisterChannel("test.channel") - common.Must(err) - - source := c.Channel() - a := c.Subscribe() - b := c.Subscribe() - defer c.Unsubscribe(a) - - pauseCh := make(chan struct{}) - stopCh := make(chan struct{}) - errCh := make(chan string) - - { - var aSet, bSet bool - for _, s := range c.Subscribers() { - if s == a { - aSet = true - } - if s == b { - bSet = true - } - } - if !(aSet && bSet) { - t.Fatal("unexpected subscribers: ", c.Subscribers()) - } - } - - go func() { - source <- 1 - <-pauseCh // Wait for `b` goroutine to resume sending message - source <- 2 - }() - - go func() { - if v, ok := (<-a).(int); !ok || v != 1 { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) - } - if v, ok := (<-a).(int); !ok || v != 2 { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2) - } - }() - - go func() { - if v, ok := (<-b).(int); !ok || v != 1 { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) - } - // Unsubscribe `b` while `source`'s messaging is paused - c.Unsubscribe(b) - { // Test `b` is not in subscribers - var aSet, bSet bool - for _, s := range c.Subscribers() { - if s == a { - aSet = true - } - if s == b { - bSet = true - } - } - if !(aSet && !bSet) { - errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers()) - } - } - // Resume `source`'s progress - close(pauseCh) - // Test `b` is neither closed nor able to receive any data - select { - case v, ok := <-b: - if ok { - errCh <- fmt.Sprint("unexpected data received: ", v) - } else { - errCh <- fmt.Sprint("unexpected closed channel: ", b) - } - default: - } - close(stopCh) - }() - - select { - case <-time.After(2 * time.Second): - t.Fatal("Test timeout after 2s") - case e := <-errCh: - t.Fatal(e) - case <-stopCh: - } -} - -func TestStatsChannelTimeout(t *testing.T) { - raw, err := common.CreateObject(context.Background(), &Config{}) - common.Must(err) - - m := raw.(stats.Manager) - c, err := m.RegisterChannel("test.channel") - common.Must(err) - - source := c.Channel() - a := c.Subscribe() - b := c.Subscribe() - defer c.Unsubscribe(a) - defer c.Unsubscribe(b) - - stopCh := make(chan struct{}) - errCh := make(chan string) - - go func() { - source <- 1 - source <- 2 - }() - - go func() { - if v, ok := (<-a).(int); !ok || v != 1 { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) - } - if v, ok := (<-a).(int); !ok || v != 2 { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2) - } - { // Test `b` is still in subscribers yet (because `a` receives 2 first) - var aSet, bSet bool - for _, s := range c.Subscribers() { - if s == a { - aSet = true - } - if s == b { - bSet = true - } - } - if !(aSet && bSet) { - errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers()) - } - } - }() - - go func() { - if v, ok := (<-b).(int); !ok || v != 1 { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) - } - // Block `b` channel for a time longer than `source`'s timeout - <-time.After(150 * time.Millisecond) - { // Test `b` has been unsubscribed by source - var aSet, bSet bool - for _, s := range c.Subscribers() { - if s == a { - aSet = true - } - if s == b { - bSet = true - } - } - if !(aSet && !bSet) { - errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers()) - } - } - select { // Test `b` has been closed by source - case v, ok := <-b: - if ok { - errCh <- fmt.Sprint("unexpected data received: ", v) - } - default: - } - close(stopCh) - }() - - select { - case <-time.After(2 * time.Second): - t.Fatal("Test timeout after 2s") - case e := <-errCh: - t.Fatal(e) - case <-stopCh: - } -} - -func TestStatsChannelConcurrency(t *testing.T) { - raw, err := common.CreateObject(context.Background(), &Config{}) - common.Must(err) - - m := raw.(stats.Manager) - c, err := m.RegisterChannel("test.channel") - common.Must(err) - - source := c.Channel() - a := c.Subscribe() - b := c.Subscribe() - defer c.Unsubscribe(a) - - stopCh := make(chan struct{}) - errCh := make(chan string) - - go func() { - source <- 1 - source <- 2 - }() - - go func() { - if v, ok := (<-a).(int); !ok || v != 1 { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) - } - if v, ok := (<-a).(int); !ok || v != 2 { - errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2) - } - }() - - go func() { - // Block `b` for a time shorter than `source`'s timeout - // So as to ensure source channel is trying to send message to `b`. - <-time.After(25 * time.Millisecond) - // This causes concurrency scenario: unsubscribe `b` while trying to send message to it - c.Unsubscribe(b) - // Test `b` is not closed and can still receive data 1: - // Because unsubscribe won't affect the ongoing process of sending message. - select { - case v, ok := <-b: - if v1, ok1 := v.(int); !(ok && ok1 && v1 == 1) { - errCh <- fmt.Sprint("unexpected failure in receiving data: ", 1) - } - default: - errCh <- fmt.Sprint("unexpected block from receiving data: ", 1) - } - // Test `b` is not closed but cannot receive data 2: - // Becuase in a new round of messaging, `b` has been unsubscribed. - select { - case v, ok := <-b: - if ok { - errCh <- fmt.Sprint("unexpected receving: ", v) - } else { - errCh <- fmt.Sprint("unexpected closing of channel") - } - default: - } - close(stopCh) - }() - - select { - case <-time.After(2 * time.Second): - t.Fatal("Test timeout after 2s") - case e := <-errCh: - t.Fatal(e) - case <-stopCh: + ch2, err := m.RegisterChannel("test.channel.2") + c2 := ch2.(*Channel) + common.Must(err) + + if !c2.Running() { + t.Fatalf("unexpected non-running channel: test.channel.%d", 2) + } + + s1 := c1.Subscribe() + common.Must(c1.Close()) + + if c1.Running() { + t.Fatalf("unexpected running channel: test.channel.%d", 1) + } + + select { // Check all subscribers in closed channel are closed + case _, ok := <-s1: + if ok { + t.Fatalf("unexpected non-closed subscriber in channel: test.channel.%d", 1) + } + case <-time.After(500 * time.Millisecond): + t.Fatalf("unexpected non-closed subscriber in channel: test.channel.%d", 1) + } + + if len(c1.Subscribers()) != 0 { // Check subscribers in closed channel are emptied + t.Fatalf("unexpected non-empty subscribers in channel: test.channel.%d", 1) + } + + common.Must(m.Close()) + + if c2.Running() { + t.Fatalf("unexpected running channel: test.channel.%d", 2) + } + + ch3, err := m.RegisterChannel("test.channel.3") + c3 := ch3.(*Channel) + common.Must(err) + + if c3.Running() { + t.Fatalf("unexpected running channel: test.channel.%d", 3) + } + + common.Must(c3.Start()) + common.Must(m.UnregisterChannel("test.channel.3")) + + if c3.Running() { // Test that unregistering will close the channel. + t.Fatalf("unexpected running channel: test.channel.%d", 3) } } diff --git a/features/stats/stats.go b/features/stats/stats.go index a27b441c6..cfe6d3070 100644 --- a/features/stats/stats.go +++ b/features/stats/stats.go @@ -2,7 +2,10 @@ package stats //go:generate errorgen -import "v2ray.com/core/features" +import ( + "v2ray.com/core/common" + "v2ray.com/core/features" +) // Counter is the interface for stats counters. // @@ -16,12 +19,14 @@ type Counter interface { Add(int64) int64 } -// Channel is the interface for stats channel +// Channel is the interface for stats channel. // // v2ray:api:stable type Channel interface { - // Channel returns the underlying go channel. - Channel() chan interface{} + // Channel is a runnable unit. + common.Runnable + // Publish broadcasts a message through the channel. + Publish(interface{}) // SubscriberCount returns the number of the subscribers. Subscribers() []chan interface{} // Subscribe registers for listening to channel stream and returns a new listener channel.