1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-10 11:27:26 -05:00

Merge pull request #197 from Vigilans/vigilans/stats-channel-runnable

Stats: Implement common.Runnable for Channel feature
This commit is contained in:
Kslr 2020-09-20 22:39:46 +08:00 committed by GitHub
commit 525d4e13a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 606 additions and 411 deletions

127
app/stats/channel.go Normal file
View File

@ -0,0 +1,127 @@
// +build !confonly
package stats
import (
"sync"
"time"
)
// Channel is an implementation of stats.Channel.
type Channel struct {
access sync.RWMutex
closed chan struct{}
channel chan interface{}
subscribers []chan interface{}
}
// Channel returns the underlying go channel.
func (c *Channel) Channel() chan interface{} {
c.access.RLock()
defer c.access.RUnlock()
return c.channel
}
// Subscribers implements stats.Channel.
func (c *Channel) Subscribers() []chan interface{} {
c.access.RLock()
defer c.access.RUnlock()
return c.subscribers
}
// Subscribe implements stats.Channel.
func (c *Channel) Subscribe() chan interface{} {
c.access.Lock()
defer c.access.Unlock()
subscriber := make(chan interface{})
c.subscribers = append(c.subscribers, subscriber)
return subscriber
}
// Unsubscribe implements stats.Channel.
func (c *Channel) Unsubscribe(subscriber chan interface{}) {
c.access.Lock()
defer c.access.Unlock()
for i, s := range c.subscribers {
if s == subscriber {
// Copy to new memory block to prevent modifying original data
subscribers := make([]chan interface{}, len(c.subscribers)-1)
copy(subscribers[:i], c.subscribers[:i])
copy(subscribers[i:], c.subscribers[i+1:])
c.subscribers = subscribers
return
}
}
}
// Publish implements stats.Channel.
func (c *Channel) Publish(message interface{}) {
select { // Early exit if channel closed
case <-c.closed:
return
default:
}
select { // Drop message if not successfully sent
case c.channel <- message:
default:
return
}
}
// Running returns whether the channel is running.
func (c *Channel) Running() bool {
select {
case <-c.closed: // Channel closed
default: // Channel running or not initialized
if c.closed != nil { // Channel initialized
return true
}
}
return false
}
// Start implements common.Runnable.
func (c *Channel) Start() error {
c.access.Lock()
defer c.access.Unlock()
if c.Running() {
return nil
}
if c.channel == nil { // Initialize publisher channel
c.channel = make(chan interface{}, 16)
}
c.closed = make(chan struct{}) // Reset close signal
go func() {
for {
select {
case message := <-c.channel: // Broadcast message
for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retreivement
select {
case sub <- message: // Successfully sent message
case <-time.After(100 * time.Millisecond):
c.Unsubscribe(sub) // Remove timeout subscriber
close(sub) // Actively close subscriber as notification
}
}
case <-c.closed: // Channel closed
for _, sub := range c.Subscribers() { // Remove all subscribers
c.Unsubscribe(sub)
close(sub)
}
return
}
}
}()
return nil
}
// Close implements common.Closable.
func (c *Channel) Close() error {
c.access.Lock()
defer c.access.Unlock()
if c.Running() {
close(c.closed) // Send closed signal
}
return nil
}

334
app/stats/channel_test.go Normal file
View File

