From 715ac9d267c7d8b64a209abb6cc1b56b1faa6f2f Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Sun, 28 Oct 2018 07:27:07 +0100 Subject: [PATCH] test case for reverse proxy --- app/reverse/bridge.go | 2 + app/reverse/portal.go | 32 +++-- app/reverse/reverse.go | 42 +++++- common/mux/client.go | 4 + testing/scenarios/reverse_test.go | 211 ++++++++++++++++++++++++++++++ 5 files changed, 280 insertions(+), 11 deletions(-) create mode 100644 testing/scenarios/reverse_test.go diff --git a/app/reverse/bridge.go b/app/reverse/bridge.go index 077a3ff86..f18503dfb 100644 --- a/app/reverse/bridge.go +++ b/app/reverse/bridge.go @@ -32,6 +32,8 @@ func NewBridge(config *BridgeConfig, dispatcher routing.Dispatcher) (*Bridge, er b := &Bridge{ dispatcher: dispatcher, + tag: config.Tag, + domain: config.Domain, } b.monitorTask = &task.Periodic{ Execute: b.monitor, diff --git a/app/reverse/portal.go b/app/reverse/portal.go index 12baece8a..99f76955f 100644 --- a/app/reverse/portal.go +++ b/app/reverse/portal.go @@ -8,8 +8,8 @@ import ( "github.com/golang/protobuf/proto" "v2ray.com/core/common" "v2ray.com/core/common/buf" - "v2ray.com/core/common/dice" "v2ray.com/core/common/mux" + "v2ray.com/core/common/net" "v2ray.com/core/common/session" "v2ray.com/core/common/task" "v2ray.com/core/common/vio" @@ -53,6 +53,7 @@ func NewPortal(config *PortalConfig, ohm outbound.Manager) (*Portal, error) { func (p *Portal) Start() error { return p.ohm.AddHandler(context.Background(), &Outbound{ portal: p, + tag: p.tag, }) } @@ -66,7 +67,7 @@ func (s *Portal) HandleConnection(ctx context.Context, link *vio.Link) error { return newError("outbound metadata not found").AtError() } - if isInternalDomain(outboundMeta.Target) { + if isDomain(outboundMeta.Target, s.domain) { muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{ MaxConcurrency: 0, MaxConnection: 256, @@ -149,17 +150,24 @@ func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) { p.access.Lock() defer p.access.Unlock() - n := len(p.workers) - if n == 0 { + if len(p.workers) == 0 { return nil, newError("empty worker list") } - idx := dice.Roll(n) - for i := 0; i < n; i++ { - w := p.workers[(i+idx)%n] - if !w.IsFull() { - return w.client, nil + var minIdx int = -1 + var minConn uint32 = 9999 + for i, w := range p.workers { + if w.IsFull() { + continue } + if w.client.ActiveConnections() < minConn { + minConn = w.client.ActiveConnections() + minIdx = i + } + } + + if minIdx != -1 { + return p.workers[minIdx].client, nil } return nil, newError("no mux client worker available") @@ -184,7 +192,11 @@ func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) { uplinkReader, uplinkWriter := pipe.New(opt...) downlinkReader, downlinkWriter := pipe.New(opt...) - f := client.Dispatch(context.Background(), &vio.Link{ + ctx := context.Background() + ctx = session.ContextWithOutbound(ctx, &session.Outbound{ + Target: net.UDPDestination(net.DomainAddress(internalDomain), 0), + }) + f := client.Dispatch(ctx, &vio.Link{ Reader: uplinkReader, Writer: downlinkWriter, }) diff --git a/app/reverse/reverse.go b/app/reverse/reverse.go index 194ccae14..ed3b41fbe 100644 --- a/app/reverse/reverse.go +++ b/app/reverse/reverse.go @@ -16,8 +16,12 @@ const ( internalDomain = "reverse.internal.v2ray.com" ) +func isDomain(dest net.Destination, domain string) bool { + return dest.Address.Family().IsDomain() && dest.Address.Domain() == domain +} + func isInternalDomain(dest net.Destination) bool { - return dest.Address.Family().IsDomain() && dest.Address.Domain() == internalDomain + return isDomain(dest, internalDomain) } func init() { @@ -56,3 +60,39 @@ func (r *Reverse) Init(config *Config, d routing.Dispatcher, ohm outbound.Manage return nil } + +func (r *Reverse) Type() interface{} { + return (*Reverse)(nil) +} + +func (r *Reverse) Start() error { + for _, b := range r.bridges { + if err := b.Start(); err != nil { + return err + } + } + + for _, p := range r.portals { + if err := p.Start(); err != nil { + return err + } + } + + return nil +} + +func (r *Reverse) Close() error { + for _, b := range r.bridges { + if err := b.Close(); err != nil { + return err + } + } + + for _, p := range r.portals { + if err := p.Close(); err != nil { + return err + } + } + + return nil +} diff --git a/common/mux/client.go b/common/mux/client.go index 06904b234..ecb7ddf8f 100644 --- a/common/mux/client.go +++ b/common/mux/client.go @@ -190,6 +190,10 @@ func NewClientWorker(stream vio.Link, s ClientStrategy) (*ClientWorker, error) { return c, nil } +func (m *ClientWorker) ActiveConnections() uint32 { + return uint32(m.sessionManager.Size()) +} + // Closed returns true if this Client is closed. func (m *ClientWorker) Closed() bool { return m.done.Done() diff --git a/testing/scenarios/reverse_test.go b/testing/scenarios/reverse_test.go new file mode 100644 index 000000000..7bdd7ed57 --- /dev/null +++ b/testing/scenarios/reverse_test.go @@ -0,0 +1,211 @@ +package scenarios + +import ( + "crypto/rand" + "testing" + "time" + + "v2ray.com/core/app/reverse" + "v2ray.com/core/app/router" + + "v2ray.com/core" + "v2ray.com/core/app/proxyman" + "v2ray.com/core/common" + "v2ray.com/core/common/compare" + "v2ray.com/core/common/net" + "v2ray.com/core/common/protocol" + "v2ray.com/core/common/serial" + "v2ray.com/core/common/uuid" + "v2ray.com/core/proxy/blackhole" + "v2ray.com/core/proxy/dokodemo" + "v2ray.com/core/proxy/freedom" + "v2ray.com/core/proxy/vmess" + "v2ray.com/core/proxy/vmess/inbound" + "v2ray.com/core/proxy/vmess/outbound" + "v2ray.com/core/testing/servers/tcp" +) + +func TestReverseProxy(t *testing.T) { + tcpServer := tcp.Server{ + MsgProcessor: xor, + } + dest, err := tcpServer.Start() + common.Must(err) + + defer tcpServer.Close() + + userID := protocol.NewID(uuid.New()) + externalPort := tcp.PickPort() + reversePort := tcp.PickPort() + + serverConfig := &core.Config{ + App: []*serial.TypedMessage{ + serial.ToTypedMessage(&reverse.Config{ + PortalConfig: []*reverse.PortalConfig{ + { + Tag: "portal", + Domain: "test.v2ray.com", + }, + }, + }), + serial.ToTypedMessage(&router.Config{ + Rule: []*router.RoutingRule{ + { + Domain: []*router.Domain{ + {Type: router.Domain_Full, Value: "test.v2ray.com"}, + }, + Tag: "portal", + }, + { + InboundTag: []string{"external"}, + Tag: "portal", + }, + }, + }), + }, + Inbound: []*core.InboundHandlerConfig{ + { + Tag: "external", + ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{ + PortRange: net.SinglePortRange(externalPort), + Listen: net.NewIPOrDomain(net.LocalHostIP), + }), + ProxySettings: serial.ToTypedMessage(&dokodemo.Config{ + Address: net.NewIPOrDomain(dest.Address), + Port: uint32(dest.Port), + NetworkList: &net.NetworkList{ + Network: []net.Network{net.Network_TCP}, + }, + }), + }, + { + ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{ + PortRange: net.SinglePortRange(reversePort), + Listen: net.NewIPOrDomain(net.LocalHostIP), + }), + ProxySettings: serial.ToTypedMessage(&inbound.Config{ + User: []*protocol.User{ + { + Account: serial.ToTypedMessage(&vmess.Account{ + Id: userID.String(), + AlterId: 64, + }), + }, + }, + }), + }, + }, + Outbound: []*core.OutboundHandlerConfig{ + { + ProxySettings: serial.ToTypedMessage(&blackhole.Config{}), + }, + }, + } + + clientPort := tcp.PickPort() + clientConfig := &core.Config{ + App: []*serial.TypedMessage{ + serial.ToTypedMessage(&reverse.Config{ + BridgeConfig: []*reverse.BridgeConfig{ + { + Tag: "bridge", + Domain: "test.v2ray.com", + }, + }, + }), + serial.ToTypedMessage(&router.Config{ + Rule: []*router.RoutingRule{ + { + Domain: []*router.Domain{ + {Type: router.Domain_Full, Value: "test.v2ray.com"}, + }, + Tag: "reverse", + }, + { + InboundTag: []string{"bridge"}, + Tag: "freedom", + }, + }, + }), + }, + Inbound: []*core.InboundHandlerConfig{ + { + ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{ + PortRange: net.SinglePortRange(clientPort), + Listen: net.NewIPOrDomain(net.LocalHostIP), + }), + ProxySettings: serial.ToTypedMessage(&dokodemo.Config{ + Address: net.NewIPOrDomain(dest.Address), + Port: uint32(dest.Port), + NetworkList: &net.NetworkList{ + Network: []net.Network{net.Network_TCP}, + }, + }), + }, + }, + Outbound: []*core.OutboundHandlerConfig{ + { + Tag: "freedom", + ProxySettings: serial.ToTypedMessage(&freedom.Config{}), + }, + { + Tag: "reverse", + ProxySettings: serial.ToTypedMessage(&outbound.Config{ + Receiver: []*protocol.ServerEndpoint{ + { + Address: net.NewIPOrDomain(net.LocalHostIP), + Port: uint32(reversePort), + User: []*protocol.User{ + { + Account: serial.ToTypedMessage(&vmess.Account{ + Id: userID.String(), + AlterId: 64, + SecuritySettings: &protocol.SecurityConfig{ + Type: protocol.SecurityType_AES128_GCM, + }, + }), + }, + }, + }, + }, + }), + }, + }, + } + + servers, err := InitializeServerConfigs(serverConfig, clientConfig) + common.Must(err) + + defer CloseAllServers(servers) + + //var wg sync.WaitGroup + //wg.Add(10) + //for i := 0; i < 10; i++ { + //go func() { + //defer wg.Done() + + conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{ + IP: []byte{127, 0, 0, 1}, + Port: int(externalPort), + }) + common.Must(err) + defer conn.Close() + + payload := make([]byte, 10240*1024) + rand.Read(payload) + + nBytes, err := conn.Write([]byte(payload)) + common.Must(err) + + if nBytes != len(payload) { + t.Error("only part of payload is written: ", nBytes) + } + + response := readFrom(conn, time.Second*20, 10240*1024) + if err := compare.BytesEqualWithDetail(response, xor([]byte(payload))); err != nil { + t.Error(err) + } + //}() + //} + //wg.Wait() +}