1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-12-22 18:17:52 -05:00
v2fly/app/proxyman/inbound/always.go

167 lines
4.3 KiB
Go
Raw Normal View History

package inbound
import (
"context"
2018-04-11 18:10:14 -04:00
"v2ray.com/core"
"v2ray.com/core/app/proxyman"
2018-02-09 17:07:38 -05:00
"v2ray.com/core/common"
"v2ray.com/core/common/dice"
2018-12-04 08:17:08 -05:00
"v2ray.com/core/common/errors"
2018-10-23 06:21:12 -04:00
"v2ray.com/core/common/mux"
"v2ray.com/core/common/net"
2018-10-21 04:27:13 -04:00
"v2ray.com/core/features/policy"
"v2ray.com/core/features/stats"
"v2ray.com/core/proxy"
2018-09-07 08:50:25 -04:00
"v2ray.com/core/transport/internet"
)
func getStatCounter(v *core.Instance, tag string) (stats.Counter, stats.Counter) {
var uplinkCounter stats.Counter
var downlinkCounter stats.Counter
2018-04-11 18:10:14 -04:00
2018-10-21 04:27:13 -04:00
policy := v.GetFeature(policy.ManagerType()).(policy.Manager)
2018-04-11 18:10:14 -04:00
if len(tag) > 0 && policy.ForSystem().Stats.InboundUplink {
2018-10-21 04:27:13 -04:00
statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
2018-04-11 18:10:14 -04:00
name := "inbound>>>" + tag + ">>>traffic>>>uplink"
c, _ := stats.GetOrRegisterCounter(statsManager, name)
2018-04-11 18:10:14 -04:00
if c != nil {
uplinkCounter = c
}
}
if len(tag) > 0 && policy.ForSystem().Stats.InboundDownlink {
2018-10-21 04:27:13 -04:00
statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
2018-04-11 18:10:14 -04:00
name := "inbound>>>" + tag + ">>>traffic>>>downlink"
c, _ := stats.GetOrRegisterCounter(statsManager, name)
2018-04-11 18:10:14 -04:00
if c != nil {
downlinkCounter = c
}
}
return uplinkCounter, downlinkCounter
}
type AlwaysOnInboundHandler struct {
2017-02-03 16:50:01 -05:00
proxy proxy.Inbound
workers []worker
2017-04-02 08:06:20 -04:00
mux *mux.Server
tag string
}
func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*AlwaysOnInboundHandler, error) {
2018-02-09 17:07:38 -05:00
rawProxy, err := common.CreateObject(ctx, proxyConfig)
if err != nil {
return nil, err
}
2018-02-09 17:07:38 -05:00
p, ok := rawProxy.(proxy.Inbound)
if !ok {
return nil, newError("not an inbound proxy.")
}
h := &AlwaysOnInboundHandler{
proxy: p,
2017-04-02 08:06:20 -04:00
mux: mux.NewServer(ctx),
tag: tag,
}
2018-04-11 18:10:14 -04:00
uplinkCounter, downlinkCounter := getStatCounter(core.MustFromContext(ctx), tag)
nl := p.Network()
pr := receiverConfig.PortRange
2017-01-27 17:52:29 -05:00
address := receiverConfig.Listen.AsAddress()
if address == nil {
address = net.AnyIP
}
2018-09-17 09:12:58 -04:00
mss, err := internet.ToMemoryStreamConfig(receiverConfig.StreamSettings)
if err != nil {
return nil, newError("failed to parse stream config").Base(err).AtWarning()
}
if receiverConfig.ReceiveOriginalDestination {
if mss.SocketSettings == nil {
mss.SocketSettings = &internet.SocketConfig{}
}
if mss.SocketSettings.Tproxy == internet.SocketConfig_Off {
mss.SocketSettings.Tproxy = internet.SocketConfig_Redirect
}
mss.SocketSettings.ReceiveOriginalDestAddress = true
}
for port := pr.From; port <= pr.To; port++ {
2018-11-20 10:58:26 -05:00
if net.HasNetwork(nl, net.Network_TCP) {
2017-12-27 16:25:12 -05:00
newError("creating stream worker on ", address, ":", port).AtDebug().WriteToLog()
2018-09-07 08:50:25 -04:00
worker := &tcpWorker{
2018-04-11 18:10:14 -04:00
address: address,
port: net.Port(port),
proxy: p,
2018-09-07 08:50:25 -04:00
stream: mss,
2018-04-11 18:10:14 -04:00
recvOrigDest: receiverConfig.ReceiveOriginalDestination,
tag: tag,
dispatcher: h.mux,
2018-07-16 07:47:00 -04:00
sniffingConfig: receiverConfig.GetEffectiveSniffingSettings(),
2018-04-11 18:10:14 -04:00
uplinkCounter: uplinkCounter,
downlinkCounter: downlinkCounter,
2020-06-18 00:37:10 -04:00
ctx: ctx,
}
h.workers = append(h.workers, worker)
}
2018-11-20 10:58:26 -05:00
if net.HasNetwork(nl, net.Network_UDP) {
worker := &udpWorker{
2018-04-11 18:10:14 -04:00
tag: tag,
proxy: p,
address: address,
port: net.Port(port),
dispatcher: h.mux,
uplinkCounter: uplinkCounter,
downlinkCounter: downlinkCounter,
2018-09-17 09:12:58 -04:00
stream: mss,
}
h.workers = append(h.workers, worker)
}
}
return h, nil
}
2018-06-04 08:29:43 -04:00
// Start implements common.Runnable.
func (h *AlwaysOnInboundHandler) Start() error {
for _, worker := range h.workers {
if err := worker.Start(); err != nil {
return err
}
}
return nil
}
2018-06-04 08:29:43 -04:00
// Close implements common.Closable.
2018-02-08 09:39:46 -05:00
func (h *AlwaysOnInboundHandler) Close() error {
2018-12-04 08:17:08 -05:00
var errs []error
for _, worker := range h.workers {
2018-12-04 08:17:08 -05:00
errs = append(errs, worker.Close())
2018-05-31 05:55:11 -04:00
}
2018-12-04 08:17:08 -05:00
errs = append(errs, h.mux.Close())
if err := errors.Combine(errs...); err != nil {
return newError("failed to close all resources").Base(err)
}
2018-02-08 09:39:46 -05:00
return nil
}
func (h *AlwaysOnInboundHandler) GetRandomInboundProxy() (interface{}, net.Port, int) {
if len(h.workers) == 0 {
return nil, 0, 0
}
w := h.workers[dice.Roll(len(h.workers))]
return w.Proxy(), w.Port(), 9999
}
func (h *AlwaysOnInboundHandler) Tag() string {
return h.tag
}
2018-02-05 17:38:24 -05:00
func (h *AlwaysOnInboundHandler) GetInbound() proxy.Inbound {
return h.proxy
}