@ -0,0 +1,334 @@
package stats_test
import (
"context"
"fmt"
"testing"
"time"
. "v2ray.com/core/app/stats"
"v2ray.com/core/common"
"v2ray.com/core/features/stats"
)
func TestStatsChannel(t *testing.T) {
raw, err := common.CreateObject(context.Background(), &Config{})
common.Must(err)
m := raw.(stats.Manager)
c, err := m.RegisterChannel("test.channel")
common.Must(err)
common.Must(m.Start())
defer m.Close()
source := c.(*Channel).Channel()
a := c.Subscribe()
b := c.Subscribe()
defer c.Unsubscribe(a)
defer c.Unsubscribe(b)
stopCh := make(chan struct{})
errCh := make(chan string)
go func() {
source <- 1
source <- 2
source <- "3"
source <- []int{4}
source <- nil // Dummy messsage with no subscriber receiving, will block reading goroutine
for i := 0; i < cap(source); i++ {
source <- nil // Fill source channel's buffer
}
select {
case source <- nil: // Source writing should be blocked here, for last message was not cleared and buffer was full
errCh <- fmt.Sprint("unexpected non-blocked source channel")
default:
close(stopCh)
}
}()
go func() {
if v, ok := (<-a).(int); !ok || v != 1 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
}
if v, ok := (<-a).(int); !ok || v != 2 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
}
if v, ok := (<-a).(string); !ok || v != "3" {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3")
}
if v, ok := (<-a).([]int); !ok || v[0] != 4 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4})
}
}()
go func() {
if v, ok := (<-b).(int); !ok || v != 1 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
}
if v, ok := (<-b).(int); !ok || v != 2 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
}
if v, ok := (<-b).(string); !ok || v != "3" {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3")
}
if v, ok := (<-b).([]int); !ok || v[0] != 4 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4})
}
}()
select {
case <-time.After(2 * time.Second):
t.Fatal("Test timeout after 2s")
case e := <-errCh:
t.Fatal(e)
case <-stopCh:
}
}
func TestStatsChannelUnsubcribe(t *testing.T) {
raw, err := common.CreateObject(context.Background(), &Config{})
common.Must(err)
m := raw.(stats.Manager)
c, err := m.RegisterChannel("test.channel")
common.Must(err)
common.Must(m.Start())
defer m.Close()
a := c.Subscribe()
b := c.Subscribe()
defer c.Unsubscribe(a)
pauseCh := make(chan struct{})
stopCh := make(chan struct{})
errCh := make(chan string)
{
var aSet, bSet bool
for _, s := range c.Subscribers() {
if s == a {
aSet = true
}
if s == b {
bSet = true
}
}
if !(aSet && bSet) {
t.Fatal("unexpected subscribers: ", c.Subscribers())
}
}
go func() {
c.Publish(1)
<-pauseCh // Wait for `b` goroutine to resume sending message
c.Publish(2)
}()
go func() {
if v, ok := (<-a).(int); !ok || v != 1 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
}
if v, ok := (<-a).(int); !ok || v != 2 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
}
}()
go func() {
if v, ok := (<-b).(int); !ok || v != 1 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
}
// Unsubscribe `b` while `source`'s messaging is paused
c.Unsubscribe(b)
{ // Test `b` is not in subscribers
var aSet, bSet bool
for _, s := range c.Subscribers() {
if s == a {
aSet = true
}
if s == b {
bSet = true
}
}
if !(aSet && !bSet) {
errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
}
}
// Resume `source`'s progress
close(pauseCh)
// Test `b` is neither closed nor able to receive any data
select {
case v, ok := <-b:
if ok {
errCh <- fmt.Sprint("unexpected data received: ", v)
} else {
errCh <- fmt.Sprint("unexpected closed channel: ", b)
}
default:
}
close(stopCh)
}()
select {
case <-time.After(2 * time.Second):
t.Fatal("Test timeout after 2s")
case e := <-errCh:
t.Fatal(e)
case <-stopCh:
}
}
func TestStatsChannelTimeout(t *testing.T) {
raw, err := common.CreateObject(context.Background(), &Config{})
common.Must(err)
m := raw.(stats.Manager)
c, err := m.RegisterChannel("test.channel")
common.Must(err)
common.Must(m.Start())
defer m.Close()
a := c.Subscribe()
b := c.Subscribe()
defer c.Unsubscribe(a)
defer c.Unsubscribe(b)
stopCh := make(chan struct{})
errCh := make(chan string)
go func() {
c.Publish(1)
c.Publish(2)
}()
go func() {
if v, ok := (<-a).(int); !ok || v != 1 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
}
if v, ok := (<-a).(int); !ok || v != 2 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
}
{ // Test `b` is still in subscribers yet (because `a` receives 2 first)
var aSet, bSet bool
for _, s := range c.Subscribers() {
if s == a {
aSet = true
}
if s == b {
bSet = true
}
}
if !(aSet && bSet) {
errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
}
}
}()
go func() {
if v, ok := (<-b).(int); !ok || v != 1 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
}
// Block `b` channel for a time longer than `source`'s timeout
<-time.After(150 * time.Millisecond)
{ // Test `b` has been unsubscribed by source
var aSet, bSet bool
for _, s := range c.Subscribers() {
if s == a {
aSet = true
}
if s == b {
bSet = true
}
}
if !(aSet && !bSet) {
errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
}
}
select { // Test `b` has been closed by source
case v, ok := <-b:
if ok {
errCh <- fmt.Sprint("unexpected data received: ", v)
}
default:
}
close(stopCh)
}()
select {
case <-time.After(2 * time.Second):
t.Fatal("Test timeout after 2s")
case e := <-errCh:
t.Fatal(e)
case <-stopCh:
}
}
func TestStatsChannelConcurrency(t *testing.T) {
raw, err := common.CreateObject(context.Background(), &Config{})
common.Must(err)
m := raw.(stats.Manager)
c, err := m.RegisterChannel("test.channel")
common.Must(err)
common.Must(m.Start())
defer m.Close()
a := c.Subscribe()
b := c.Subscribe()
defer c.Unsubscribe(a)
stopCh := make(chan struct{})
errCh := make(chan string)
go func() {
c.Publish(1)
c.Publish(2)
}()
go func() {
if v, ok := (<-a).(int); !ok || v != 1 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
}
if v, ok := (<-a).(int); !ok || v != 2 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
}
}()
go func() {
// Block `b` for a time shorter than `source`'s timeout
// So as to ensure source channel is trying to send message to `b`.
<-time.After(25 * time.Millisecond)
// This causes concurrency scenario: unsubscribe `b` while trying to send message to it
c.Unsubscribe(b)
// Test `b` is not closed and can still receive data 1:
// Because unsubscribe won't affect the ongoing process of sending message.
select {
case v, ok := <-b:
if v1, ok1 := v.(int); !(ok && ok1 && v1 == 1) {
errCh <- fmt.Sprint("unexpected failure in receiving data: ", 1)
}
default:
errCh <- fmt.Sprint("unexpected block from receiving data: ", 1)
}
// Test `b` is not closed but cannot receive data 2:
// Becuase in a new round of messaging, `b` has been unsubscribed.
select {
case v, ok := <-b:
if ok {
errCh <- fmt.Sprint("unexpected receving: ", v)
} else {
errCh <- fmt.Sprint("unexpected closing of channel")
}
default:
}
close(stopCh)
}()
select {
case <-time.After(2 * time.Second):
t.Fatal("Test timeout after 2s")
case e := <-errCh:
t.Fatal(e)
case <-stopCh:
}
}

