1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2025-01-20 16:26:23 -05:00
v2fly/app/proxyman/inbound/dynamic.go

202 lines
5.2 KiB
Go
Raw Normal View History

package inbound
import (
"context"
"sync"
"time"
2018-02-08 09:39:46 -05:00
"v2ray.com/core"
"v2ray.com/core/app/proxyman"
"v2ray.com/core/common/dice"
2018-10-23 06:21:12 -04:00
"v2ray.com/core/common/mux"
"v2ray.com/core/common/net"
2018-05-27 07:02:29 -04:00
"v2ray.com/core/common/task"
"v2ray.com/core/proxy"
2018-09-07 08:50:25 -04:00
"v2ray.com/core/transport/internet"
)
type DynamicInboundHandler struct {
tag string
2018-02-08 09:39:46 -05:00
v *core.Instance
proxyConfig interface{}
receiverConfig *proxyman.ReceiverConfig
2018-09-07 08:50:25 -04:00
streamSettings *internet.MemoryStreamConfig
2017-01-30 15:35:34 -05:00
portMutex sync.Mutex
portsInUse map[net.Port]bool
2017-01-30 15:35:34 -05:00
workerMutex sync.RWMutex
worker []worker
lastRefresh time.Time
2017-04-02 08:06:20 -04:00
mux *mux.Server
2018-05-27 07:02:29 -04:00
task *task.Periodic
2020-06-18 00:37:10 -04:00
ctx context.Context
}
func NewDynamicInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*DynamicInboundHandler, error) {
2018-02-21 11:05:29 -05:00
v := core.MustFromContext(ctx)
h := &DynamicInboundHandler{
tag: tag,
proxyConfig: proxyConfig,
receiverConfig: receiverConfig,
portsInUse: make(map[net.Port]bool),
2017-04-02 08:06:20 -04:00
mux: mux.NewServer(ctx),
2018-02-08 09:39:46 -05:00
v: v,
2020-06-18 00:37:10 -04:00
ctx: ctx,
2018-02-08 09:39:46 -05:00
}
2018-09-07 08:50:25 -04:00
mss, err := internet.ToMemoryStreamConfig(receiverConfig.StreamSettings)
if err != nil {
return nil, newError("failed to parse stream settings").Base(err).AtWarning()
}
2018-09-17 09:12:58 -04:00
if receiverConfig.ReceiveOriginalDestination {
if mss.SocketSettings == nil {
mss.SocketSettings = &internet.SocketConfig{}
}
if mss.SocketSettings.Tproxy == internet.SocketConfig_Off {
mss.SocketSettings.Tproxy = internet.SocketConfig_Redirect
}
mss.SocketSettings.ReceiveOriginalDestAddress = true
}
2018-09-07 08:50:25 -04:00
h.streamSettings = mss
2018-05-27 07:02:29 -04:00
h.task = &task.Periodic{
2018-02-08 09:39:46 -05:00
Interval: time.Minute * time.Duration(h.receiverConfig.AllocationStrategy.GetRefreshValue()),
Execute: h.refresh,
}
return h, nil
}
func (h *DynamicInboundHandler) allocatePort() net.Port {
from := int(h.receiverConfig.PortRange.From)
delta := int(h.receiverConfig.PortRange.To) - from + 1
2017-01-30 15:35:34 -05:00
h.portMutex.Lock()
defer h.portMutex.Unlock()
for {
r := dice.Roll(delta)
port := net.Port(from + r)
_, used := h.portsInUse[port]
if !used {
h.portsInUse[port] = true
return port
}
}
}
2018-02-08 09:39:46 -05:00
func (h *DynamicInboundHandler) closeWorkers(workers []worker) {
ports2Del := make([]net.Port, len(workers))
2017-01-30 15:35:34 -05:00
for idx, worker := range workers {
ports2Del[idx] = worker.Port()
2018-05-31 05:55:11 -04:00
if err := worker.Close(); err != nil {
newError("failed to close worker").Base(err).WriteToLog()
}
}
2017-01-30 15:35:34 -05:00
h.portMutex.Lock()
for _, port := range ports2Del {
delete(h.portsInUse, port)
}
2017-01-30 15:35:34 -05:00
h.portMutex.Unlock()
}
func (h *DynamicInboundHandler) refresh() error {
h.lastRefresh = time.Now()
2017-01-30 15:37:50 -05:00
timeout := time.Minute * time.Duration(h.receiverConfig.AllocationStrategy.GetRefreshValue()) * 2
concurrency := h.receiverConfig.AllocationStrategy.GetConcurrencyValue()
workers := make([]worker, 0, concurrency)
2017-01-27 17:52:29 -05:00
address := h.receiverConfig.Listen.AsAddress()
if address == nil {
address = net.AnyIP
2017-01-27 17:52:29 -05:00
}
2018-04-11 18:10:14 -04:00
uplinkCounter, downlinkCounter := getStatCounter(h.v, h.tag)
2017-01-30 15:37:50 -05:00
for i := uint32(0); i < concurrency; i++ {
port := h.allocatePort()
2018-05-31 05:55:11 -04:00
rawProxy, err := core.CreateObject(h.v, h.proxyConfig)
if err != nil {
2017-12-19 15:28:12 -05:00
newError("failed to create proxy instance").Base(err).AtWarning().WriteToLog()
continue
}
2018-02-08 09:39:46 -05:00
p := rawProxy.(proxy.Inbound)
nl := p.Network()
2018-11-20 10:58:26 -05:00
if net.HasNetwork(nl, net.Network_TCP) {
worker := &tcpWorker{
2018-04-11 18:10:14 -04:00
tag: h.tag,
address: address,
port: port,
proxy: p,
2018-09-07 08:50:25 -04:00
stream: h.streamSettings,
2018-04-11 18:10:14 -04:00
recvOrigDest: h.receiverConfig.ReceiveOriginalDestination,
dispatcher: h.mux,
2018-07-16 07:47:00 -04:00
sniffingConfig: h.receiverConfig.GetEffectiveSniffingSettings(),
2018-04-11 18:10:14 -04:00
uplinkCounter: uplinkCounter,
downlinkCounter: downlinkCounter,
2020-06-18 00:37:10 -04:00
ctx: h.ctx,
}
if err := worker.Start(); err != nil {
2017-12-19 15:28:12 -05:00
newError("failed to create TCP worker").Base(err).AtWarning().WriteToLog()
2017-01-30 15:35:34 -05:00
continue
}
2017-01-30 15:35:34 -05:00
workers = append(workers, worker)
}
2018-11-20 10:58:26 -05:00
if net.HasNetwork(nl, net.Network_UDP) {
worker := &udpWorker{
2018-04-11 18:10:14 -04:00
tag: h.tag,
proxy: p,
address: address,
port: port,
dispatcher: h.mux,
uplinkCounter: uplinkCounter,
downlinkCounter: downlinkCounter,
2018-09-17 09:12:58 -04:00
stream: h.streamSettings,
}
if err := worker.Start(); err != nil {
2017-12-19 15:28:12 -05:00
newError("failed to create UDP worker").Base(err).AtWarning().WriteToLog()
2017-01-30 15:35:34 -05:00
continue
}
2017-01-30 15:35:34 -05:00
workers = append(workers, worker)
}
}
2017-01-30 15:35:34 -05:00
h.workerMutex.Lock()
h.worker = workers
h.workerMutex.Unlock()
2018-02-08 09:39:46 -05:00
time.AfterFunc(timeout, func() {
h.closeWorkers(workers)
})
2017-01-30 15:35:34 -05:00
return nil
}
func (h *DynamicInboundHandler) Start() error {
2018-02-08 09:39:46 -05:00
return h.task.Start()
}
2018-02-08 09:39:46 -05:00
func (h *DynamicInboundHandler) Close() error {
return h.task.Close()
}
func (h *DynamicInboundHandler) GetRandomInboundProxy() (interface{}, net.Port, int) {
2017-01-30 15:35:34 -05:00
h.workerMutex.RLock()
defer h.workerMutex.RUnlock()
if len(h.worker) == 0 {
return nil, 0, 0
}
w := h.worker[dice.Roll(len(h.worker))]
expire := h.receiverConfig.AllocationStrategy.GetRefreshValue() - uint32(time.Since(h.lastRefresh)/time.Minute)
2017-01-30 15:35:34 -05:00
return w.Proxy(), w.Port(), int(expire)
}
func (h *DynamicInboundHandler) Tag() string {
return h.tag
}