v2fly/app/proxyman/inbound/inbound.go

179 lines
4.6 KiB
Go
Raw Normal View History

package inbound
2021-02-16 20:31:50 +00:00
//go:generate go run github.com/v2fly/v2ray-core/v4/common/errors/errorgen
2017-04-08 23:43:25 +00:00
import (
"context"
2018-02-05 22:38:24 +00:00
"sync"
2021-02-16 20:31:50 +00:00
core "github.com/v2fly/v2ray-core/v4"
"github.com/v2fly/v2ray-core/v4/app/proxyman"
"github.com/v2fly/v2ray-core/v4/common"
"github.com/v2fly/v2ray-core/v4/common/serial"
"github.com/v2fly/v2ray-core/v4/common/session"
"github.com/v2fly/v2ray-core/v4/features/inbound"
)
2017-04-09 12:49:40 +00:00
// Manager is to manage all inbound handlers.
type Manager struct {
2018-02-07 11:34:15 +00:00
access sync.RWMutex
untaggedHandler []inbound.Handler
taggedHandlers map[string]inbound.Handler
2018-02-07 11:34:15 +00:00
running bool
}
2018-02-08 21:09:55 +00:00
// New returns a new Manager for inbound handlers.
2017-04-09 12:49:40 +00:00
func New(ctx context.Context, config *proxyman.InboundConfig) (*Manager, error) {
m := &Manager{
taggedHandlers: make(map[string]inbound.Handler),
}
return m, nil
}
2018-10-12 21:57:56 +00:00
// Type implements common.HasType by returning the value of
// inbound.ManagerType().
func (*Manager) Type() interface{} {
	managerType := inbound.ManagerType()
	return managerType
}
// AddHandler implements inbound.Manager.
func (m *Manager) AddHandler(ctx context.Context, handler inbound.Handler) error {
2018-02-07 11:34:15 +00:00
m.access.Lock()
defer m.access.Unlock()
2018-02-05 22:38:24 +00:00
tag := handler.Tag()
if len(tag) > 0 {
m.taggedHandlers[tag] = handler
2018-02-05 22:38:24 +00:00
} else {
m.untaggedHandler = append(m.untaggedHandler, handler)
}
2018-02-07 11:34:15 +00:00
if m.running {
return handler.Start()
}
return nil
}
// GetHandler implements inbound.Manager.
func (m *Manager) GetHandler(ctx context.Context, tag string) (inbound.Handler, error) {
2018-02-07 11:34:15 +00:00
m.access.RLock()
defer m.access.RUnlock()
2018-02-05 22:38:24 +00:00
handler, found := m.taggedHandlers[tag]
if !found {
2017-04-08 23:43:25 +00:00
return nil, newError("handler not found: ", tag)
}
return handler, nil
}
// RemoveHandler implements inbound.Manager.
2018-02-05 22:38:24 +00:00
func (m *Manager) RemoveHandler(ctx context.Context, tag string) error {
if tag == "" {
return common.ErrNoClue
2018-02-05 22:38:24 +00:00
}
2018-02-07 11:34:15 +00:00
m.access.Lock()
defer m.access.Unlock()
2018-02-05 22:38:24 +00:00
if handler, found := m.taggedHandlers[tag]; found {
2018-04-04 19:32:40 +00:00
if err := handler.Close(); err != nil {
newError("failed to close handler ", tag).Base(err).AtWarning().WriteToLog(session.ExportIDToError(ctx))
2018-04-04 19:32:40 +00:00
}
2018-02-05 22:38:24 +00:00
delete(m.taggedHandlers, tag)
return nil
}
return common.ErrNoClue
2018-02-05 22:38:24 +00:00
}
2018-04-03 22:57:44 +00:00
// Start implements common.Runnable.
2017-04-09 12:49:40 +00:00
func (m *Manager) Start() error {
2018-02-07 11:34:15 +00:00
m.access.Lock()
defer m.access.Unlock()
m.running = true
2018-02-05 22:38:24 +00:00
for _, handler := range m.taggedHandlers {
if err := handler.Start(); err != nil {
return err
}
}
for _, handler := range m.untaggedHandler {
if err := handler.Start(); err != nil {
return err
}
}
return nil
}
2018-04-03 22:57:44 +00:00
// Close implements common.Closable.
2018-02-08 14:39:46 +00:00
func (m *Manager) Close() error {
2018-02-07 11:34:15 +00:00
m.access.Lock()
defer m.access.Unlock()
m.running = false
2018-05-31 09:55:11 +00:00
var errors []interface{}
2018-02-05 22:38:24 +00:00
for _, handler := range m.taggedHandlers {
2018-05-31 09:55:11 +00:00
if err := handler.Close(); err != nil {
errors = append(errors, err)
}
2018-02-05 22:38:24 +00:00
}
for _, handler := range m.untaggedHandler {
2018-05-31 09:55:11 +00:00
if err := handler.Close(); err != nil {
errors = append(errors, err)
}
}
if len(errors) > 0 {
return newError("failed to close all handlers").Base(newError(serial.Concat(errors...)))
}
2018-02-08 14:39:46 +00:00
return nil
}
// NewHandler creates a new inbound.Handler based on the given config.
func NewHandler(ctx context.Context, config *core.InboundHandlerConfig) (inbound.Handler, error) {
2021-06-19 13:36:54 +00:00
rawReceiverSettings, err := serial.GetInstanceOf(config.ReceiverSettings)
if err != nil {
return nil, err
}
2021-06-19 13:36:54 +00:00
proxySettings, err := serial.GetInstanceOf(config.ProxySettings)
if err != nil {
return nil, err
}
tag := config.Tag
2018-04-04 09:12:09 +00:00
receiverSettings, ok := rawReceiverSettings.(*proxyman.ReceiverConfig)
if !ok {
return nil, newError("not a ReceiverConfig").AtError()
}
streamSettings := receiverSettings.StreamSettings
if streamSettings != nil && streamSettings.SocketSettings != nil {
ctx = session.ContextWithSockopt(ctx, &session.Sockopt{
Mark: streamSettings.SocketSettings.Mark,
})
}
allocStrategy := receiverSettings.AllocationStrategy
if allocStrategy == nil || allocStrategy.Type == proxyman.AllocationStrategy_Always {
return NewAlwaysOnInboundHandler(ctx, tag, receiverSettings, proxySettings)
}
if allocStrategy.Type == proxyman.AllocationStrategy_Random {
return NewDynamicInboundHandler(ctx, tag, receiverSettings, proxySettings)
}
return nil, newError("unknown allocation strategy: ", receiverSettings.AllocationStrategy.Type).AtError()
}
func init() {
common.Must(common.RegisterConfig((*proxyman.InboundConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
return New(ctx, config.(*proxyman.InboundConfig))
}))
common.Must(common.RegisterConfig((*core.InboundHandlerConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
return NewHandler(ctx, config.(*core.InboundHandlerConfig))
}))
}