25
app/stats/counter.go Normal file
View File

@ -0,0 +1,25 @@
// +build !confonly
package stats
import "sync/atomic"
// Counter is an implementation of stats.Counter.
type Counter struct {
value int64
}
// Value implements stats.Counter.
func (c *Counter) Value() int64 {
return atomic.LoadInt64(&c.value)
}
// Set implements stats.Counter.
func (c *Counter) Set(newValue int64) int64 {
return atomic.SwapInt64(&c.value, newValue)
}
// Add implements stats.Counter.
func (c *Counter) Add(delta int64) int64 {
return atomic.AddInt64(&c.value, delta)
}

31
app/stats/counter_test.go Normal file
View File

@ -0,0 +1,31 @@
package stats_test
import (
"context"
"testing"
. "v2ray.com/core/app/stats"
"v2ray.com/core/common"
"v2ray.com/core/features/stats"
)
func TestStatsCounter(t *testing.T) {
raw, err := common.CreateObject(context.Background(), &Config{})
common.Must(err)
m := raw.(stats.Manager)
c, err := m.RegisterCounter("test.counter")
common.Must(err)
if v := c.Add(1); v != 1 {
t.Fatal("unpexcted Add(1) return: ", v, ", wanted ", 1)
}
if v := c.Set(0); v != 1 {
t.Fatal("unexpected Set(0) return: ", v, ", wanted ", 1)
}
if v := c.Value(); v != 0 {
t.Fatal("unexpected Value() return: ", v, ", wanted ", 0)
}
}

