move stats and inbound to features directory

This commit is contained in:
Darien Raymond 2018-10-11 21:14:53 +02:00
parent b6dc31d3fe
commit 273342d0b9
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
15 changed files with 161 additions and 138 deletions

View File

@ -19,6 +19,7 @@ import (
"v2ray.com/core/common/vio"
"v2ray.com/core/features/outbound"
"v2ray.com/core/features/routing"
feature_stats "v2ray.com/core/features/stats"
"v2ray.com/core/transport/pipe"
)
@ -85,7 +86,7 @@ type DefaultDispatcher struct {
ohm outbound.HandlerManager
router routing.Router
policy core.PolicyManager
stats core.StatManager
stats feature_stats.Manager
}
// NewDefaultDispatcher create a new DefaultDispatcher.
@ -132,7 +133,7 @@ func (d *DefaultDispatcher) getLink(ctx context.Context) (*vio.Link, *vio.Link)
p := d.policy.ForLevel(user.Level)
if p.Stats.UserUplink {
name := "user>>>" + user.Email + ">>>traffic>>>uplink"
if c, _ := core.GetOrRegisterStatCounter(d.stats, name); c != nil {
if c, _ := feature_stats.GetOrRegisterCounter(d.stats, name); c != nil {
inboundLink.Writer = &stats.SizeStatWriter{
Counter: c,
Writer: inboundLink.Writer,
@ -141,7 +142,7 @@ func (d *DefaultDispatcher) getLink(ctx context.Context) (*vio.Link, *vio.Link)
}
if p.Stats.UserDownlink {
name := "user>>>" + user.Email + ">>>traffic>>>downlink"
if c, _ := core.GetOrRegisterStatCounter(d.stats, name); c != nil {
if c, _ := feature_stats.GetOrRegisterCounter(d.stats, name); c != nil {
outboundLink.Writer = &stats.SizeStatWriter{
Counter: c,
Writer: outboundLink.Writer,

View File

@ -6,6 +6,7 @@ import (
grpc "google.golang.org/grpc"
"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/features/inbound"
"v2ray.com/core/features/outbound"
"v2ray.com/core/proxy"
)
@ -13,7 +14,7 @@ import (
// InboundOperation is the interface for operations that applies to inbound handlers.
type InboundOperation interface {
// ApplyInbound applies this operation to the given inbound handler.
ApplyInbound(context.Context, core.InboundHandler) error
ApplyInbound(context.Context, inbound.Handler) error
}
// OutboundOperation is the interface for operations that applies to outbound handlers.
@ -22,7 +23,7 @@ type OutboundOperation interface {
ApplyOutbound(context.Context, outbound.Handler) error
}
func getInbound(handler core.InboundHandler) (proxy.Inbound, error) {
func getInbound(handler inbound.Handler) (proxy.Inbound, error) {
gi, ok := handler.(proxy.GetInbound)
if !ok {
return nil, newError("can't get inbound proxy from handler.")
@ -31,7 +32,7 @@ func getInbound(handler core.InboundHandler) (proxy.Inbound, error) {
}
// ApplyInbound implements InboundOperation.
func (op *AddUserOperation) ApplyInbound(ctx context.Context, handler core.InboundHandler) error {
func (op *AddUserOperation) ApplyInbound(ctx context.Context, handler inbound.Handler) error {
p, err := getInbound(handler)
if err != nil {
return err
@ -48,7 +49,7 @@ func (op *AddUserOperation) ApplyInbound(ctx context.Context, handler core.Inbou
}
// ApplyInbound implements InboundOperation.
func (op *RemoveUserOperation) ApplyInbound(ctx context.Context, handler core.InboundHandler) error {
func (op *RemoveUserOperation) ApplyInbound(ctx context.Context, handler inbound.Handler) error {
p, err := getInbound(handler)
if err != nil {
return err
@ -62,7 +63,7 @@ func (op *RemoveUserOperation) ApplyInbound(ctx context.Context, handler core.In
type handlerServer struct {
s *core.Instance
ihm core.InboundHandlerManager
ihm inbound.Manager
ohm outbound.HandlerManager
}
@ -71,7 +72,7 @@ func (s *handlerServer) AddInbound(ctx context.Context, request *AddInboundReque
if err != nil {
return nil, err
}
handler, ok := rawHandler.(core.InboundHandler)
handler, ok := rawHandler.(inbound.Handler)
if !ok {
return nil, newError("not an InboundHandler.")
}

View File

@ -10,26 +10,27 @@ import (
"v2ray.com/core/common/dice"
"v2ray.com/core/common/net"
"v2ray.com/core/common/serial"
"v2ray.com/core/features/stats"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/internet"
)
func getStatCounter(v *core.Instance, tag string) (core.StatCounter, core.StatCounter) {
var uplinkCounter core.StatCounter
var downlinkCounter core.StatCounter
func getStatCounter(v *core.Instance, tag string) (stats.Counter, stats.Counter) {
var uplinkCounter stats.Counter
var downlinkCounter stats.Counter
policy := v.PolicyManager()
stats := v.Stats()
statsManager := v.Stats()
if len(tag) > 0 && policy.ForSystem().Stats.InboundUplink {
name := "inbound>>>" + tag + ">>>traffic>>>uplink"
c, _ := core.GetOrRegisterStatCounter(stats, name)
c, _ := stats.GetOrRegisterCounter(statsManager, name)
if c != nil {
uplinkCounter = c
}
}
if len(tag) > 0 && policy.ForSystem().Stats.InboundDownlink {
name := "inbound>>>" + tag + ">>>traffic>>>downlink"
c, _ := core.GetOrRegisterStatCounter(stats, name)
c, _ := stats.GetOrRegisterCounter(statsManager, name)
if c != nil {
downlinkCounter = c
}

View File

@ -11,30 +11,31 @@ import (
"v2ray.com/core/common"
"v2ray.com/core/common/serial"
"v2ray.com/core/common/session"
"v2ray.com/core/features/inbound"
)
// Manager is to manage all inbound handlers.
type Manager struct {
access sync.RWMutex
untaggedHandler []core.InboundHandler
taggedHandlers map[string]core.InboundHandler
untaggedHandler []inbound.Handler
taggedHandlers map[string]inbound.Handler
running bool
}
// New returns a new Manager for inbound handlers.
func New(ctx context.Context, config *proxyman.InboundConfig) (*Manager, error) {
m := &Manager{
taggedHandlers: make(map[string]core.InboundHandler),
taggedHandlers: make(map[string]inbound.Handler),
}
v := core.MustFromContext(ctx)
if err := v.RegisterFeature((*core.InboundHandlerManager)(nil), m); err != nil {
if err := v.RegisterFeature((*inbound.Manager)(nil), m); err != nil {
return nil, newError("unable to register InboundHandlerManager").Base(err)
}
return m, nil
}
// AddHandler implements core.InboundHandlerManager.
func (m *Manager) AddHandler(ctx context.Context, handler core.InboundHandler) error {
// AddHandler implements inbound.Manager.
func (m *Manager) AddHandler(ctx context.Context, handler inbound.Handler) error {
m.access.Lock()
defer m.access.Unlock()
@ -52,8 +53,8 @@ func (m *Manager) AddHandler(ctx context.Context, handler core.InboundHandler) e
return nil
}
// GetHandler implements core.InboundHandlerManager.
func (m *Manager) GetHandler(ctx context.Context, tag string) (core.InboundHandler, error) {
// GetHandler implements inbound.Manager.
func (m *Manager) GetHandler(ctx context.Context, tag string) (inbound.Handler, error) {
m.access.RLock()
defer m.access.RUnlock()
@ -64,7 +65,7 @@ func (m *Manager) GetHandler(ctx context.Context, tag string) (core.InboundHandl
return handler, nil
}
// RemoveHandler implements core.InboundHandlerManager.
// RemoveHandler implements inbound.Manager.
func (m *Manager) RemoveHandler(ctx context.Context, tag string) error {
if len(tag) == 0 {
return common.ErrNoClue
@ -131,8 +132,8 @@ func (m *Manager) Close() error {
return nil
}
// NewHandler creates a new core.InboundHandler based on the given config.
func NewHandler(ctx context.Context, config *core.InboundHandlerConfig) (core.InboundHandler, error) {
// NewHandler creates a new inbound.Handler based on the given config.
func NewHandler(ctx context.Context, config *core.InboundHandlerConfig) (inbound.Handler, error) {
rawReceiverSettings, err := config.ReceiverSettings.GetInstance()
if err != nil {
return nil, err

View File

@ -6,7 +6,6 @@ import (
"sync/atomic"
"time"
"v2ray.com/core"
"v2ray.com/core/app/proxyman"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
@ -16,6 +15,7 @@ import (
"v2ray.com/core/common/signal/done"
"v2ray.com/core/common/task"
"v2ray.com/core/features/routing"
"v2ray.com/core/features/stats"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/tcp"
@ -39,8 +39,8 @@ type tcpWorker struct {
tag string
dispatcher routing.Dispatcher
sniffingConfig *proxyman.SniffingConfig
uplinkCounter core.StatCounter
downlinkCounter core.StatCounter
uplinkCounter stats.Counter
downlinkCounter stats.Counter
hub internet.Listener
}
@ -145,8 +145,8 @@ type udpConn struct {
remote net.Addr
local net.Addr
done *done.Instance
uplink core.StatCounter
downlink core.StatCounter
uplink stats.Counter
downlink stats.Counter
}
func (c *udpConn) updateActivity() {
@ -225,8 +225,8 @@ type udpWorker struct {
tag string
stream *internet.MemoryStreamConfig
dispatcher routing.Dispatcher
uplinkCounter core.StatCounter
downlinkCounter core.StatCounter
uplinkCounter stats.Counter
downlinkCounter stats.Counter
checker *task.Periodic
activeConn map[connID]*udpConn

View File

@ -11,14 +11,15 @@ import (
"v2ray.com/core/app/stats"
"v2ray.com/core/common"
"v2ray.com/core/common/strmatcher"
feature_stats "v2ray.com/core/features/stats"
)
// statsServer is an implementation of StatsService.
type statsServer struct {
stats core.StatManager
stats feature_stats.Manager
}
func NewStatsServer(manager core.StatManager) StatsServiceServer {
func NewStatsServer(manager feature_stats.Manager) StatsServiceServer {
return &statsServer{stats: manager}
}
@ -54,7 +55,7 @@ func (s *statsServer) QueryStats(ctx context.Context, request *QueryStatsRequest
return nil, newError("QueryStats only works its own stats.Manager.")
}
manager.Visit(func(name string, c core.StatCounter) bool {
manager.Visit(func(name string, c feature_stats.Counter) bool {
if matcher.Match(name) {
var value int64
if request.Reset_ {

View File

@ -8,29 +8,30 @@ import (
"sync/atomic"
"v2ray.com/core"
"v2ray.com/core/features/stats"
)
// Counter is an implementation of core.StatCounter.
// Counter is an implementation of stats.Counter.
type Counter struct {
value int64
}
// Value implements core.StatCounter.
// Value implements stats.Counter.
func (c *Counter) Value() int64 {
return atomic.LoadInt64(&c.value)
}
// Set implements core.StatCounter.
// Set implements stats.Counter.
func (c *Counter) Set(newValue int64) int64 {
return atomic.SwapInt64(&c.value, newValue)
}
// Add implements core.StatCounter.
// Add implements stats.Counter.
func (c *Counter) Add(delta int64) int64 {
return atomic.AddInt64(&c.value, delta)
}
// Manager is an implementation of core.StatManager.
// Manager is an implementation of stats.Manager.
type Manager struct {
access sync.RWMutex
counters map[string]*Counter
@ -43,7 +44,7 @@ func NewManager(ctx context.Context, config *Config) (*Manager, error) {
v := core.FromContext(ctx)
if v != nil {
if err := v.RegisterFeature((*core.StatManager)(nil), m); err != nil {
if err := v.RegisterFeature((*stats.Manager)(nil), m); err != nil {
return nil, newError("failed to register StatManager").Base(err)
}
}
@ -51,7 +52,7 @@ func NewManager(ctx context.Context, config *Config) (*Manager, error) {
return m, nil
}
func (m *Manager) RegisterCounter(name string) (core.StatCounter, error) {
func (m *Manager) RegisterCounter(name string) (stats.Counter, error) {
m.access.Lock()
defer m.access.Unlock()
@ -64,7 +65,7 @@ func (m *Manager) RegisterCounter(name string) (core.StatCounter, error) {
return c, nil
}
func (m *Manager) GetCounter(name string) core.StatCounter {
func (m *Manager) GetCounter(name string) stats.Counter {
m.access.RLock()
defer m.access.RUnlock()
@ -74,7 +75,7 @@ func (m *Manager) GetCounter(name string) core.StatCounter {
return nil
}
func (m *Manager) Visit(visitor func(string, core.StatCounter) bool) {
func (m *Manager) Visit(visitor func(string, stats.Counter) bool) {
m.access.RLock()
defer m.access.RUnlock()

View File

@ -4,16 +4,16 @@ import (
"context"
"testing"
"v2ray.com/core"
. "v2ray.com/core/app/stats"
"v2ray.com/core/common"
"v2ray.com/core/features/stats"
. "v2ray.com/ext/assert"
)
func TestInternface(t *testing.T) {
assert := With(t)
assert((*Manager)(nil), Implements, (*core.StatManager)(nil))
assert((*Manager)(nil), Implements, (*stats.Manager)(nil))
}
func TestStatsCounter(t *testing.T) {
@ -22,7 +22,7 @@ func TestStatsCounter(t *testing.T) {
raw, err := common.CreateObject(context.Background(), &Config{})
assert(err, IsNil)
m := raw.(core.StatManager)
m := raw.(stats.Manager)
c, err := m.RegisterCounter("test.counter")
assert(err, IsNil)

View File

@ -1,14 +1,14 @@
package stats
import (
"v2ray.com/core"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/features/stats"
"v2ray.com/core/transport/pipe"
)
type SizeStatWriter struct {
Counter core.StatCounter
Counter stats.Counter
Writer buf.Writer
}

View File

@ -0,0 +1,31 @@
package inbound
import (
"context"
"v2ray.com/core/common"
"v2ray.com/core/common/net"
"v2ray.com/core/features"
)
// Handler is the interface for handlers that process inbound connections.
type Handler interface {
common.Runnable
// The tag of this handler.
Tag() string
// Deprecated. Do not use in new code.
GetRandomInboundProxy() (interface{}, net.Port, int)
}
// Manager is a feature that manages InboundHandlers.
type Manager interface {
features.Feature
// GetHandlers returns an InboundHandler for the given tag.
GetHandler(ctx context.Context, tag string) (Handler, error)
// AddHandler adds the given handler into this InboundHandlerManager.
AddHandler(ctx context.Context, handler Handler) error
// RemoveHandler removes a handler from InboundHandlerManager.
RemoveHandler(ctx context.Context, tag string) error
}

26
features/stats/stats.go Normal file
View File

@ -0,0 +1,26 @@
package stats
import "v2ray.com/core/features"
type Counter interface {
Value() int64
Set(int64) int64
Add(int64) int64
}
type Manager interface {
features.Feature
RegisterCounter(string) (Counter, error)
GetCounter(string) Counter
}
// GetOrRegisterCounter tries to get the StatCounter first. If not exist, it then tries to create a new counter.
func GetOrRegisterCounter(m Manager, name string) (Counter, error) {
counter := m.GetCounter(name)
if counter != nil {
return counter, nil
}
return m.RegisterCounter(name)
}

View File

@ -5,78 +5,56 @@ import (
"sync"
"v2ray.com/core/common"
"v2ray.com/core/common/net"
"v2ray.com/core/features/inbound"
"v2ray.com/core/features/outbound"
)
// InboundHandler is the interface for handlers that process inbound connections.
type InboundHandler interface {
common.Runnable
// The tag of this handler.
Tag() string
// Deprecated. Do not use in new code.
GetRandomInboundProxy() (interface{}, net.Port, int)
}
// InboundHandlerManager is a feature that manages InboundHandlers.
type InboundHandlerManager interface {
Feature
// GetHandlers returns an InboundHandler for the given tag.
GetHandler(ctx context.Context, tag string) (InboundHandler, error)
// AddHandler adds the given handler into this InboundHandlerManager.
AddHandler(ctx context.Context, handler InboundHandler) error
// RemoveHandler removes a handler from InboundHandlerManager.
RemoveHandler(ctx context.Context, tag string) error
}
type syncInboundHandlerManager struct {
sync.RWMutex
InboundHandlerManager
inbound.Manager
}
func (m *syncInboundHandlerManager) GetHandler(ctx context.Context, tag string) (InboundHandler, error) {
func (m *syncInboundHandlerManager) GetHandler(ctx context.Context, tag string) (inbound.Handler, error) {
m.RLock()
defer m.RUnlock()
if m.InboundHandlerManager == nil {
return nil, newError("InboundHandlerManager not set.").AtError()
if m.Manager == nil {
return nil, newError("inbound.Manager not set.").AtError()
}
return m.InboundHandlerManager.GetHandler(ctx, tag)
return m.Manager.GetHandler(ctx, tag)
}
func (m *syncInboundHandlerManager) AddHandler(ctx context.Context, handler InboundHandler) error {
func (m *syncInboundHandlerManager) AddHandler(ctx context.Context, handler inbound.Handler) error {
m.RLock()
defer m.RUnlock()
if m.InboundHandlerManager == nil {
return newError("InboundHandlerManager not set.").AtError()
if m.Manager == nil {
return newError("inbound.Manager not set.").AtError()
}
return m.InboundHandlerManager.AddHandler(ctx, handler)
return m.Manager.AddHandler(ctx, handler)
}
func (m *syncInboundHandlerManager) Start() error {
m.RLock()
defer m.RUnlock()
if m.InboundHandlerManager == nil {
return newError("InboundHandlerManager not set.").AtError()
if m.Manager == nil {
return newError("inbound.Manager not set.").AtError()
}
return m.InboundHandlerManager.Start()
return m.Manager.Start()
}
func (m *syncInboundHandlerManager) Close() error {
m.RLock()
defer m.RUnlock()
return common.Close(m.InboundHandlerManager)
return common.Close(m.Manager)
}
func (m *syncInboundHandlerManager) Set(manager InboundHandlerManager) {
func (m *syncInboundHandlerManager) Set(manager inbound.Manager) {
if manager == nil {
return
}
@ -84,8 +62,8 @@ func (m *syncInboundHandlerManager) Set(manager InboundHandlerManager) {
m.Lock()
defer m.Unlock()
common.Close(m.InboundHandlerManager) // nolint: errcheck
m.InboundHandlerManager = manager
common.Close(m.Manager) // nolint: errcheck
m.Manager = manager
}
type syncOutboundHandlerManager struct {

View File

@ -20,6 +20,7 @@ import (
"v2ray.com/core/common/signal"
"v2ray.com/core/common/task"
"v2ray.com/core/common/uuid"
feature_inbound "v2ray.com/core/features/inbound"
"v2ray.com/core/features/routing"
"v2ray.com/core/proxy/vmess"
"v2ray.com/core/proxy/vmess/encoding"
@ -100,7 +101,7 @@ func (v *userByEmail) Remove(email string) bool {
// Handler is an inbound connection handler that handles messages in VMess protocol.
type Handler struct {
policyManager core.PolicyManager
inboundHandlerManager core.InboundHandlerManager
inboundHandlerManager feature_inbound.Manager
clients *vmess.TimedUserValidator
usersByEmail *userByEmail
detours *DetourConfig

View File

@ -2,86 +2,65 @@ package core
import (
"sync"
"v2ray.com/core/features/stats"
)
type StatCounter interface {
Value() int64
Set(int64) int64
Add(int64) int64
}
type StatManager interface {
Feature
RegisterCounter(string) (StatCounter, error)
GetCounter(string) StatCounter
}
// GetOrRegisterStatCounter tries to get the StatCounter first. If not exist, it then tries to create a new counter.
func GetOrRegisterStatCounter(m StatManager, name string) (StatCounter, error) {
counter := m.GetCounter(name)
if counter != nil {
return counter, nil
}
return m.RegisterCounter(name)
}
type syncStatManager struct {
sync.RWMutex
StatManager
stats.Manager
}
func (s *syncStatManager) Start() error {
s.RLock()
defer s.RUnlock()
if s.StatManager == nil {
if s.Manager == nil {
return nil
}
return s.StatManager.Start()
return s.Manager.Start()
}
func (s *syncStatManager) Close() error {
s.RLock()
defer s.RUnlock()
if s.StatManager == nil {
if s.Manager == nil {
return nil
}
return s.StatManager.Close()
return s.Manager.Close()
}
func (s *syncStatManager) RegisterCounter(name string) (StatCounter, error) {
func (s *syncStatManager) RegisterCounter(name string) (stats.Counter, error) {
s.RLock()
defer s.RUnlock()
if s.StatManager == nil {
if s.Manager == nil {
return nil, newError("StatManager not set.")
}
return s.StatManager.RegisterCounter(name)
return s.Manager.RegisterCounter(name)
}
func (s *syncStatManager) GetCounter(name string) StatCounter {
func (s *syncStatManager) GetCounter(name string) stats.Counter {
s.RLock()
defer s.RUnlock()
if s.StatManager == nil {
if s.Manager == nil {
return nil
}
return s.StatManager.GetCounter(name)
return s.Manager.GetCounter(name)
}
func (s *syncStatManager) Set(m StatManager) {
func (s *syncStatManager) Set(m stats.Manager) {
if m == nil {
return
}
s.Lock()
defer s.Unlock()
if s.StatManager != nil {
s.StatManager.Close() // nolint: errcheck
if s.Manager != nil {
s.Manager.Close() // nolint: errcheck
}
s.StatManager = m
s.Manager = m
}

View File

@ -7,8 +7,10 @@ import (
"v2ray.com/core/common"
"v2ray.com/core/common/serial"
"v2ray.com/core/common/uuid"
"v2ray.com/core/features/inbound"
"v2ray.com/core/features/outbound"
"v2ray.com/core/features/routing"
"v2ray.com/core/features/stats"
)
// Server is an instance of V2Ray. At any time, there must be at most one Server instance running.
@ -64,12 +66,12 @@ func New(config *Config) (*Instance, error) {
}
}
for _, inbound := range config.Inbound {
rawHandler, err := CreateObject(server, inbound)
for _, inboundConfig := range config.Inbound {
rawHandler, err := CreateObject(server, inboundConfig)
if err != nil {
return nil, err
}
handler, ok := rawHandler.(InboundHandler)
handler, ok := rawHandler.(inbound.Handler)
if !ok {
return nil, newError("not an InboundHandler")
}
@ -153,12 +155,12 @@ func (s *Instance) RegisterFeature(feature interface{}, instance Feature) error
s.router.Set(instance.(routing.Router))
case routing.Dispatcher, *routing.Dispatcher:
s.dispatcher.Set(instance.(routing.Dispatcher))
case InboundHandlerManager, *InboundHandlerManager:
s.ihm.Set(instance.(InboundHandlerManager))
case inbound.Manager, *inbound.Manager:
s.ihm.Set(instance.(inbound.Manager))
case outbound.HandlerManager, *outbound.HandlerManager:
s.ohm.Set(instance.(outbound.HandlerManager))
case StatManager, *StatManager:
s.stats.Set(instance.(StatManager))
case stats.Manager, *stats.Manager:
s.stats.Set(instance.(stats.Manager))
default:
s.access.Lock()
s.features = append(s.features, instance)
@ -210,7 +212,7 @@ func (s *Instance) Dispatcher() routing.Dispatcher {
}
// InboundHandlerManager returns the InboundHandlerManager used by this Instance. If InboundHandlerManager was not registered before, the returned value doesn't work.
func (s *Instance) InboundHandlerManager() InboundHandlerManager {
func (s *Instance) InboundHandlerManager() inbound.Manager {
return &(s.ihm)
}
@ -219,7 +221,7 @@ func (s *Instance) OutboundHandlerManager() outbound.HandlerManager {
return &(s.ohm)
}
// Stats returns the StatManager used by this Instance. If StatManager was not registered before, the returned value doesn't work.
func (s *Instance) Stats() StatManager {
// Stats returns the stats.Manager used by this Instance. If StatManager was not registered before, the returned value doesn't work.
func (s *Instance) Stats() stats.Manager {
return &(s.stats)
}