View File

@ -7,98 +7,19 @@ package stats
import ( import (
"context" "context"
"sync" "sync"
"sync/atomic"
"time"
"v2ray.com/core/features/stats" "v2ray.com/core/features/stats"
) )
// Counter is an implementation of stats.Counter.
type Counter struct {
value int64
}
// Value implements stats.Counter.
func (c *Counter) Value() int64 {
return atomic.LoadInt64(&c.value)
}
// Set implements stats.Counter.
func (c *Counter) Set(newValue int64) int64 {
return atomic.SwapInt64(&c.value, newValue)
}
// Add implements stats.Counter.
func (c *Counter) Add(delta int64) int64 {
return atomic.AddInt64(&c.value, delta)
}
// Channel is an implementation of stats.Channel
type Channel struct {
channel chan interface{}
subscribers []chan interface{}
access sync.RWMutex
}
// Channel implements stats.Channel
func (c *Channel) Channel() chan interface{} {
return c.channel
}
// Subscribers implements stats.Channel
func (c *Channel) Subscribers() []chan interface{} {
c.access.RLock()
defer c.access.RUnlock()
return c.subscribers
}
// Subscribe implements stats.Channel
func (c *Channel) Subscribe() chan interface{} {
c.access.Lock()
defer c.access.Unlock()
ch := make(chan interface{})
c.subscribers = append(c.subscribers, ch)
return ch
}
// Unsubscribe implements stats.Channel
func (c *Channel) Unsubscribe(ch chan interface{}) {
c.access.Lock()
defer c.access.Unlock()
for i, s := range c.subscribers {
if s == ch {
// Copy to new memory block to prevent modifying original data
subscribers := make([]chan interface{}, len(c.subscribers)-1)
copy(subscribers[:i], c.subscribers[:i])
copy(subscribers[i:], c.subscribers[i+1:])
c.subscribers = subscribers
return
}
}
}
// Start starts the channel for listening to messsages
func (c *Channel) Start() {
for message := range c.Channel() {
subscribers := c.Subscribers() // Store a copy of slice value for concurrency safety
for _, sub := range subscribers {
select {
case sub <- message: // Successfully sent message
case <-time.After(100 * time.Millisecond):
c.Unsubscribe(sub) // Remove timeout subscriber
close(sub) // Actively close subscriber as notification
}
}
}
}
// Manager is an implementation of stats.Manager. // Manager is an implementation of stats.Manager.
type Manager struct { type Manager struct {
access sync.RWMutex access sync.RWMutex
counters map[string]*Counter counters map[string]*Counter
channels map[string]*Channel channels map[string]*Channel
running bool
} }
// NewManager creates an instance of Statistics Manager.
func NewManager(ctx context.Context, config *Config) (*Manager, error) { func NewManager(ctx context.Context, config *Config) (*Manager, error) {
m := &Manager{ m := &Manager{
counters: make(map[string]*Counter), counters: make(map[string]*Counter),
@ -108,6 +29,7 @@ func NewManager(ctx context.Context, config *Config) (*Manager, error) {
return m, nil return m, nil
} }
// Type implements common.HasType.
func (*Manager) Type() interface{} { func (*Manager) Type() interface{} {
return stats.ManagerType() return stats.ManagerType()
} }
@ -170,9 +92,11 @@ func (m *Manager) RegisterChannel(name string) (stats.Channel, error) {
return nil, newError("Channel ", name, " already registered.") return nil, newError("Channel ", name, " already registered.")
} }
newError("create new channel ", name).AtDebug().WriteToLog() newError("create new channel ", name).AtDebug().WriteToLog()
c := &Channel{channel: make(chan interface{})} c := new(Channel)
m.channels[name] = c m.channels[name] = c
go c.Start() if m.running {
c.Start()
}
return c, nil return c, nil
} }
@ -181,9 +105,10 @@ func (m *Manager) UnregisterChannel(name string) error {
m.access.Lock() m.access.Lock()
defer m.access.Unlock() defer m.access.Unlock()
if _, found := m.channels[name]; found { if c, found := m.channels[name]; found {
newError("remove channel ", name).AtDebug().WriteToLog() newError("remove channel ", name).AtDebug().WriteToLog()
delete(m.channels, name) delete(m.channels, name)
c.Close()
} }
return nil return nil
} }
@ -201,10 +126,24 @@ func (m *Manager) GetChannel(name string) stats.Channel {
// Start implements common.Runnable. // Start implements common.Runnable.
func (m *Manager) Start() error { func (m *Manager) Start() error {
m.access.Lock()
defer m.access.Unlock()
m.running = true
for _, channel := range m.channels {
channel.Start()
}
return nil return nil
} }
// Close implement common.Closable. // Close implement common.Closable.
func (m *Manager) Close() error { func (m *Manager) Close() error {
m.access.Lock()
defer m.access.Unlock()
m.running = false
for name, channel := range m.channels {
newError("remove channel ", name).AtDebug().WriteToLog()
delete(m.channels, name)
channel.Close()
}
return nil return nil
} }

View File

@ -2,7 +2,6 @@ package stats_test
import ( import (
"context" "context"
"fmt"
"testing" "testing"
"time" "time"
@ -15,337 +14,72 @@ func TestInterface(t *testing.T) {
_ = (stats.Manager)(new(Manager)) _ = (stats.Manager)(new(Manager))
} }
func TestStatsCounter(t *testing.T) { func TestStatsChannelRunnable(t *testing.T) {
raw, err := common.CreateObject(context.Background(), &Config{}) raw, err := common.CreateObject(context.Background(), &Config{})
common.Must(err) common.Must(err)
m := raw.(stats.Manager) m := raw.(stats.Manager)
c, err := m.RegisterCounter("test.counter")
ch1, err := m.RegisterChannel("test.channel.1")
c1 := ch1.(*Channel)
common.Must(err) common.Must(err)
if v := c.Add(1); v != 1 { if c1.Running() {
t.Fatal("unpexcted Add(1) return: ", v, ", wanted ", 1) t.Fatalf("unexpected running channel: test.channel.%d", 1)
} }
if v := c.Set(0); v != 1 { common.Must(m.Start())
t.Fatal("unexpected Set(0) return: ", v, ", wanted ", 1)
if !c1.Running() {
t.Fatalf("unexpected non-running channel: test.channel.%d", 1)
} }
if v := c.Value(); v != 0 { ch2, err := m.RegisterChannel("test.channel.2")
t.Fatal("unexpected Value() return: ", v, ", wanted ", 0) c2 := ch2.(*Channel)
} common.Must(err)
}
if !c2.Running() {
func TestStatsChannel(t *testing.T) { t.Fatalf("unexpected non-running channel: test.channel.%d", 2)
raw, err := common.CreateObject(context.Background(), &Config{}) }
common.Must(err)
s1 := c1.Subscribe()
m := raw.(stats.Manager) common.Must(c1.Close())
c, err := m.RegisterChannel("test.channel")
common.Must(err) if c1.Running() {
t.Fatalf("unexpected running channel: test.channel.%d", 1)
source := c.Channel() }
a := c.Subscribe()
b := c.Subscribe() select { // Check all subscribers in closed channel are closed
defer c.Unsubscribe(a) case _, ok := <-s1:
defer c.Unsubscribe(b) if ok {
t.Fatalf("unexpected non-closed subscriber in channel: test.channel.%d", 1)
stopCh := make(chan struct{}) }
errCh := make(chan string) case <-time.After(500 * time.Millisecond):
t.Fatalf("unexpected non-closed subscriber in channel: test.channel.%d", 1)
go func() { }
source <- 1
source <- 2 if len(c1.Subscribers()) != 0 { // Check subscribers in closed channel are emptied
source <- "3" t.Fatalf("unexpected non-empty subscribers in channel: test.channel.%d", 1)
source <- []int{4} }
source <- nil // Dummy messsage with no subscriber receiving
select { common.Must(m.Close())
case source <- nil: // Source should be blocked here, for last message was not cleared
errCh <- fmt.Sprint("unexpected non-blocked source") if c2.Running() {
default: t.Fatalf("unexpected running channel: test.channel.%d", 2)
close(stopCh) }
}
}() ch3, err := m.RegisterChannel("test.channel.3")
c3 := ch3.(*Channel)
go func() { common.Must(err)
if v, ok := (<-a).(int); !ok || v != 1 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1) if c3.Running() {
} t.Fatalf("unexpected running channel: test.channel.%d", 3)
if v, ok := (<-a).(int); !ok || v != 2 { }
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
} common.Must(c3.Start())
if v, ok := (<-a).(string); !ok || v != "3" { common.Must(m.UnregisterChannel("test.channel.3"))
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3")
} if c3.Running() { // Test that unregistering will close the channel.
if v, ok := (<-a).([]int); !ok || v[0] != 4 { t.Fatalf("unexpected running channel: test.channel.%d", 3)
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4})
}
}()
go func() {
if v, ok := (<-b).(int); !ok || v != 1 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
}
if v, ok := (<-b).(int); !ok || v != 2 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
}
if v, ok := (<-b).(string); !ok || v != "3" {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", "3")
}
if v, ok := (<-b).([]int); !ok || v[0] != 4 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", []int{4})
}
}()
select {
case <-time.After(2 * time.Second):
t.Fatal("Test timeout after 2s")
case e := <-errCh:
t.Fatal(e)
case <-stopCh:
}
}
func TestStatsChannelUnsubcribe(t *testing.T) {
raw, err := common.CreateObject(context.Background(), &Config{})
common.Must(err)
m := raw.(stats.Manager)
c, err := m.RegisterChannel("test.channel")
common.Must(err)
source := c.Channel()
a := c.Subscribe()
b := c.Subscribe()
defer c.Unsubscribe(a)
pauseCh := make(chan struct{})
stopCh := make(chan struct{})
errCh := make(chan string)
{
var aSet, bSet bool
for _, s := range c.Subscribers() {
if s == a {
aSet = true
}
if s == b {
bSet = true
}
}
if !(aSet && bSet) {
t.Fatal("unexpected subscribers: ", c.Subscribers())
}
}
go func() {
source <- 1
<-pauseCh // Wait for `b` goroutine to resume sending message
source <- 2
}()
go func() {
if v, ok := (<-a).(int); !ok || v != 1 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
}
if v, ok := (<-a).(int); !ok || v != 2 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
}
}()
go func() {
if v, ok := (<-b).(int); !ok || v != 1 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
}
// Unsubscribe `b` while `source`'s messaging is paused
c.Unsubscribe(b)
{ // Test `b` is not in subscribers
var aSet, bSet bool
for _, s := range c.Subscribers() {
if s == a {
aSet = true
}
if s == b {
bSet = true
}
}
if !(aSet && !bSet) {
errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
}
}
// Resume `source`'s progress
close(pauseCh)
// Test `b` is neither closed nor able to receive any data
select {
case v, ok := <-b:
if ok {
errCh <- fmt.Sprint("unexpected data received: ", v)
} else {
errCh <- fmt.Sprint("unexpected closed channel: ", b)
}
default:
}
close(stopCh)
}()
select {
case <-time.After(2 * time.Second):
t.Fatal("Test timeout after 2s")
case e := <-errCh:
t.Fatal(e)
case <-stopCh:
}
}
func TestStatsChannelTimeout(t *testing.T) {
raw, err := common.CreateObject(context.Background(), &Config{})
common.Must(err)
m := raw.(stats.Manager)
c, err := m.RegisterChannel("test.channel")
common.Must(err)
source := c.Channel()
a := c.Subscribe()
b := c.Subscribe()
defer c.Unsubscribe(a)
defer c.Unsubscribe(b)
stopCh := make(chan struct{})
errCh := make(chan string)
go func() {
source <- 1
source <- 2
}()
go func() {
if v, ok := (<-a).(int); !ok || v != 1 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
}
if v, ok := (<-a).(int); !ok || v != 2 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
}
{ // Test `b` is still in subscribers yet (because `a` receives 2 first)
var aSet, bSet bool
for _, s := range c.Subscribers() {
if s == a {
aSet = true
}
if s == b {
bSet = true
}
}
if !(aSet && bSet) {
errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
}
}
}()
go func() {
if v, ok := (<-b).(int); !ok || v != 1 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
}
// Block `b` channel for a time longer than `source`'s timeout
<-time.After(150 * time.Millisecond)
{ // Test `b` has been unsubscribed by source
var aSet, bSet bool
for _, s := range c.Subscribers() {
if s == a {
aSet = true
}
if s == b {
bSet = true
}
}
if !(aSet && !bSet) {
errCh <- fmt.Sprint("unexpected subscribers: ", c.Subscribers())
}
}
select { // Test `b` has been closed by source
case v, ok := <-b:
if ok {
errCh <- fmt.Sprint("unexpected data received: ", v)
}
default:
}
close(stopCh)
}()
select {
case <-time.After(2 * time.Second):
t.Fatal("Test timeout after 2s")
case e := <-errCh:
t.Fatal(e)
case <-stopCh:
}
}
func TestStatsChannelConcurrency(t *testing.T) {
raw, err := common.CreateObject(context.Background(), &Config{})
common.Must(err)
m := raw.(stats.Manager)
c, err := m.RegisterChannel("test.channel")
common.Must(err)
source := c.Channel()
a := c.Subscribe()
b := c.Subscribe()
defer c.Unsubscribe(a)
stopCh := make(chan struct{})
errCh := make(chan string)
go func() {
source <- 1
source <- 2
}()
go func() {
if v, ok := (<-a).(int); !ok || v != 1 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
}
if v, ok := (<-a).(int); !ok || v != 2 {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 2)
}
}()
go func() {
// Block `b` for a time shorter than `source`'s timeout
// So as to ensure source channel is trying to send message to `b`.
<-time.After(25 * time.Millisecond)
// This causes concurrency scenario: unsubscribe `b` while trying to send message to it
c.Unsubscribe(b)
// Test `b` is not closed and can still receive data 1:
// Because unsubscribe won't affect the ongoing process of sending message.
select {
case v, ok := <-b:
if v1, ok1 := v.(int); !(ok && ok1 && v1 == 1) {
errCh <- fmt.Sprint("unexpected failure in receiving data: ", 1)
}
default:
errCh <- fmt.Sprint("unexpected block from receiving data: ", 1)
}
// Test `b` is not closed but cannot receive data 2:
// Becuase in a new round of messaging, `b` has been unsubscribed.
select {
case v, ok := <-b:
if ok {
errCh <- fmt.Sprint("unexpected receving: ", v)
} else {
errCh <- fmt.Sprint("unexpected closing of channel")
}
default:
}
close(stopCh)
}()
select {
case <-time.After(2 * time.Second):
t.Fatal("Test timeout after 2s")
case e := <-errCh:
t.Fatal(e)
case <-stopCh:
} }
} }

View File

@ -2,7 +2,10 @@ package stats
//go:generate errorgen //go:generate errorgen
import "v2ray.com/core/features" import (
"v2ray.com/core/common"
"v2ray.com/core/features"
)
// Counter is the interface for stats counters. // Counter is the interface for stats counters.
// //
@ -16,12 +19,14 @@ type Counter interface {
Add(int64) int64 Add(int64) int64
} }
// Channel is the interface for stats channel // Channel is the interface for stats channel.
// //
// v2ray:api:stable // v2ray:api:stable
type Channel interface { type Channel interface {
// Channel returns the underlying go channel. // Channel is a runnable unit.
Channel() chan interface{} common.Runnable
// Publish broadcasts a message through the channel.
Publish(interface{})
// SubscriberCount returns the number of the subscribers. // SubscriberCount returns the number of the subscribers.
Subscribers() []chan interface{} Subscribers() []chan interface{}
// Subscribe registers for listening to channel stream and returns a new listener channel. // Subscribe registers for listening to channel stream and returns a new listener channel.