diff --git a/app/dns/nameserver.go b/app/dns/nameserver.go index 8b79ffd9b..678a1f7fd 100644 --- a/app/dns/nameserver.go +++ b/app/dns/nameserver.go @@ -10,7 +10,7 @@ import ( "github.com/v2ray/v2ray-core/common/alloc" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" - "github.com/v2ray/v2ray-core/transport/hub" + "github.com/v2ray/v2ray-core/transport/internet/udp" "github.com/miekg/dns" ) @@ -43,7 +43,7 @@ type UDPNameServer struct { sync.Mutex address v2net.Destination requests map[uint16]*PendingRequest - udpServer *hub.UDPServer + udpServer *udp.UDPServer nextCleanup time.Time } @@ -51,7 +51,7 @@ func NewUDPNameServer(address v2net.Destination, dispatcher dispatcher.PacketDis s := &UDPNameServer{ address: address, requests: make(map[uint16]*PendingRequest), - udpServer: hub.NewUDPServer(dispatcher), + udpServer: udp.NewUDPServer(dispatcher), } return s } diff --git a/app/dns/server_test.go b/app/dns/server_test.go index d2c1bb8dc..19570f046 100644 --- a/app/dns/server_test.go +++ b/app/dns/server_test.go @@ -13,6 +13,7 @@ import ( "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/freedom" "github.com/v2ray/v2ray-core/testing/assert" + "github.com/v2ray/v2ray-core/transport/internet" ) func TestDnsAdd(t *testing.T) { @@ -21,7 +22,16 @@ func TestDnsAdd(t *testing.T) { space := app.NewSpace() outboundHandlerManager := proxyman.NewDefaultOutboundHandlerManager() - outboundHandlerManager.SetDefaultHandler(freedom.NewFreedomConnection(&freedom.Config{}, space, &proxy.OutboundHandlerMeta{Address: v2net.AnyIP})) + outboundHandlerManager.SetDefaultHandler( + freedom.NewFreedomConnection( + &freedom.Config{}, + space, + &proxy.OutboundHandlerMeta{ + Address: v2net.AnyIP, + StreamSettings: &internet.StreamSettings{ + Type: internet.StreamConnectionTypeRawTCP, + }, + })) space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, outboundHandlerManager) space.BindApp(dispatcher.APP_ID, dispatchers.NewDefaultDispatcher(space)) diff --git a/common/net/network.go b/common/net/network.go index 218f3daf5..3fa9509e9 100644 --- a/common/net/network.go +++ b/common/net/network.go @@ -12,6 +12,9 @@ const ( // UDPNetwork represents the UDP network. UDPNetwork = Network("udp") + + // KCPNetwork represents the KCP network. + KCPNetwork = Network("kcp") ) // Network represents a communication network on internet. diff --git a/proxy/blackhole/blackhole.go b/proxy/blackhole/blackhole.go index 02202962e..8fa892f7d 100644 --- a/proxy/blackhole/blackhole.go +++ b/proxy/blackhole/blackhole.go @@ -6,6 +6,7 @@ import ( v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" + "github.com/v2ray/v2ray-core/transport/internet" "github.com/v2ray/v2ray-core/transport/ray" ) @@ -33,9 +34,16 @@ func (this *BlackHole) Dispatch(destination v2net.Destination, payload *alloc.Bu return nil } -func init() { - internal.MustRegisterOutboundHandlerCreator("blackhole", - func(space app.Space, config interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) { - return NewBlackHole(space, config.(*Config), meta), nil - }) +type Factory struct{} + +func (this *Factory) StreamCapability() internet.StreamConnectionType { + return internet.StreamConnectionTypeRawTCP +} + +func (this *Factory) Create(space app.Space, config interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) { + return NewBlackHole(space, config.(*Config), meta), nil +} + +func init() { + internal.MustRegisterOutboundHandlerCreator("blackhole", new(Factory)) } diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 6f588dead..a949d1b66 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -11,7 +11,8 @@ import ( v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" - "github.com/v2ray/v2ray-core/transport/hub" + "github.com/v2ray/v2ray-core/transport/internet" + "github.com/v2ray/v2ray-core/transport/internet/udp" ) type DokodemoDoor struct { @@ -22,9 +23,9 @@ type DokodemoDoor struct { address v2net.Address port v2net.Port packetDispatcher dispatcher.PacketDispatcher - tcpListener *hub.TCPHub - udpHub *hub.UDPHub - udpServer *hub.UDPServer + tcpListener *internet.TCPHub + udpHub *udp.UDPHub + udpServer *udp.UDPServer meta *proxy.InboundHandlerMeta } @@ -88,8 +89,8 @@ func (this *DokodemoDoor) Start() error { } func (this *DokodemoDoor) ListenUDP() error { - this.udpServer = hub.NewUDPServer(this.packetDispatcher) - udpHub, err := hub.ListenUDP(this.meta.Address, this.meta.Port, this.handleUDPPackets) + this.udpServer = udp.NewUDPServer(this.packetDispatcher) + udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, this.handleUDPPackets) if err != nil { log.Error("Dokodemo failed to listen on ", this.meta.Address, ":", this.meta.Port, ": ", err) return err @@ -115,7 +116,8 @@ func (this *DokodemoDoor) handleUDPResponse(dest v2net.Destination, payload *all } func (this *DokodemoDoor) ListenTCP() error { - tcpListener, err := hub.ListenTCP(this.meta.Address, this.meta.Port, this.HandleTCPConnection, nil) + log.Info("Dokodemo: Stream settings: ", this.meta.StreamSettings) + tcpListener, err := internet.ListenTCP(this.meta.Address, this.meta.Port, this.HandleTCPConnection, this.meta.StreamSettings) if err != nil { log.Error("Dokodemo: Failed to listen on ", this.meta.Address, ":", this.meta.Port, ": ", err) return err @@ -126,7 +128,7 @@ func (this *DokodemoDoor) ListenTCP() error { return nil } -func (this *DokodemoDoor) HandleTCPConnection(conn *hub.Connection) { +func (this *DokodemoDoor) HandleTCPConnection(conn internet.Connection) { defer conn.Close() var dest v2net.Destination @@ -145,6 +147,7 @@ func (this *DokodemoDoor) HandleTCPConnection(conn *hub.Connection) { log.Info("Dokodemo: Unknown destination, stop forwarding...") return } + log.Info("Dokodemo: Handling request to ", dest) ray := this.packetDispatcher.DispatchToOutbound(dest) defer ray.InboundOutput().Release() @@ -177,9 +180,16 @@ func (this *DokodemoDoor) HandleTCPConnection(conn *hub.Connection) { inputFinish.Lock() } -func init() { - internal.MustRegisterInboundHandlerCreator("dokodemo-door", - func(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) { - return NewDokodemoDoor(rawConfig.(*Config), space, meta), nil - }) +type Factory struct{} + +func (this *Factory) StreamCapability() internet.StreamConnectionType { + return internet.StreamConnectionTypeRawTCP +} + +func (this *Factory) Create(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) { + return NewDokodemoDoor(rawConfig.(*Config), space, meta), nil +} + +func init() { + internal.MustRegisterInboundHandlerCreator("dokodemo-door", new(Factory)) } diff --git a/proxy/dokodemo/dokodemo_test.go b/proxy/dokodemo/dokodemo_test.go index 5787a60c5..c0a8987ab 100644 --- a/proxy/dokodemo/dokodemo_test.go +++ b/proxy/dokodemo/dokodemo_test.go @@ -16,6 +16,7 @@ import ( "github.com/v2ray/v2ray-core/testing/assert" "github.com/v2ray/v2ray-core/testing/servers/tcp" "github.com/v2ray/v2ray-core/testing/servers/udp" + "github.com/v2ray/v2ray-core/transport/internet" ) func TestDokodemoTCP(t *testing.T) { @@ -40,7 +41,14 @@ func TestDokodemoTCP(t *testing.T) { ohm := proxyman.NewDefaultOutboundHandlerManager() ohm.SetDefaultHandler( freedom.NewFreedomConnection( - &freedom.Config{}, space, &proxy.OutboundHandlerMeta{Address: v2net.LocalHostIP})) + &freedom.Config{}, + space, + &proxy.OutboundHandlerMeta{ + Address: v2net.LocalHostIP, + StreamSettings: &internet.StreamSettings{ + Type: internet.StreamConnectionTypeRawTCP, + }, + })) space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, ohm) data2Send := "Data to be sent to remote." @@ -51,7 +59,12 @@ func TestDokodemoTCP(t *testing.T) { Port: tcpServer.Port, Network: v2net.TCPNetwork.AsList(), Timeout: 600, - }, space, &proxy.InboundHandlerMeta{Address: v2net.LocalHostIP, Port: port}) + }, space, &proxy.InboundHandlerMeta{ + Address: v2net.LocalHostIP, + Port: port, + StreamSettings: &internet.StreamSettings{ + Type: internet.StreamConnectionTypeRawTCP, + }}) defer dokodemo.Close() assert.Error(space.Initialize()).IsNil() @@ -100,7 +113,13 @@ func TestDokodemoUDP(t *testing.T) { ohm := proxyman.NewDefaultOutboundHandlerManager() ohm.SetDefaultHandler( freedom.NewFreedomConnection( - &freedom.Config{}, space, &proxy.OutboundHandlerMeta{Address: v2net.AnyIP})) + &freedom.Config{}, + space, + &proxy.OutboundHandlerMeta{ + Address: v2net.AnyIP, + StreamSettings: &internet.StreamSettings{ + Type: internet.StreamConnectionTypeRawTCP, + }})) space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, ohm) data2Send := "Data to be sent to remote." @@ -111,7 +130,12 @@ func TestDokodemoUDP(t *testing.T) { Port: udpServer.Port, Network: v2net.UDPNetwork.AsList(), Timeout: 600, - }, space, &proxy.InboundHandlerMeta{Address: v2net.LocalHostIP, Port: port}) + }, space, &proxy.InboundHandlerMeta{ + Address: v2net.LocalHostIP, + Port: port, + StreamSettings: &internet.StreamSettings{ + Type: internet.StreamConnectionTypeRawTCP, + }}) defer dokodemo.Close() assert.Error(space.Initialize()).IsNil() diff --git a/proxy/dokodemo/sockopt_linux.go b/proxy/dokodemo/sockopt_linux.go index 6c329be2f..71d24de7b 100644 --- a/proxy/dokodemo/sockopt_linux.go +++ b/proxy/dokodemo/sockopt_linux.go @@ -7,13 +7,15 @@ import ( "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" - "github.com/v2ray/v2ray-core/transport/hub" + "github.com/v2ray/v2ray-core/transport/internet" + "github.com/v2ray/v2ray-core/transport/internet/tcp" ) const SO_ORIGINAL_DST = 80 -func GetOriginalDestination(conn *hub.Connection) v2net.Destination { - fd, err := conn.SysFd() +func GetOriginalDestination(conn internet.Connection) v2net.Destination { + tcpConn := conn.(*tcp.Connection) + fd, err := tcpConn.SysFd() if err != nil { log.Info("Dokodemo: Failed to get original destination: ", err) return nil diff --git a/proxy/dokodemo/sockopt_other.go b/proxy/dokodemo/sockopt_other.go index 04dedd3d0..73e5b8214 100644 --- a/proxy/dokodemo/sockopt_other.go +++ b/proxy/dokodemo/sockopt_other.go @@ -4,9 +4,9 @@ package dokodemo import ( v2net "github.com/v2ray/v2ray-core/common/net" - "github.com/v2ray/v2ray-core/transport/hub" + "github.com/v2ray/v2ray-core/transport/internet" ) -func GetOriginalDestination(conn *hub.Connection) v2net.Destination { +func GetOriginalDestination(conn internet.Connection) v2net.Destination { return nil } diff --git a/proxy/freedom/freedom.go b/proxy/freedom/freedom.go index b1a54515a..20fe91d16 100644 --- a/proxy/freedom/freedom.go +++ b/proxy/freedom/freedom.go @@ -2,7 +2,6 @@ package freedom import ( "io" - "net" "sync" "github.com/v2ray/v2ray-core/app" @@ -15,7 +14,8 @@ import ( "github.com/v2ray/v2ray-core/common/retry" "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" - "github.com/v2ray/v2ray-core/transport/hub" + "github.com/v2ray/v2ray-core/transport/internet" + "github.com/v2ray/v2ray-core/transport/internet/tcp" "github.com/v2ray/v2ray-core/transport/ray" ) @@ -75,12 +75,12 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload * defer ray.OutboundInput().Release() defer ray.OutboundOutput().Close() - var conn net.Conn + var conn internet.Connection if this.domainStrategy == DomainStrategyUseIP && destination.Address().IsDomain() { destination = this.ResolveIP(destination) } err := retry.Timed(5, 100).On(func() error { - rawConn, err := hub.DialWithoutCache(this.meta.Address, destination) + rawConn, err := internet.Dial(this.meta.Address, destination, this.meta.StreamSettings) if err != nil { return err } @@ -130,7 +130,7 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload * }() writeMutex.Lock() - if tcpConn, ok := conn.(*net.TCPConn); ok { + if tcpConn, ok := conn.(*tcp.RawConnection); ok { tcpConn.CloseWrite() } readMutex.Lock() @@ -138,9 +138,16 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload * return nil } -func init() { - internal.MustRegisterOutboundHandlerCreator("freedom", - func(space app.Space, config interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) { - return NewFreedomConnection(config.(*Config), space, meta), nil - }) +type FreedomFactory struct{} + +func (this *FreedomFactory) StreamCapability() internet.StreamConnectionType { + return internet.StreamConnectionTypeRawTCP +} + +func (this *FreedomFactory) Create(space app.Space, config interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) { + return NewFreedomConnection(config.(*Config), space, meta), nil +} + +func init() { + internal.MustRegisterOutboundHandlerCreator("freedom", new(FreedomFactory)) } diff --git a/proxy/freedom/freedom_test.go b/proxy/freedom/freedom_test.go index c9f5ec8e3..0d0e5950d 100644 --- a/proxy/freedom/freedom_test.go +++ b/proxy/freedom/freedom_test.go @@ -18,6 +18,7 @@ import ( . "github.com/v2ray/v2ray-core/proxy/freedom" "github.com/v2ray/v2ray-core/testing/assert" "github.com/v2ray/v2ray-core/testing/servers/tcp" + "github.com/v2ray/v2ray-core/transport/internet" "github.com/v2ray/v2ray-core/transport/ray" ) @@ -38,7 +39,15 @@ func TestSinglePacket(t *testing.T) { assert.Error(err).IsNil() space := app.NewSpace() - freedom := NewFreedomConnection(&Config{}, space, &proxy.OutboundHandlerMeta{Address: v2net.AnyIP}) + freedom := NewFreedomConnection( + &Config{}, + space, + &proxy.OutboundHandlerMeta{ + Address: v2net.AnyIP, + StreamSettings: &internet.StreamSettings{ + Type: internet.StreamConnectionTypeRawTCP, + }, + }) space.Initialize() traffic := ray.NewRay() @@ -58,7 +67,15 @@ func TestSinglePacket(t *testing.T) { func TestUnreachableDestination(t *testing.T) { assert := assert.On(t) - freedom := NewFreedomConnection(&Config{}, app.NewSpace(), &proxy.OutboundHandlerMeta{Address: v2net.AnyIP}) + freedom := NewFreedomConnection( + &Config{}, + app.NewSpace(), + &proxy.OutboundHandlerMeta{ + Address: v2net.AnyIP, + StreamSettings: &internet.StreamSettings{ + Type: internet.StreamConnectionTypeRawTCP, + }, + }) traffic := ray.NewRay() data2Send := "Data to be sent to remote" payload := alloc.NewSmallBuffer().Clear().Append([]byte(data2Send)) @@ -85,7 +102,12 @@ func TestIPResolution(t *testing.T) { freedom := NewFreedomConnection( &Config{DomainStrategy: DomainStrategyUseIP}, space, - &proxy.OutboundHandlerMeta{Address: v2net.AnyIP}) + &proxy.OutboundHandlerMeta{ + Address: v2net.AnyIP, + StreamSettings: &internet.StreamSettings{ + Type: internet.StreamConnectionTypeRawTCP, + }, + }) space.Initialize() diff --git a/proxy/http/server.go b/proxy/http/server.go index 81132c8fa..7e9fd266b 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -2,7 +2,6 @@ package http import ( "bufio" - "crypto/tls" "io" "net" "net/http" @@ -18,7 +17,7 @@ import ( v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" - "github.com/v2ray/v2ray-core/transport/hub" + "github.com/v2ray/v2ray-core/transport/internet" "github.com/v2ray/v2ray-core/transport/ray" ) @@ -28,7 +27,7 @@ type Server struct { accepting bool packetDispatcher dispatcher.PacketDispatcher config *Config - tcpListener *hub.TCPHub + tcpListener *internet.TCPHub meta *proxy.InboundHandlerMeta } @@ -59,11 +58,7 @@ func (this *Server) Start() error { return nil } - var tlsConfig *tls.Config - if this.config.TLSConfig != nil { - tlsConfig = this.config.TLSConfig.GetConfig() - } - tcpListener, err := hub.ListenTCP(this.meta.Address, this.meta.Port, this.handleConnection, tlsConfig) + tcpListener, err := internet.ListenTCP(this.meta.Address, this.meta.Port, this.handleConnection, this.meta.StreamSettings) if err != nil { log.Error("HTTP: Failed listen on ", this.meta.Address, ":", this.meta.Port, ": ", err) return err @@ -98,7 +93,7 @@ func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error return v2net.TCPDestination(v2net.DomainAddress(host), port), nil } -func (this *Server) handleConnection(conn *hub.Connection) { +func (this *Server) handleConnection(conn internet.Connection) { defer conn.Close() reader := bufio.NewReader(conn) @@ -269,15 +264,22 @@ func (this *Server) handlePlainHTTP(request *http.Request, dest v2net.Destinatio finish.Wait() } -func init() { - internal.MustRegisterInboundHandlerCreator("http", - func(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) { - if !space.HasApp(dispatcher.APP_ID) { - return nil, internal.ErrorBadConfiguration - } - return NewServer( - rawConfig.(*Config), - space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher), - meta), nil - }) +type ServerFactory struct{} + +func (this *ServerFactory) StreamCapability() internet.StreamConnectionType { + return internet.StreamConnectionTypeRawTCP +} + +func (this *ServerFactory) Create(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) { + if !space.HasApp(dispatcher.APP_ID) { + return nil, internal.ErrorBadConfiguration + } + return NewServer( + rawConfig.(*Config), + space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher), + meta), nil +} + +func init() { + internal.MustRegisterInboundHandlerCreator("http", new(ServerFactory)) } diff --git a/proxy/http/server_test.go b/proxy/http/server_test.go index bbd5fe374..f62c5f043 100644 --- a/proxy/http/server_test.go +++ b/proxy/http/server_test.go @@ -12,6 +12,9 @@ import ( "github.com/v2ray/v2ray-core/proxy" . "github.com/v2ray/v2ray-core/proxy/http" "github.com/v2ray/v2ray-core/testing/assert" + "github.com/v2ray/v2ray-core/transport/internet" + + _ "github.com/v2ray/v2ray-core/transport/internet/tcp" ) func TestHopByHopHeadersStrip(t *testing.T) { @@ -54,7 +57,15 @@ func TestNormalGetRequest(t *testing.T) { testPacketDispatcher := testdispatcher.NewTestPacketDispatcher(nil) port := v2nettesting.PickPort() - httpProxy := NewServer(&Config{}, testPacketDispatcher, &proxy.InboundHandlerMeta{Address: v2net.LocalHostIP, Port: port}) + httpProxy := NewServer( + &Config{}, + testPacketDispatcher, + &proxy.InboundHandlerMeta{ + Address: v2net.LocalHostIP, + Port: port, + StreamSettings: &internet.StreamSettings{ + Type: internet.StreamConnectionTypeRawTCP, + }}) defer httpProxy.Close() err := httpProxy.Start() diff --git a/proxy/internal/creator.go b/proxy/internal/creator.go index 8a2fa5e8c..37f7a21e6 100644 --- a/proxy/internal/creator.go +++ b/proxy/internal/creator.go @@ -3,7 +3,15 @@ package internal import ( "github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/proxy" + "github.com/v2ray/v2ray-core/transport/internet" ) -type InboundHandlerCreator func(space app.Space, config interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) -type OutboundHandlerCreator func(space app.Space, config interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) +type InboundHandlerFactory interface { + StreamCapability() internet.StreamConnectionType + Create(space app.Space, config interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) +} + +type OutboundHandlerFactory interface { + StreamCapability() internet.StreamConnectionType + Create(space app.Space, config interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) +} diff --git a/proxy/internal/handler_cache.go b/proxy/internal/handler_cache.go index b016a1ffc..16e7c4e28 100644 --- a/proxy/internal/handler_cache.go +++ b/proxy/internal/handler_cache.go @@ -5,18 +5,19 @@ import ( "github.com/v2ray/v2ray-core/app" "github.com/v2ray/v2ray-core/proxy" + "github.com/v2ray/v2ray-core/transport/internet" ) var ( - inboundFactories = make(map[string]InboundHandlerCreator) - outboundFactories = make(map[string]OutboundHandlerCreator) + inboundFactories = make(map[string]InboundHandlerFactory) + outboundFactories = make(map[string]OutboundHandlerFactory) ErrorProxyNotFound = errors.New("Proxy not found.") ErrorNameExists = errors.New("Proxy with the same name already exists.") ErrorBadConfiguration = errors.New("Bad proxy configuration.") ) -func RegisterInboundHandlerCreator(name string, creator InboundHandlerCreator) error { +func RegisterInboundHandlerCreator(name string, creator InboundHandlerFactory) error { if _, found := inboundFactories[name]; found { return ErrorNameExists } @@ -24,13 +25,13 @@ func RegisterInboundHandlerCreator(name string, creator InboundHandlerCreator) e return nil } -func MustRegisterInboundHandlerCreator(name string, creator InboundHandlerCreator) { +func MustRegisterInboundHandlerCreator(name string, creator InboundHandlerFactory) { if err := RegisterInboundHandlerCreator(name, creator); err != nil { panic(err) } } -func RegisterOutboundHandlerCreator(name string, creator OutboundHandlerCreator) error { +func RegisterOutboundHandlerCreator(name string, creator OutboundHandlerFactory) error { if _, found := outboundFactories[name]; found { return ErrorNameExists } @@ -38,7 +39,7 @@ func RegisterOutboundHandlerCreator(name string, creator OutboundHandlerCreator) return nil } -func MustRegisterOutboundHandlerCreator(name string, creator OutboundHandlerCreator) { +func MustRegisterOutboundHandlerCreator(name string, creator OutboundHandlerFactory) { if err := RegisterOutboundHandlerCreator(name, creator); err != nil { panic(err) } @@ -49,14 +50,22 @@ func CreateInboundHandler(name string, space app.Space, rawConfig []byte, meta * if !found { return nil, ErrorProxyNotFound } + if meta.StreamSettings == nil { + meta.StreamSettings = &internet.StreamSettings{ + Type: creator.StreamCapability(), + } + } else { + meta.StreamSettings.Type &= creator.StreamCapability() + } + if len(rawConfig) > 0 { proxyConfig, err := CreateInboundConfig(name, rawConfig) if err != nil { return nil, err } - return creator(space, proxyConfig, meta) + return creator.Create(space, proxyConfig, meta) } - return creator(space, nil, meta) + return creator.Create(space, nil, meta) } func CreateOutboundHandler(name string, space app.Space, rawConfig []byte, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) { @@ -64,14 +73,21 @@ func CreateOutboundHandler(name string, space app.Space, rawConfig []byte, meta if !found { return nil, ErrorProxyNotFound } + if meta.StreamSettings == nil { + meta.StreamSettings = &internet.StreamSettings{ + Type: creator.StreamCapability(), + } + } else { + meta.StreamSettings.Type &= creator.StreamCapability() + } if len(rawConfig) > 0 { proxyConfig, err := CreateOutboundConfig(name, rawConfig) if err != nil { return nil, err } - return creator(space, proxyConfig, meta) + return creator.Create(space, proxyConfig, meta) } - return creator(space, nil, meta) + return creator.Create(space, nil, meta) } diff --git a/proxy/proxy.go b/proxy/proxy.go index f0c100481..c8ee37e9d 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -4,6 +4,7 @@ package proxy // import "github.com/v2ray/v2ray-core/proxy" import ( "github.com/v2ray/v2ray-core/common/alloc" v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/transport/internet" "github.com/v2ray/v2ray-core/transport/ray" ) @@ -15,18 +16,16 @@ const ( ) type InboundHandlerMeta struct { - Tag string - Address v2net.Address - Port v2net.Port - //Whether this proxy support KCP connections - KcpSupported bool + Tag string + Address v2net.Address + Port v2net.Port + StreamSettings *internet.StreamSettings } type OutboundHandlerMeta struct { - Tag string - Address v2net.Address - //Whether this proxy support KCP connections - KcpSupported bool + Tag string + Address v2net.Address + StreamSettings *internet.StreamSettings } // An InboundHandler handles inbound network connections to V2Ray. diff --git a/proxy/shadowsocks/server.go b/proxy/shadowsocks/server.go index 7b5eee313..6dadf88cf 100644 --- a/proxy/shadowsocks/server.go +++ b/proxy/shadowsocks/server.go @@ -16,7 +16,8 @@ import ( "github.com/v2ray/v2ray-core/common/protocol" "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" - "github.com/v2ray/v2ray-core/transport/hub" + "github.com/v2ray/v2ray-core/transport/internet" + "github.com/v2ray/v2ray-core/transport/internet/udp" ) type Server struct { @@ -24,9 +25,9 @@ type Server struct { config *Config meta *proxy.InboundHandlerMeta accepting bool - tcpHub *hub.TCPHub - udpHub *hub.UDPHub - udpServer *hub.UDPServer + tcpHub *internet.TCPHub + udpHub *udp.UDPHub + udpServer *udp.UDPServer } func NewServer(config *Config, packetDispatcher dispatcher.PacketDispatcher, meta *proxy.InboundHandlerMeta) *Server { @@ -61,7 +62,7 @@ func (this *Server) Start() error { return nil } - tcpHub, err := hub.ListenTCP(this.meta.Address, this.meta.Port, this.handleConnection, nil) + tcpHub, err := internet.ListenTCP(this.meta.Address, this.meta.Port, this.handleConnection, this.meta.StreamSettings) if err != nil { log.Error("Shadowsocks: Failed to listen TCP on ", this.meta.Address, ":", this.meta.Port, ": ", err) return err @@ -69,8 +70,8 @@ func (this *Server) Start() error { this.tcpHub = tcpHub if this.config.UDP { - this.udpServer = hub.NewUDPServer(this.packetDispatcher) - udpHub, err := hub.ListenUDP(this.meta.Address, this.meta.Port, this.handlerUDPPayload) + this.udpServer = udp.NewUDPServer(this.packetDispatcher) + udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, this.handlerUDPPayload) if err != nil { log.Error("Shadowsocks: Failed to listen UDP on ", this.meta.Address, ":", this.meta.Port, ": ", err) return err @@ -154,7 +155,7 @@ func (this *Server) handlerUDPPayload(payload *alloc.Buffer, source v2net.Destin }) } -func (this *Server) handleConnection(conn *hub.Connection) { +func (this *Server) handleConnection(conn internet.Connection) { defer conn.Close() buffer := alloc.NewSmallBuffer() @@ -248,15 +249,22 @@ func (this *Server) handleConnection(conn *hub.Connection) { writeFinish.Lock() } -func init() { - internal.MustRegisterInboundHandlerCreator("shadowsocks", - func(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) { - if !space.HasApp(dispatcher.APP_ID) { - return nil, internal.ErrorBadConfiguration - } - return NewServer( - rawConfig.(*Config), - space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher), - meta), nil - }) +type ServerFactory struct{} + +func (this *ServerFactory) StreamCapability() internet.StreamConnectionType { + return internet.StreamConnectionTypeRawTCP +} + +func (this *ServerFactory) Create(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) { + if !space.HasApp(dispatcher.APP_ID) { + return nil, internal.ErrorBadConfiguration + } + return NewServer( + rawConfig.(*Config), + space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher), + meta), nil +} + +func init() { + internal.MustRegisterInboundHandlerCreator("shadowsocks", new(ServerFactory)) } diff --git a/proxy/socks/server.go b/proxy/socks/server.go index 7ea444f5b..b427d2bb0 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -14,7 +14,8 @@ import ( "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" "github.com/v2ray/v2ray-core/proxy/socks/protocol" - "github.com/v2ray/v2ray-core/transport/hub" + "github.com/v2ray/v2ray-core/transport/internet" + "github.com/v2ray/v2ray-core/transport/internet/udp" ) var ( @@ -29,10 +30,10 @@ type Server struct { accepting bool packetDispatcher dispatcher.PacketDispatcher config *Config - tcpListener *hub.TCPHub - udpHub *hub.UDPHub + tcpListener *internet.TCPHub + udpHub *udp.UDPHub udpAddress v2net.Destination - udpServer *hub.UDPServer + udpServer *udp.UDPServer meta *proxy.InboundHandlerMeta } @@ -73,11 +74,11 @@ func (this *Server) Start() error { return nil } - listener, err := hub.ListenTCP( + listener, err := internet.ListenTCP( this.meta.Address, this.meta.Port, this.handleConnection, - nil) + this.meta.StreamSettings) if err != nil { log.Error("Socks: failed to listen on ", this.meta.Address, ":", this.meta.Port, ": ", err) return err @@ -92,7 +93,7 @@ func (this *Server) Start() error { return nil } -func (this *Server) handleConnection(connection *hub.Connection) { +func (this *Server) handleConnection(connection internet.Connection) { defer connection.Close() timedReader := v2net.NewTimeOutReader(120, connection) @@ -302,15 +303,22 @@ func (this *Server) transport(reader io.Reader, writer io.Writer, destination v2 outputFinish.Lock() } -func init() { - internal.MustRegisterInboundHandlerCreator("socks", - func(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) { - if !space.HasApp(dispatcher.APP_ID) { - return nil, internal.ErrorBadConfiguration - } - return NewServer( - rawConfig.(*Config), - space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher), - meta), nil - }) +type ServerFactory struct{} + +func (this *ServerFactory) StreamCapability() internet.StreamConnectionType { + return internet.StreamConnectionTypeRawTCP +} + +func (this *ServerFactory) Create(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) { + if !space.HasApp(dispatcher.APP_ID) { + return nil, internal.ErrorBadConfiguration + } + return NewServer( + rawConfig.(*Config), + space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher), + meta), nil +} + +func init() { + internal.MustRegisterInboundHandlerCreator("socks", new(ServerFactory)) } diff --git a/proxy/socks/server_test.go b/proxy/socks/server_test.go deleted file mode 100644 index 021813a85..000000000 --- a/proxy/socks/server_test.go +++ /dev/null @@ -1,266 +0,0 @@ -package socks_test - -import ( - "bytes" - "fmt" - "io/ioutil" - "net" - "testing" - - "golang.org/x/net/proxy" - - "github.com/v2ray/v2ray-core/app" - "github.com/v2ray/v2ray-core/app/dns" - v2net "github.com/v2ray/v2ray-core/common/net" - v2nettesting "github.com/v2ray/v2ray-core/common/net/testing" - v2proxy "github.com/v2ray/v2ray-core/proxy" - proxytesting "github.com/v2ray/v2ray-core/proxy/testing" - proxymocks "github.com/v2ray/v2ray-core/proxy/testing/mocks" - "github.com/v2ray/v2ray-core/shell/point" - "github.com/v2ray/v2ray-core/testing/assert" -) - -func TestSocksTcpConnect(t *testing.T) { - assert := assert.On(t) - port := v2nettesting.PickPort() - - connInput := []byte("The data to be returned to socks server.") - connOutput := bytes.NewBuffer(make([]byte, 0, 1024)) - och := &proxymocks.OutboundConnectionHandler{ - ConnOutput: connOutput, - ConnInput: bytes.NewReader(connInput), - } - - protocol, err := proxytesting.RegisterOutboundConnectionHandlerCreator("mock_och", - func(space app.Space, config interface{}, meta *v2proxy.OutboundHandlerMeta) (v2proxy.OutboundHandler, error) { - return och, nil - }) - assert.Error(err).IsNil() - - config := &point.Config{ - Port: port, - InboundConfig: &point.InboundConnectionConfig{ - Protocol: "socks", - ListenOn: v2net.LocalHostIP, - Settings: []byte(` - { - "auth": "noauth" - }`), - }, - DNSConfig: &dns.Config{ - NameServers: []v2net.Destination{ - v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)), - }, - }, - OutboundConfig: &point.OutboundConnectionConfig{ - Protocol: protocol, - Settings: nil, - }, - } - - point, err := point.NewPoint(config) - assert.Error(err).IsNil() - - err = point.Start() - assert.Error(err).IsNil() - - socks5Client, err := proxy.SOCKS5("tcp", fmt.Sprintf("127.0.0.1:%d", port), nil, proxy.Direct) - assert.Error(err).IsNil() - - targetServer := "google.com:80" - conn, err := socks5Client.Dial("tcp", targetServer) - assert.Error(err).IsNil() - - data2Send := "The data to be sent to remote server." - conn.Write([]byte(data2Send)) - if tcpConn, ok := conn.(*net.TCPConn); ok { - tcpConn.CloseWrite() - } - - dataReturned, err := ioutil.ReadAll(conn) - assert.Error(err).IsNil() - conn.Close() - - assert.Bytes([]byte(data2Send)).Equals(connOutput.Bytes()) - assert.Bytes(dataReturned).Equals(connInput) - assert.String(targetServer).Equals(och.Destination.NetAddr()) -} - -func TestSocksTcpConnectWithUserPass(t *testing.T) { - assert := assert.On(t) - port := v2nettesting.PickPort() - - connInput := []byte("The data to be returned to socks server.") - connOutput := bytes.NewBuffer(make([]byte, 0, 1024)) - och := &proxymocks.OutboundConnectionHandler{ - ConnInput: bytes.NewReader(connInput), - ConnOutput: connOutput, - } - - protocol, err := proxytesting.RegisterOutboundConnectionHandlerCreator("mock_och", - func(space app.Space, config interface{}, meta *v2proxy.OutboundHandlerMeta) (v2proxy.OutboundHandler, error) { - return och, nil - }) - assert.Error(err).IsNil() - - config := &point.Config{ - Port: port, - InboundConfig: &point.InboundConnectionConfig{ - Protocol: "socks", - ListenOn: v2net.LocalHostIP, - Settings: []byte(` - { - "auth": "password", - "accounts": [ - {"user": "userx", "pass": "passy"} - ] - }`), - }, - DNSConfig: &dns.Config{ - NameServers: []v2net.Destination{ - v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)), - }, - }, - OutboundConfig: &point.OutboundConnectionConfig{ - Protocol: protocol, - Settings: nil, - }, - } - - point, err := point.NewPoint(config) - assert.Error(err).IsNil() - - err = point.Start() - assert.Error(err).IsNil() - - socks5Client, err := proxy.SOCKS5("tcp", fmt.Sprintf("127.0.0.1:%d", port), &proxy.Auth{User: "userx", Password: "passy"}, proxy.Direct) - assert.Error(err).IsNil() - - targetServer := "1.2.3.4:443" - conn, err := socks5Client.Dial("tcp", targetServer) - assert.Error(err).IsNil() - - data2Send := "The data to be sent to remote server." - conn.Write([]byte(data2Send)) - if tcpConn, ok := conn.(*net.TCPConn); ok { - tcpConn.CloseWrite() - } - - dataReturned, err := ioutil.ReadAll(conn) - assert.Error(err).IsNil() - conn.Close() - - assert.Bytes([]byte(data2Send)).Equals(connOutput.Bytes()) - assert.Bytes(dataReturned).Equals(connInput) - assert.String(targetServer).Equals(och.Destination.NetAddr()) -} - -func TestSocksTcpConnectWithWrongUserPass(t *testing.T) { - assert := assert.On(t) - port := v2nettesting.PickPort() - - connInput := []byte("The data to be returned to socks server.") - connOutput := bytes.NewBuffer(make([]byte, 0, 1024)) - och := &proxymocks.OutboundConnectionHandler{ - ConnInput: bytes.NewReader(connInput), - ConnOutput: connOutput, - } - - protocol, err := proxytesting.RegisterOutboundConnectionHandlerCreator("mock_och", - func(space app.Space, config interface{}, meta *v2proxy.OutboundHandlerMeta) (v2proxy.OutboundHandler, error) { - return och, nil - }) - assert.Error(err).IsNil() - - config := &point.Config{ - Port: port, - InboundConfig: &point.InboundConnectionConfig{ - Protocol: "socks", - ListenOn: v2net.LocalHostIP, - Settings: []byte(` - { - "auth": "password", - "accounts": [ - {"user": "userx", "pass": "passy"} - ] - }`), - }, - DNSConfig: &dns.Config{ - NameServers: []v2net.Destination{ - v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)), - }, - }, - OutboundConfig: &point.OutboundConnectionConfig{ - Protocol: protocol, - Settings: nil, - }, - } - - point, err := point.NewPoint(config) - assert.Error(err).IsNil() - - err = point.Start() - assert.Error(err).IsNil() - - socks5Client, err := proxy.SOCKS5("tcp", fmt.Sprintf("127.0.0.1:%d", port), &proxy.Auth{User: "userx", Password: "passz"}, proxy.Direct) - assert.Error(err).IsNil() - - targetServer := "1.2.3.4:443" - _, err = socks5Client.Dial("tcp", targetServer) - assert.Error(err).IsNotNil() -} - -func TestSocksTcpConnectWithWrongAuthMethod(t *testing.T) { - assert := assert.On(t) - port := v2nettesting.PickPort() - - connInput := []byte("The data to be returned to socks server.") - connOutput := bytes.NewBuffer(make([]byte, 0, 1024)) - och := &proxymocks.OutboundConnectionHandler{ - ConnInput: bytes.NewReader(connInput), - ConnOutput: connOutput, - } - - protocol, err := proxytesting.RegisterOutboundConnectionHandlerCreator("mock_och", - func(space app.Space, config interface{}, meta *v2proxy.OutboundHandlerMeta) (v2proxy.OutboundHandler, error) { - return och, nil - }) - assert.Error(err).IsNil() - - config := &point.Config{ - Port: port, - InboundConfig: &point.InboundConnectionConfig{ - ListenOn: v2net.LocalHostIP, - Protocol: "socks", - Settings: []byte(` - { - "auth": "password", - "accounts": [ - {"user": "userx", "pass": "passy"} - ] - }`), - }, - DNSConfig: &dns.Config{ - NameServers: []v2net.Destination{ - v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)), - }, - }, - OutboundConfig: &point.OutboundConnectionConfig{ - Protocol: protocol, - Settings: nil, - }, - } - - point, err := point.NewPoint(config) - assert.Error(err).IsNil() - - err = point.Start() - assert.Error(err).IsNil() - - socks5Client, err := proxy.SOCKS5("tcp", fmt.Sprintf("127.0.0.1:%d", port), nil, proxy.Direct) - assert.Error(err).IsNil() - - targetServer := "1.2.3.4:443" - _, err = socks5Client.Dial("tcp", targetServer) - assert.Error(err).IsNotNil() -} diff --git a/proxy/socks/server_udp.go b/proxy/socks/server_udp.go index 5dffb5081..bc4da3c2f 100644 --- a/proxy/socks/server_udp.go +++ b/proxy/socks/server_udp.go @@ -5,12 +5,12 @@ import ( "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/proxy/socks/protocol" - "github.com/v2ray/v2ray-core/transport/hub" + "github.com/v2ray/v2ray-core/transport/internet/udp" ) func (this *Server) listenUDP() error { - this.udpServer = hub.NewUDPServer(this.packetDispatcher) - udpHub, err := hub.ListenUDP(this.meta.Address, this.meta.Port, this.handleUDPPayload) + this.udpServer = udp.NewUDPServer(this.packetDispatcher) + udpHub, err := udp.ListenUDP(this.meta.Address, this.meta.Port, this.handleUDPPayload) if err != nil { log.Error("Socks: Failed to listen on udp ", this.meta.Address, ":", this.meta.Port) return err diff --git a/proxy/testing/proxy.go b/proxy/testing/proxy.go index e8b2b3fb6..a6d8f0c36 100644 --- a/proxy/testing/proxy.go +++ b/proxy/testing/proxy.go @@ -13,7 +13,7 @@ func randomString() string { return fmt.Sprintf("-%d", count) } -func RegisterInboundConnectionHandlerCreator(prefix string, creator internal.InboundHandlerCreator) (string, error) { +func RegisterInboundConnectionHandlerCreator(prefix string, creator internal.InboundHandlerFactory) (string, error) { for { name := prefix + randomString() err := internal.RegisterInboundHandlerCreator(name, creator) @@ -23,7 +23,7 @@ func RegisterInboundConnectionHandlerCreator(prefix string, creator internal.Inb } } -func RegisterOutboundConnectionHandlerCreator(prefix string, creator internal.OutboundHandlerCreator) (string, error) { +func RegisterOutboundConnectionHandlerCreator(prefix string, creator internal.OutboundHandlerFactory) (string, error) { for { name := prefix + randomString() err := internal.RegisterOutboundHandlerCreator(name, creator) diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index e084da2ed..9954dcdbf 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -17,8 +17,7 @@ import ( "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io" - "github.com/v2ray/v2ray-core/transport" - "github.com/v2ray/v2ray-core/transport/hub" + "github.com/v2ray/v2ray-core/transport/internet" ) type userByEmail struct { @@ -72,7 +71,7 @@ type VMessInboundHandler struct { clients protocol.UserValidator usersByEmail *userByEmail accepting bool - listener *hub.TCPHub + listener *internet.TCPHub detours *DetourConfig meta *proxy.InboundHandlerMeta } @@ -106,7 +105,7 @@ func (this *VMessInboundHandler) Start() error { return nil } - tcpListener, err := hub.ListenTCP6(this.meta.Address, this.meta.Port, this.HandleConnection, this.meta, nil) + tcpListener, err := internet.ListenTCP(this.meta.Address, this.meta.Port, this.HandleConnection, this.meta.StreamSettings) if err != nil { log.Error("Unable to listen tcp ", this.meta.Address, ":", this.meta.Port, ": ", err) return err @@ -118,7 +117,7 @@ func (this *VMessInboundHandler) Start() error { return nil } -func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { +func (this *VMessInboundHandler) HandleConnection(connection internet.Connection) { defer connection.Close() connReader := v2net.NewTimeOutReader(8, connection) @@ -140,11 +139,9 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { return } log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "") - log.Debug("VMessIn: Received request for ", request.Destination()) + log.Info("VMessIn: Received request for ", request.Destination()) - if request.Option.Has(protocol.RequestOptionConnectionReuse) { - connection.SetReusable(true) - } + connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse)) ray := this.packetDispatcher.DispatchToOutbound(request.Destination()) input := ray.InboundInput() @@ -184,7 +181,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { Command: this.generateCommand(request), } - if request.Option.Has(protocol.RequestOptionConnectionReuse) && transport.IsConnectionReusable() { + if connection.Reusable() { response.Option.Set(protocol.ResponseOptionConnectionReuse) } @@ -220,36 +217,39 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.Connection) { readFinish.Lock() } -func (this *VMessInboundHandler) setProxyCap() { - this.meta.KcpSupported = true + +type Factory struct{} + +func (this *Factory) StreamCapability() internet.StreamConnectionType { + return internet.StreamConnectionTypeRawTCP | internet.StreamConnectionTypeTCP } + +func (this *Factory) Create(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) { + if !space.HasApp(dispatcher.APP_ID) { + return nil, internal.ErrorBadConfiguration + } + config := rawConfig.(*Config) + + allowedClients := protocol.NewTimedUserValidator(protocol.DefaultIDHash) + for _, user := range config.AllowedUsers { + allowedClients.Add(user) + } + + handler := &VMessInboundHandler{ + packetDispatcher: space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher), + clients: allowedClients, + detours: config.DetourConfig, + usersByEmail: NewUserByEmail(config.AllowedUsers, config.Defaults), + meta: meta, + } + + if space.HasApp(proxyman.APP_ID_INBOUND_MANAGER) { + handler.inboundHandlerManager = space.GetApp(proxyman.APP_ID_INBOUND_MANAGER).(proxyman.InboundHandlerManager) + } + + return handler, nil +} + func init() { - internal.MustRegisterInboundHandlerCreator("vmess", - func(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) { - if !space.HasApp(dispatcher.APP_ID) { - return nil, internal.ErrorBadConfiguration - } - config := rawConfig.(*Config) - - allowedClients := protocol.NewTimedUserValidator(protocol.DefaultIDHash) - for _, user := range config.AllowedUsers { - allowedClients.Add(user) - } - - handler := &VMessInboundHandler{ - packetDispatcher: space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher), - clients: allowedClients, - detours: config.DetourConfig, - usersByEmail: NewUserByEmail(config.AllowedUsers, config.Defaults), - meta: meta, - } - - if space.HasApp(proxyman.APP_ID_INBOUND_MANAGER) { - handler.inboundHandlerManager = space.GetApp(proxyman.APP_ID_INBOUND_MANAGER).(proxyman.InboundHandlerManager) - } - - handler.setProxyCap() - - return handler, nil - }) + internal.MustRegisterInboundHandlerCreator("vmess", new(Factory)) } diff --git a/proxy/vmess/outbound/outbound.go b/proxy/vmess/outbound/outbound.go index 517f4f633..b484da23f 100644 --- a/proxy/vmess/outbound/outbound.go +++ b/proxy/vmess/outbound/outbound.go @@ -15,8 +15,7 @@ import ( "github.com/v2ray/v2ray-core/proxy" "github.com/v2ray/v2ray-core/proxy/internal" vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io" - "github.com/v2ray/v2ray-core/transport" - "github.com/v2ray/v2ray-core/transport/hub" + "github.com/v2ray/v2ray-core/transport/internet" "github.com/v2ray/v2ray-core/transport/ray" ) @@ -30,11 +29,11 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al defer ray.OutboundOutput().Close() var rec *Receiver - var conn *hub.Connection + var conn internet.Connection err := retry.Timed(5, 100).On(func() error { rec = this.receiverManager.PickReceiver() - rawConn, err := hub.Dial3(this.meta.Address, rec.Destination, this.meta) + rawConn, err := internet.Dial(this.meta.Address, rec.Destination, this.meta.StreamSettings) if err != nil { return err } @@ -63,9 +62,9 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al defer conn.Close() - if transport.IsConnectionReusable() { + conn.SetReusable(true) + if conn.Reusable() { // Conn reuse may be disabled on transportation layer request.Option.Set(protocol.RequestOptionConnectionReuse) - conn.SetReusable(true) } input := ray.OutboundInput() @@ -85,7 +84,7 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al return nil } -func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn *hub.Connection, request *protocol.RequestHeader, payload *alloc.Buffer, input v2io.Reader, finish *sync.Mutex) { +func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn internet.Connection, request *protocol.RequestHeader, payload *alloc.Buffer, input v2io.Reader, finish *sync.Mutex) { defer finish.Unlock() writer := v2io.NewBufferedWriter(conn) @@ -117,7 +116,7 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn return } -func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn *hub.Connection, request *protocol.RequestHeader, dest v2net.Destination, output v2io.Writer, finish *sync.Mutex) { +func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn internet.Connection, request *protocol.RequestHeader, dest v2net.Destination, output v2io.Writer, finish *sync.Mutex) { defer finish.Unlock() reader := v2io.NewBufferedReader(conn) @@ -154,21 +153,24 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con return } -func (this *VMessOutboundHandler) setProxyCap() { - this.meta.KcpSupported = true + +type Factory struct{} + +func (this *Factory) StreamCapability() internet.StreamConnectionType { + return internet.StreamConnectionTypeRawTCP | internet.StreamConnectionTypeTCP } + +func (this *Factory) Create(space app.Space, rawConfig interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) { + vOutConfig := rawConfig.(*Config) + + handler := &VMessOutboundHandler{ + receiverManager: NewReceiverManager(vOutConfig.Receivers), + meta: meta, + } + + return handler, nil +} + func init() { - internal.MustRegisterOutboundHandlerCreator("vmess", - func(space app.Space, rawConfig interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) { - vOutConfig := rawConfig.(*Config) - - handler := &VMessOutboundHandler{ - receiverManager: NewReceiverManager(vOutConfig.Receivers), - meta: meta, - } - - handler.setProxyCap() - - return handler, nil - }) + internal.MustRegisterOutboundHandlerCreator("vmess", new(Factory)) } diff --git a/proxy/vmess/vmess_test.go b/proxy/vmess/vmess_test.go deleted file mode 100644 index 477844815..000000000 --- a/proxy/vmess/vmess_test.go +++ /dev/null @@ -1,129 +0,0 @@ -package vmess_test - -import ( - "bytes" - "testing" - - "github.com/v2ray/v2ray-core/app" - "github.com/v2ray/v2ray-core/app/dispatcher" - "github.com/v2ray/v2ray-core/app/dns" - v2net "github.com/v2ray/v2ray-core/common/net" - v2nettesting "github.com/v2ray/v2ray-core/common/net/testing" - "github.com/v2ray/v2ray-core/common/protocol" - "github.com/v2ray/v2ray-core/common/uuid" - "github.com/v2ray/v2ray-core/proxy" - proxytesting "github.com/v2ray/v2ray-core/proxy/testing" - proxymocks "github.com/v2ray/v2ray-core/proxy/testing/mocks" - _ "github.com/v2ray/v2ray-core/proxy/vmess/inbound" - _ "github.com/v2ray/v2ray-core/proxy/vmess/outbound" - "github.com/v2ray/v2ray-core/shell/point" - "github.com/v2ray/v2ray-core/testing/assert" -) - -func TestVMessInAndOut(t *testing.T) { - assert := assert.On(t) - - id, err := uuid.ParseString("ad937d9d-6e23-4a5a-ba23-bce5092a7c51") - assert.Error(err).IsNil() - - testAccount := protocol.NewID(id) - - portA := v2nettesting.PickPort() - portB := v2nettesting.PickPort() - - ichConnInput := []byte("The data to be send to outbound server.") - ichConnOutput := bytes.NewBuffer(make([]byte, 0, 1024)) - ich := &proxymocks.InboundConnectionHandler{ - ConnInput: bytes.NewReader(ichConnInput), - ConnOutput: ichConnOutput, - } - - protocol, err := proxytesting.RegisterInboundConnectionHandlerCreator("mock_ich", - func(space app.Space, config interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) { - ich.ListeningAddress = meta.Address - ich.ListeningPort = meta.Port - ich.PacketDispatcher = space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher) - return ich, nil - }) - assert.Error(err).IsNil() - - configA := &point.Config{ - Port: portA, - DNSConfig: &dns.Config{ - NameServers: []v2net.Destination{ - v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)), - }, - }, - InboundConfig: &point.InboundConnectionConfig{ - Protocol: protocol, - ListenOn: v2net.LocalHostIP, - Settings: nil, - }, - OutboundConfig: &point.OutboundConnectionConfig{ - Protocol: "vmess", - Settings: []byte(`{ - "vnext": [ - { - "address": "127.0.0.1", - "port": ` + portB.String() + `, - "users": [ - {"id": "` + testAccount.String() + `"} - ] - } - ] - }`), - }, - } - - pointA, err := point.NewPoint(configA) - assert.Error(err).IsNil() - - err = pointA.Start() - assert.Error(err).IsNil() - - ochConnInput := []byte("The data to be returned to inbound server.") - ochConnOutput := bytes.NewBuffer(make([]byte, 0, 1024)) - och := &proxymocks.OutboundConnectionHandler{ - ConnInput: bytes.NewReader(ochConnInput), - ConnOutput: ochConnOutput, - } - - protocol, err = proxytesting.RegisterOutboundConnectionHandlerCreator("mock_och", - func(space app.Space, config interface{}, meta *proxy.OutboundHandlerMeta) (proxy.OutboundHandler, error) { - return och, nil - }) - assert.Error(err).IsNil() - - configB := &point.Config{ - Port: portB, - DNSConfig: &dns.Config{ - NameServers: []v2net.Destination{ - v2net.UDPDestination(v2net.DomainAddress("localhost"), v2net.Port(53)), - }, - }, - InboundConfig: &point.InboundConnectionConfig{ - Protocol: "vmess", - ListenOn: v2net.LocalHostIP, - Settings: []byte(`{ - "clients": [ - {"id": "` + testAccount.String() + `"} - ] - }`), - }, - OutboundConfig: &point.OutboundConnectionConfig{ - Protocol: protocol, - Settings: nil, - }, - } - - pointB, err := point.NewPoint(configB) - assert.Error(err).IsNil() - - err = pointB.Start() - assert.Error(err).IsNil() - - dest := v2net.TCPDestination(v2net.IPAddress([]byte{1, 2, 3, 4}), 80) - ich.Communicate(dest) - assert.Bytes(ichConnInput).Equals(ochConnOutput.Bytes()) - assert.Bytes(ichConnOutput.Bytes()).Equals(ochConnInput) -} diff --git a/release/server/main.go b/release/server/main.go index 8eef146c8..300727925 100644 --- a/release/server/main.go +++ b/release/server/main.go @@ -21,6 +21,10 @@ import ( _ "github.com/v2ray/v2ray-core/proxy/socks" _ "github.com/v2ray/v2ray-core/proxy/vmess/inbound" _ "github.com/v2ray/v2ray-core/proxy/vmess/outbound" + + _ "github.com/v2ray/v2ray-core/transport/internet/kcp" + _ "github.com/v2ray/v2ray-core/transport/internet/tcp" + _ "github.com/v2ray/v2ray-core/transport/internet/udp" ) var ( diff --git a/shell/point/config.go b/shell/point/config.go index bf838bcc7..0ac32c9c6 100644 --- a/shell/point/config.go +++ b/shell/point/config.go @@ -6,19 +6,22 @@ import ( "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/transport" + "github.com/v2ray/v2ray-core/transport/internet" ) type InboundConnectionConfig struct { - Port v2net.Port - ListenOn v2net.Address - Protocol string - Settings []byte + Port v2net.Port + ListenOn v2net.Address + StreamSettings *internet.StreamSettings + Protocol string + Settings []byte } type OutboundConnectionConfig struct { - Protocol string - SendThrough v2net.Address - Settings []byte + Protocol string + SendThrough v2net.Address + StreamSettings *internet.StreamSettings + Settings []byte } type LogConfig struct { @@ -40,19 +43,21 @@ type InboundDetourAllocationConfig struct { } type InboundDetourConfig struct { - Protocol string - PortRange v2net.PortRange - ListenOn v2net.Address - Tag string - Allocation *InboundDetourAllocationConfig - Settings []byte + Protocol string + PortRange v2net.PortRange + ListenOn v2net.Address + Tag string + Allocation *InboundDetourAllocationConfig + StreamSettings *internet.StreamSettings + Settings []byte } type OutboundDetourConfig struct { - Protocol string - SendThrough v2net.Address - Tag string - Settings []byte + Protocol string + SendThrough v2net.Address + StreamSettings *internet.StreamSettings + Tag string + Settings []byte } type Config struct { diff --git a/shell/point/config_json.go b/shell/point/config_json.go index 6c408f35b..b16e8ec62 100644 --- a/shell/point/config_json.go +++ b/shell/point/config_json.go @@ -14,6 +14,7 @@ import ( "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" "github.com/v2ray/v2ray-core/transport" + "github.com/v2ray/v2ray-core/transport/internet" ) const ( @@ -57,10 +58,11 @@ func (this *Config) UnmarshalJSON(data []byte) error { func (this *InboundConnectionConfig) UnmarshalJSON(data []byte) error { type JsonConfig struct { - Port uint16 `json:"port"` - Listen *v2net.AddressJson `json:"listen"` - Protocol string `json:"protocol"` - Settings json.RawMessage `json:"settings"` + Port uint16 `json:"port"` + Listen *v2net.AddressJson `json:"listen"` + Protocol string `json:"protocol"` + StreamSetting *internet.StreamSettings `json:"streamSettings"` + Settings json.RawMessage `json:"settings"` } jsonConfig := new(JsonConfig) @@ -75,6 +77,9 @@ func (this *InboundConnectionConfig) UnmarshalJSON(data []byte) error { } this.ListenOn = jsonConfig.Listen.Address } + if jsonConfig.StreamSetting != nil { + this.StreamSettings = jsonConfig.StreamSetting + } this.Protocol = jsonConfig.Protocol this.Settings = jsonConfig.Settings @@ -83,9 +88,10 @@ func (this *InboundConnectionConfig) UnmarshalJSON(data []byte) error { func (this *OutboundConnectionConfig) UnmarshalJSON(data []byte) error { type JsonConnectionConfig struct { - Protocol string `json:"protocol"` - SendThrough *v2net.AddressJson `json:"sendThrough"` - Settings json.RawMessage `json:"settings"` + Protocol string `json:"protocol"` + SendThrough *v2net.AddressJson `json:"sendThrough"` + StreamSetting *internet.StreamSettings `json:"streamSettings"` + Settings json.RawMessage `json:"settings"` } jsonConfig := new(JsonConnectionConfig) if err := json.Unmarshal(data, jsonConfig); err != nil { @@ -101,6 +107,9 @@ func (this *OutboundConnectionConfig) UnmarshalJSON(data []byte) error { } this.SendThrough = address } + if jsonConfig.StreamSetting != nil { + this.StreamSettings = jsonConfig.StreamSetting + } return nil } @@ -162,12 +171,13 @@ func (this *InboundDetourAllocationConfig) UnmarshalJSON(data []byte) error { func (this *InboundDetourConfig) UnmarshalJSON(data []byte) error { type JsonInboundDetourConfig struct { - Protocol string `json:"protocol"` - PortRange *v2net.PortRange `json:"port"` - ListenOn *v2net.AddressJson `json:"listen"` - Settings json.RawMessage `json:"settings"` - Tag string `json:"tag"` - Allocation *InboundDetourAllocationConfig `json:"allocate"` + Protocol string `json:"protocol"` + PortRange *v2net.PortRange `json:"port"` + ListenOn *v2net.AddressJson `json:"listen"` + Settings json.RawMessage `json:"settings"` + Tag string `json:"tag"` + Allocation *InboundDetourAllocationConfig `json:"allocate"` + StreamSetting *internet.StreamSettings `json:"streamSettings"` } jsonConfig := new(JsonInboundDetourConfig) if err := json.Unmarshal(data, jsonConfig); err != nil { @@ -195,15 +205,19 @@ func (this *InboundDetourConfig) UnmarshalJSON(data []byte) error { Refresh: DefaultRefreshMinute, } } + if jsonConfig.StreamSetting != nil { + this.StreamSettings = jsonConfig.StreamSetting + } return nil } func (this *OutboundDetourConfig) UnmarshalJSON(data []byte) error { type JsonOutboundDetourConfig struct { - Protocol string `json:"protocol"` - SendThrough *v2net.AddressJson `json:"sendThrough"` - Tag string `json:"tag"` - Settings json.RawMessage `json:"settings"` + Protocol string `json:"protocol"` + SendThrough *v2net.AddressJson `json:"sendThrough"` + Tag string `json:"tag"` + Settings json.RawMessage `json:"settings"` + StreamSetting *internet.StreamSettings `json:"streamSettings"` } jsonConfig := new(JsonOutboundDetourConfig) if err := json.Unmarshal(data, jsonConfig); err != nil { @@ -220,6 +234,10 @@ func (this *OutboundDetourConfig) UnmarshalJSON(data []byte) error { } this.SendThrough = address } + + if jsonConfig.StreamSetting != nil { + this.StreamSettings = jsonConfig.StreamSetting + } return nil } diff --git a/shell/point/inbound_detour_always.go b/shell/point/inbound_detour_always.go index dd420ae34..d439443dc 100644 --- a/shell/point/inbound_detour_always.go +++ b/shell/point/inbound_detour_always.go @@ -26,9 +26,10 @@ func NewInboundDetourHandlerAlways(space app.Space, config *InboundDetourConfig) for i := ports.From; i <= ports.To; i++ { ichConfig := config.Settings ich, err := proxyrepo.CreateInboundHandler(config.Protocol, space, ichConfig, &proxy.InboundHandlerMeta{ - Address: config.ListenOn, - Port: i, - Tag: config.Tag}) + Address: config.ListenOn, + Port: i, + Tag: config.Tag, + StreamSettings: config.StreamSettings}) if err != nil { log.Error("Failed to create inbound connection handler: ", err) return nil, err diff --git a/shell/point/inbound_detour_dynamic.go b/shell/point/inbound_detour_dynamic.go index bba37d066..4d90b336b 100644 --- a/shell/point/inbound_detour_dynamic.go +++ b/shell/point/inbound_detour_dynamic.go @@ -32,9 +32,10 @@ func NewInboundDetourHandlerDynamic(space app.Space, config *InboundDetourConfig // To test configuration ich, err := proxyrepo.CreateInboundHandler(config.Protocol, space, config.Settings, &proxy.InboundHandlerMeta{ - Address: config.ListenOn, - Port: 0, - Tag: config.Tag}) + Address: config.ListenOn, + Port: 0, + Tag: config.Tag, + StreamSettings: config.StreamSettings}) if err != nil { log.Error("Point: Failed to create inbound connection handler: ", err) return nil, err @@ -99,7 +100,7 @@ func (this *InboundDetourHandlerDynamic) refresh() error { for idx, _ := range newIchs { port := this.pickUnusedPort() ich, err := proxyrepo.CreateInboundHandler(config.Protocol, this.space, config.Settings, &proxy.InboundHandlerMeta{ - Address: config.ListenOn, Port: port, Tag: config.Tag}) + Address: config.ListenOn, Port: port, Tag: config.Tag, StreamSettings: config.StreamSettings}) if err != nil { log.Error("Point: Failed to create inbound connection handler: ", err) return err diff --git a/shell/point/point.go b/shell/point/point.go index 752eee5e1..246d5c85f 100644 --- a/shell/point/point.go +++ b/shell/point/point.go @@ -93,9 +93,10 @@ func NewPoint(pConfig *Config) (*Point, error) { ichConfig := pConfig.InboundConfig.Settings ich, err := proxyrepo.CreateInboundHandler( pConfig.InboundConfig.Protocol, vpoint.space, ichConfig, &proxy.InboundHandlerMeta{ - Tag: "system.inbound", - Address: pConfig.InboundConfig.ListenOn, - Port: vpoint.port}) + Tag: "system.inbound", + Address: pConfig.InboundConfig.ListenOn, + Port: vpoint.port, + StreamSettings: pConfig.InboundConfig.StreamSettings}) if err != nil { log.Error("Failed to create inbound connection handler: ", err) return nil, err @@ -105,8 +106,10 @@ func NewPoint(pConfig *Config) (*Point, error) { ochConfig := pConfig.OutboundConfig.Settings och, err := proxyrepo.CreateOutboundHandler( pConfig.OutboundConfig.Protocol, vpoint.space, ochConfig, &proxy.OutboundHandlerMeta{ - Tag: "system.outbound", - Address: pConfig.OutboundConfig.SendThrough}) + Tag: "system.outbound", + Address: pConfig.OutboundConfig.SendThrough, + StreamSettings: pConfig.OutboundConfig.StreamSettings, + }) if err != nil { log.Error("Failed to create outbound connection handler: ", err) return nil, err @@ -153,8 +156,10 @@ func NewPoint(pConfig *Config) (*Point, error) { for _, detourConfig := range outboundDetours { detourHandler, err := proxyrepo.CreateOutboundHandler( detourConfig.Protocol, vpoint.space, detourConfig.Settings, &proxy.OutboundHandlerMeta{ - Tag: detourConfig.Tag, - Address: detourConfig.SendThrough}) + Tag: detourConfig.Tag, + Address: detourConfig.SendThrough, + StreamSettings: detourConfig.StreamSettings, + }) if err != nil { log.Error("Point: Failed to create detour outbound connection handler: ", err) return nil, err diff --git a/transport/config.go b/transport/config.go index 7ea9beccf..a98d1b84b 100644 --- a/transport/config.go +++ b/transport/config.go @@ -1,30 +1,23 @@ package transport -import "github.com/v2ray/v2ray-core/transport/hub/kcpv" +import ( + "github.com/v2ray/v2ray-core/transport/internet/kcp" + "github.com/v2ray/v2ray-core/transport/internet/tcp" +) // Config for V2Ray transport layer. type Config struct { - ConnectionReuse bool - enableKcp bool - kcpConfig *kcpv.Config + tcpConfig *tcp.Config + kcpConfig *kcp.Config } // Apply applies this Config. func (this *Config) Apply() error { - if this.ConnectionReuse { - connectionReuse = true + if this.tcpConfig != nil { + this.tcpConfig.Apply() } - enableKcp = this.enableKcp - if enableKcp { - KcpConfig = this.kcpConfig - /* - KCP do not support connectionReuse, - it is mandatory to set connectionReuse to false - Since KCP have no handshake and - does not SlowStart, there isn't benefit to - use that anyway. - */ - connectionReuse = false + if this.kcpConfig != nil { + this.kcpConfig.Apply() } return nil } diff --git a/transport/config_json.go b/transport/config_json.go index 4328aaa0d..30a8dcf26 100644 --- a/transport/config_json.go +++ b/transport/config_json.go @@ -5,35 +5,21 @@ package transport import ( "encoding/json" - "github.com/v2ray/v2ray-core/common/log" - "github.com/v2ray/v2ray-core/transport/hub/kcpv" + "github.com/v2ray/v2ray-core/transport/internet/kcp" + "github.com/v2ray/v2ray-core/transport/internet/tcp" ) func (this *Config) UnmarshalJSON(data []byte) error { type JsonConfig struct { - ConnectionReuse bool `json:"connectionReuse"` - EnableKcp bool `json:"EnableKCP,omitempty"` - KcpConfig *kcpv.Config `json:"KcpConfig,omitempty"` - } - jsonConfig := &JsonConfig{ - ConnectionReuse: true, - EnableKcp: false, + TCPConfig *tcp.Config `json:"tcpSettings"` + KCPCOnfig *kcp.Config `json:"kcpSettings"` } + jsonConfig := new(JsonConfig) if err := json.Unmarshal(data, jsonConfig); err != nil { return err } - this.ConnectionReuse = jsonConfig.ConnectionReuse - this.enableKcp = jsonConfig.EnableKcp - if jsonConfig.KcpConfig != nil { - this.kcpConfig = jsonConfig.KcpConfig - if jsonConfig.KcpConfig.AdvancedConfigs == nil { - jsonConfig.KcpConfig.AdvancedConfigs = kcpv.DefaultAdvancedConfigs - } - } else { - if jsonConfig.EnableKcp { - log.Error("transport: You have enabled KCP but no configure is given") - } - } + this.tcpConfig = jsonConfig.TCPConfig + this.kcpConfig = jsonConfig.KCPCOnfig return nil } diff --git a/transport/hub/dialer.go b/transport/hub/dialer.go deleted file mode 100644 index e4c73c06a..000000000 --- a/transport/hub/dialer.go +++ /dev/null @@ -1,102 +0,0 @@ -package hub - -import ( - "errors" - "net" - "time" - - v2net "github.com/v2ray/v2ray-core/common/net" - "github.com/v2ray/v2ray-core/proxy" - "github.com/v2ray/v2ray-core/transport" -) - -var ( - ErrorInvalidHost = errors.New("Invalid Host.") - - globalCache = NewConnectionCache() -) - -func Dial(src v2net.Address, dest v2net.Destination) (*Connection, error) { - if src == nil { - src = v2net.AnyIP - } - id := src.String() + "-" + dest.NetAddr() - var conn net.Conn - if dest.IsTCP() && transport.IsConnectionReusable() { - conn = globalCache.Get(id) - } - if conn == nil { - var err error - conn, err = DialWithoutCache(src, dest) - if err != nil { - return nil, err - } - } - return &Connection{ - dest: id, - conn: conn, - listener: globalCache, - }, nil -} - -func DialWithoutCache(src v2net.Address, dest v2net.Destination) (net.Conn, error) { - dialer := &net.Dialer{ - Timeout: time.Second * 60, - DualStack: true, - } - - if src != nil && src != v2net.AnyIP { - var addr net.Addr - if dest.IsTCP() { - addr = &net.TCPAddr{ - IP: src.IP(), - Port: 0, - } - } else { - addr = &net.UDPAddr{ - IP: src.IP(), - Port: 0, - } - } - dialer.LocalAddr = addr - } - - return dialer.Dial(dest.Network().String(), dest.NetAddr()) -} - -func Dial3(src v2net.Address, dest v2net.Destination, proxyMeta *proxy.OutboundHandlerMeta) (*Connection, error) { - if proxyMeta.KcpSupported && transport.IsKcpEnabled() { - return DialKCP3(src, dest, proxyMeta) - } - return Dial(src, dest) -} -func DialWithoutCache3(src v2net.Address, dest v2net.Destination, proxyMeta *proxy.OutboundHandlerMeta) (net.Conn, error) { - if proxyMeta.KcpSupported && transport.IsKcpEnabled() { - return DialKCPWithoutCache(src, dest) - } - return DialWithoutCache(src, dest) -} - -func DialKCP3(src v2net.Address, dest v2net.Destination, proxyMeta *proxy.OutboundHandlerMeta) (*Connection, error) { - if src == nil { - src = v2net.AnyIP - } - id := src.String() + "-" + dest.NetAddr() - conn, err := DialWithoutCache3(src, dest, proxyMeta) - if err != nil { - return nil, err - } - return &Connection{ - dest: id, - conn: conn, - listener: globalCache, - }, nil -} - -/*DialKCPWithoutCache Dial KCP connection -This Dialer will ignore src this is a restriction -due to github.com/xtaci/kcp-go.DialWithOptions -*/ -func DialKCPWithoutCache(src v2net.Address, dest v2net.Destination) (net.Conn, error) { - return DialKCP(dest) -} diff --git a/transport/hub/kcp_test.go b/transport/hub/kcp_test.go deleted file mode 100644 index 30feb80b2..000000000 --- a/transport/hub/kcp_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package hub_test - -import "testing" - -import ( - v2net "github.com/v2ray/v2ray-core/common/net" - "github.com/v2ray/v2ray-core/testing/assert" - "github.com/v2ray/v2ray-core/transport" - "github.com/v2ray/v2ray-core/transport/hub" - "github.com/v2ray/v2ray-core/transport/hub/kcpv" -) - -func Test_Pair(t *testing.T) { - assert := assert.On(t) - transport.KcpConfig = &kcpv.Config{} - transport.KcpConfig.Mode = "fast2" - transport.KcpConfig.Key = "key" - transport.KcpConfig.AdvancedConfigs = kcpv.DefaultAdvancedConfigs - lst, _ := hub.ListenKCP(v2net.ParseAddress("127.0.0.1"), 17777) - go func() { - connx, err2 := lst.Accept() - assert.Error(err2).IsNil() - connx.Close() - }() - conn, _ := hub.DialKCP(v2net.TCPDestination(v2net.ParseAddress("127.0.0.1"), 17777)) - conn.LocalAddr() - conn.RemoteAddr() - conn.ApplyConf() - conn.Write([]byte("x")) - conn.Close() -} diff --git a/transport/hub/kcpv/config_json.go b/transport/hub/kcpv/config_json.go deleted file mode 100644 index 66974c206..000000000 --- a/transport/hub/kcpv/config_json.go +++ /dev/null @@ -1,3 +0,0 @@ -package kcpv - -//We can use the default version of json parser diff --git a/transport/hub/kcpv/crypto.go b/transport/hub/kcpv/crypto.go deleted file mode 100644 index 049c9f2d3..000000000 --- a/transport/hub/kcpv/crypto.go +++ /dev/null @@ -1,21 +0,0 @@ -package kcpv - -import ( - "crypto/aes" - "crypto/cipher" - "crypto/sha256" -) - -func generateKeyFromConfigString(key string) []byte { - key += "consensus salt: Let's fight arcifical deceleration with our code. We shall prove our believes with action." - keyw := sha256.Sum256([]byte(key)) - return keyw[:] -} - -func generateBlockWithKey(key []byte) (cipher.Block, error) { - return aes.NewCipher(key) -} - -func GetChipher(key string) (cipher.Block, error) { - return generateBlockWithKey(generateKeyFromConfigString(key)) -} diff --git a/transport/hub/tcp.go b/transport/hub/tcp.go deleted file mode 100644 index 8f63d3ab5..000000000 --- a/transport/hub/tcp.go +++ /dev/null @@ -1,116 +0,0 @@ -package hub - -import ( - "crypto/tls" - "errors" - "net" - "sync" - - "github.com/v2ray/v2ray-core/common/log" - v2net "github.com/v2ray/v2ray-core/common/net" - "github.com/v2ray/v2ray-core/proxy" - "github.com/v2ray/v2ray-core/transport" -) - -var ( - ErrorClosedConnection = errors.New("Connection already closed.") -) - -type TCPHub struct { - sync.Mutex - listener net.Listener - connCallback ConnectionHandler - accepting bool -} - -func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandler, tlsConfig *tls.Config) (*TCPHub, error) { - listener, err := net.ListenTCP("tcp", &net.TCPAddr{ - IP: address.IP(), - Port: int(port), - Zone: "", - }) - if err != nil { - return nil, err - } - var hub *TCPHub - if tlsConfig != nil { - tlsListener := tls.NewListener(listener, tlsConfig) - hub = &TCPHub{ - listener: tlsListener, - connCallback: callback, - } - } else { - hub = &TCPHub{ - listener: listener, - connCallback: callback, - } - } - - go hub.start() - return hub, nil -} -func ListenKCPhub(address v2net.Address, port v2net.Port, callback ConnectionHandler, tlsConfig *tls.Config) (*TCPHub, error) { - listener, err := ListenKCP(address, port) - if err != nil { - return nil, err - } - var hub *TCPHub - if tlsConfig != nil { - tlsListener := tls.NewListener(listener, tlsConfig) - hub = &TCPHub{ - listener: tlsListener, - connCallback: callback, - } - } else { - hub = &TCPHub{ - listener: listener, - connCallback: callback, - } - } - - go hub.start() - return hub, nil -} -func ListenTCP6(address v2net.Address, port v2net.Port, callback ConnectionHandler, proxyMeta *proxy.InboundHandlerMeta, tlsConfig *tls.Config) (*TCPHub, error) { - if proxyMeta.KcpSupported && transport.IsKcpEnabled() { - return ListenKCPhub(address, port, callback, tlsConfig) - } else { - return ListenTCP(address, port, callback, tlsConfig) - } - return nil, errors.New("ListenTCP6: Not Implemented") -} - -func (this *TCPHub) Close() { - this.accepting = false - this.listener.Close() -} - -func (this *TCPHub) start() { - this.accepting = true - for this.accepting { - conn, err := this.listener.Accept() - - if err != nil { - if this.accepting { - log.Warning("Listener: Failed to accept new TCP connection: ", err) - } - continue - } - go this.connCallback(&Connection{ - dest: conn.RemoteAddr().String(), - conn: conn, - listener: this, - }) - } -} - -// @Private -func (this *TCPHub) Recycle(dest string, conn net.Conn) { - if this.accepting { - go this.connCallback(&Connection{ - dest: dest, - conn: conn, - listener: this, - }) - } -} diff --git a/transport/internet/connection.go b/transport/internet/connection.go new file mode 100644 index 000000000..ac241ab78 --- /dev/null +++ b/transport/internet/connection.go @@ -0,0 +1,33 @@ +package internet + +import ( + "net" +) + +type ConnectionHandler func(Connection) + +type Reusable interface { + Reusable() bool + SetReusable(reuse bool) +} + +type StreamConnectionType int + +var ( + StreamConnectionTypeRawTCP StreamConnectionType = 1 + StreamConnectionTypeTCP StreamConnectionType = 2 + StreamConnectionTypeKCP StreamConnectionType = 4 +) + +type StreamSettings struct { + Type StreamConnectionType +} + +func (this *StreamSettings) IsCapableOf(streamType StreamConnectionType) bool { + return (this.Type & streamType) == streamType +} + +type Connection interface { + net.Conn + Reusable +} diff --git a/transport/internet/connection_json.go b/transport/internet/connection_json.go new file mode 100644 index 000000000..a4ad8c5a2 --- /dev/null +++ b/transport/internet/connection_json.go @@ -0,0 +1,27 @@ +// +build json + +package internet + +import ( + "encoding/json" + + v2net "github.com/v2ray/v2ray-core/common/net" +) + +func (this *StreamSettings) UnmarshalJSON(data []byte) error { + type JSONConfig struct { + Network v2net.NetworkList `json:"network"` + } + this.Type = StreamConnectionTypeRawTCP + jsonConfig := new(JSONConfig) + if err := json.Unmarshal(data, jsonConfig); err != nil { + return err + } + if jsonConfig.Network.HasNetwork(v2net.KCPNetwork) { + this.Type |= StreamConnectionTypeKCP + } + if jsonConfig.Network.HasNetwork(v2net.TCPNetwork) { + this.Type |= StreamConnectionTypeTCP + } + return nil +} diff --git a/transport/internet/dialer.go b/transport/internet/dialer.go new file mode 100644 index 000000000..1ec7c552b --- /dev/null +++ b/transport/internet/dialer.go @@ -0,0 +1,63 @@ +package internet + +import ( + "errors" + "net" + "time" + + v2net "github.com/v2ray/v2ray-core/common/net" +) + +var ( + ErrUnsupportedStreamType = errors.New("Unsupported stream type.") +) + +type Dialer func(src v2net.Address, dest v2net.Destination) (Connection, error) + +var ( + TCPDialer Dialer + KCPDialer Dialer + RawTCPDialer Dialer + UDPDialer Dialer +) + +func Dial(src v2net.Address, dest v2net.Destination, settings *StreamSettings) (Connection, error) { + if dest.IsTCP() { + switch { + case settings.IsCapableOf(StreamConnectionTypeKCP): + return KCPDialer(src, dest) + case settings.IsCapableOf(StreamConnectionTypeTCP): + return TCPDialer(src, dest) + case settings.IsCapableOf(StreamConnectionTypeRawTCP): + return RawTCPDialer(src, dest) + } + return nil, ErrUnsupportedStreamType + } + + return UDPDialer(src, dest) +} + +func DialToDest(src v2net.Address, dest v2net.Destination) (net.Conn, error) { + dialer := &net.Dialer{ + Timeout: time.Second * 60, + DualStack: true, + } + + if src != nil && src != v2net.AnyIP { + var addr net.Addr + if dest.IsTCP() { + addr = &net.TCPAddr{ + IP: src.IP(), + Port: 0, + } + } else { + addr = &net.UDPAddr{ + IP: src.IP(), + Port: 0, + } + } + dialer.LocalAddr = addr + } + + return dialer.Dial(dest.Network().String(), dest.NetAddr()) +} diff --git a/transport/hub/dialer_test.go b/transport/internet/dialer_test.go similarity index 62% rename from transport/hub/dialer_test.go rename to transport/internet/dialer_test.go index 24dd18f50..d32c03a90 100644 --- a/transport/hub/dialer_test.go +++ b/transport/internet/dialer_test.go @@ -1,14 +1,13 @@ -package hub_test +package internet_test import ( - "net" "testing" v2net "github.com/v2ray/v2ray-core/common/net" v2nettesting "github.com/v2ray/v2ray-core/common/net/testing" "github.com/v2ray/v2ray-core/testing/assert" "github.com/v2ray/v2ray-core/testing/servers/tcp" - . "github.com/v2ray/v2ray-core/transport/hub" + . "github.com/v2ray/v2ray-core/transport/internet" ) func TestDialDomain(t *testing.T) { @@ -21,7 +20,7 @@ func TestDialDomain(t *testing.T) { assert.Error(err).IsNil() defer server.Close() - conn, err := Dial(nil, v2net.TCPDestination(v2net.DomainAddress("local.v2ray.com"), dest.Port())) + conn, err := DialToDest(nil, v2net.TCPDestination(v2net.DomainAddress("local.v2ray.com"), dest.Port())) assert.Error(err).IsNil() assert.String(conn.RemoteAddr().String()).Equals("127.0.0.1:" + dest.Port().String()) conn.Close() @@ -37,19 +36,7 @@ func TestDialWithLocalAddr(t *testing.T) { assert.Error(err).IsNil() defer server.Close() - var localAddr net.IP - addrs, err := net.InterfaceAddrs() - assert.Error(err).IsNil() - for _, addr := range addrs { - str := addr.String() - ip := net.ParseIP(str) - if ip != nil && ip.To4() != nil { - localAddr = ip.To4() - } - } - assert.Pointer(localAddr).IsNotNil() - - conn, err := Dial(v2net.IPAddress(localAddr), v2net.TCPDestination(v2net.LocalHostIP, dest.Port())) + conn, err := DialToDest(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, dest.Port())) assert.Error(err).IsNil() assert.String(conn.RemoteAddr().String()).Equals("127.0.0.1:" + dest.Port().String()) conn.Close() diff --git a/transport/hub/kcpv/config.go b/transport/internet/kcp/config.go similarity index 54% rename from transport/hub/kcpv/config.go rename to transport/internet/kcp/config.go index 9b162d308..149e060a1 100644 --- a/transport/hub/kcpv/config.go +++ b/transport/internet/kcp/config.go @@ -1,4 +1,4 @@ -package kcpv +package kcp /*AdvancedConfig define behavior of KCP in detail @@ -29,16 +29,6 @@ can cause v2ray to kill the proxy connection it is relaying, Higher value can prevent server from closing zombie socket and waste resources. */ -type AdvancedConfig struct { - Mtu int `json:"MaximumTransmissionUnit"` - Sndwnd int `json:"SendingWindowSize"` - Rcvwnd int `json:"ReceivingWindowSize"` - Fec int `json:"ForwardErrorCorrectionGroupSize"` - Acknodelay bool `json:"AcknowledgeNoDelay"` - Dscp int `json:"Dscp"` - ReadTimeout int `json:"ReadTimeout"` - WriteTimeout int `json:"WriteTimeout"` -} /*Config define basic behavior of KCP Mode: @@ -46,20 +36,33 @@ can be one of these values: fast3,fast2,fast,normal <<<<<<- less delay ->>>>>> less bandwich wasted - -EncryptionKey: -a string that will be the EncryptionKey of -All KCP connection we Listen-Accpet or -Dial, We are not very sure about how this -encryption hehave and DO use a unique randomly -generated key. */ type Config struct { - Mode string `json:"Mode"` - Key string `json:"EncryptionKey"` - AdvancedConfigs *AdvancedConfig `json:"AdvancedConfig,omitempty"` + Mode string `json:"Mode"` + Mtu int `json:"MaximumTransmissionUnit"` + Sndwnd int `json:"SendingWindowSize"` + Rcvwnd int `json:"ReceivingWindowSize"` + Fec int `json:"ForwardErrorCorrectionGroupSize"` + Acknodelay bool `json:"AcknowledgeNoDelay"` + Dscp int `json:"Dscp"` + ReadTimeout int `json:"ReadTimeout"` + WriteTimeout int `json:"WriteTimeout"` } -var DefaultAdvancedConfigs = &AdvancedConfig{ - Mtu: 1350, Sndwnd: 1024, Rcvwnd: 1024, Fec: 4, Dscp: 0, ReadTimeout: 600, WriteTimeout: 500, Acknodelay: false, +func (this *Config) Apply() { + effectiveConfig = *this } + +var ( + effectiveConfig = Config{ + Mode: "normal", + Mtu: 1350, + Sndwnd: 1024, + Rcvwnd: 1024, + Fec: 4, + Dscp: 0, + ReadTimeout: 600, + WriteTimeout: 500, + Acknodelay: false, + } +) diff --git a/transport/internet/kcp/config_json.go b/transport/internet/kcp/config_json.go new file mode 100644 index 000000000..c38b2a042 --- /dev/null +++ b/transport/internet/kcp/config_json.go @@ -0,0 +1,27 @@ +// +build json + +package kcp + +import ( + "encoding/json" +) + +func (this *Config) UnmarshalJSON(data []byte) error { + type JSONConfig struct { + Mode string `json:"Mode"` + Mtu int `json:"MaximumTransmissionUnit"` + Sndwnd int `json:"SendingWindowSize"` + Rcvwnd int `json:"ReceivingWindowSize"` + Fec int `json:"ForwardErrorCorrectionGroupSize"` + Acknodelay bool `json:"AcknowledgeNoDelay"` + Dscp int `json:"Dscp"` + ReadTimeout int `json:"ReadTimeout"` + WriteTimeout int `json:"WriteTimeout"` + } + jsonConfig := effectiveConfig + if err := json.Unmarshal(data, &jsonConfig); err != nil { + return err + } + *this = jsonConfig + return nil +} diff --git a/transport/internet/kcp/dialer.go b/transport/internet/kcp/dialer.go new file mode 100644 index 000000000..706b2d4f6 --- /dev/null +++ b/transport/internet/kcp/dialer.go @@ -0,0 +1,26 @@ +package kcp + +import ( + v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/transport/internet" + + "github.com/xtaci/kcp-go" +) + +func DialKCP(src v2net.Address, dest v2net.Destination) (internet.Connection, error) { + cpip, _ := kcp.NewNoneBlockCrypt(nil) + kcv, err := kcp.DialWithOptions(effectiveConfig.Fec, dest.NetAddr(), cpip) + if err != nil { + return nil, err + } + kcvn := &KCPVconn{hc: kcv} + err = kcvn.ApplyConf() + if err != nil { + return nil, err + } + return kcvn, nil +} + +func init() { + internet.KCPDialer = DialKCP +} diff --git a/transport/hub/kcp.go b/transport/internet/kcp/session.go similarity index 62% rename from transport/hub/kcp.go rename to transport/internet/kcp/session.go index 818dc7426..0b34b18e9 100644 --- a/transport/hub/kcp.go +++ b/transport/internet/kcp/session.go @@ -1,20 +1,18 @@ -package hub +package kcp import ( "errors" "net" "time" - "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" - "github.com/v2ray/v2ray-core/transport" - "github.com/v2ray/v2ray-core/transport/hub/kcpv" + "github.com/v2ray/v2ray-core/transport/internet" + "github.com/xtaci/kcp-go" ) type KCPVlistener struct { lst *kcp.Listener - conf *kcpv.Config previousSocketid map[int]uint32 previousSocketid_mapid int } @@ -25,7 +23,7 @@ It could be reconized as a new connection and call accept. If we can detect that the connection is of such a kind, we will discard that conn. */ -func (kvl *KCPVlistener) Accept() (net.Conn, error) { +func (kvl *KCPVlistener) Accept() (internet.Connection, error) { conn, err := kvl.lst.Accept() if err != nil { return nil, err @@ -59,7 +57,6 @@ func (kvl *KCPVlistener) Accept() (net.Conn, error) { } kcv := &KCPVconn{hc: conn} - kcv.conf = kvl.conf err = kcv.ApplyConf() if err != nil { return nil, err @@ -77,14 +74,13 @@ func (kvl *KCPVlistener) Addr() net.Addr { type KCPVconn struct { hc *kcp.UDPSession - conf *kcpv.Config conntokeep time.Time } //var counter int func (kcpvc *KCPVconn) Read(b []byte) (int, error) { - ifb := time.Now().Add(time.Duration(kcpvc.conf.AdvancedConfigs.ReadTimeout) * time.Second) + ifb := time.Now().Add(time.Duration(effectiveConfig.ReadTimeout) * time.Second) if ifb.After(kcpvc.conntokeep) { kcpvc.conntokeep = ifb } @@ -93,7 +89,7 @@ func (kcpvc *KCPVconn) Read(b []byte) (int, error) { } func (kcpvc *KCPVconn) Write(b []byte) (int, error) { - ifb := time.Now().Add(time.Duration(kcpvc.conf.AdvancedConfigs.WriteTimeout) * time.Second) + ifb := time.Now().Add(time.Duration(effectiveConfig.WriteTimeout) * time.Second) if ifb.After(kcpvc.conntokeep) { kcpvc.conntokeep = ifb } @@ -107,27 +103,22 @@ It is recommmanded to call this func once and only once */ func (kcpvc *KCPVconn) ApplyConf() error { nodelay, interval, resend, nc := 0, 40, 0, 0 - if kcpvc.conf.Mode != "manual" { - switch kcpvc.conf.Mode { - case "normal": - nodelay, interval, resend, nc = 0, 30, 2, 1 - case "fast": - nodelay, interval, resend, nc = 0, 20, 2, 1 - case "fast2": - nodelay, interval, resend, nc = 1, 20, 2, 1 - case "fast3": - nodelay, interval, resend, nc = 1, 10, 2, 1 - } - } else { - log.Error("kcp: Failed to Apply configure: Manual mode is not supported.(yet!)") - return errors.New("kcp: Manual Not Implemented") + switch effectiveConfig.Mode { + case "normal": + nodelay, interval, resend, nc = 0, 30, 2, 1 + case "fast": + nodelay, interval, resend, nc = 0, 20, 2, 1 + case "fast2": + nodelay, interval, resend, nc = 1, 20, 2, 1 + case "fast3": + nodelay, interval, resend, nc = 1, 10, 2, 1 } kcpvc.hc.SetNoDelay(nodelay, interval, resend, nc) - kcpvc.hc.SetWindowSize(kcpvc.conf.AdvancedConfigs.Sndwnd, kcpvc.conf.AdvancedConfigs.Rcvwnd) - kcpvc.hc.SetMtu(kcpvc.conf.AdvancedConfigs.Mtu) - kcpvc.hc.SetACKNoDelay(kcpvc.conf.AdvancedConfigs.Acknodelay) - kcpvc.hc.SetDSCP(kcpvc.conf.AdvancedConfigs.Dscp) + kcpvc.hc.SetWindowSize(effectiveConfig.Sndwnd, effectiveConfig.Rcvwnd) + kcpvc.hc.SetMtu(effectiveConfig.Mtu) + kcpvc.hc.SetACKNoDelay(effectiveConfig.Acknodelay) + kcpvc.hc.SetDSCP(effectiveConfig.Dscp) //counter++ //log.Info(counter) return nil @@ -167,27 +158,22 @@ func (kcpvc *KCPVconn) SetWriteDeadline(t time.Time) error { return kcpvc.hc.SetWriteDeadline(t) } -func DialKCP(dest v2net.Destination) (*KCPVconn, error) { - kcpconf := transport.KcpConfig - cpip, _ := kcpv.GetChipher(kcpconf.Key) - kcv, err := kcp.DialWithOptions(kcpconf.AdvancedConfigs.Fec, dest.NetAddr(), cpip) - if err != nil { - return nil, err - } - kcvn := &KCPVconn{hc: kcv} - kcvn.conf = kcpconf - err = kcvn.ApplyConf() - if err != nil { - return nil, err - } - return kcvn, nil +func (this *KCPVconn) Reusable() bool { + return false } -func ListenKCP(address v2net.Address, port v2net.Port) (*KCPVlistener, error) { - kcpconf := transport.KcpConfig - cpip, _ := kcpv.GetChipher(kcpconf.Key) +func (this *KCPVconn) SetReusable(b bool) { + +} + +func ListenKCP(address v2net.Address, port v2net.Port) (internet.Listener, error) { laddr := address.String() + ":" + port.String() - kcl, err := kcp.ListenWithOptions(kcpconf.AdvancedConfigs.Fec, laddr, cpip) - kcvl := &KCPVlistener{lst: kcl, conf: kcpconf} + crypt, _ := kcp.NewNoneBlockCrypt(nil) + kcl, err := kcp.ListenWithOptions(effectiveConfig.Fec, laddr, crypt) + kcvl := &KCPVlistener{lst: kcl} return kcvl, err } + +func init() { + internet.KCPListenFunc = ListenKCP +} diff --git a/transport/internet/tcp/config.go b/transport/internet/tcp/config.go new file mode 100644 index 000000000..6bc1f4d67 --- /dev/null +++ b/transport/internet/tcp/config.go @@ -0,0 +1,15 @@ +package tcp + +type Config struct { + ConnectionReuse bool +} + +func (this *Config) Apply() { + effectiveConfig = this +} + +var ( + effectiveConfig = &Config{ + ConnectionReuse: true, + } +) diff --git a/transport/internet/tcp/config_json.go b/transport/internet/tcp/config_json.go new file mode 100644 index 000000000..e77f184a3 --- /dev/null +++ b/transport/internet/tcp/config_json.go @@ -0,0 +1,20 @@ +package tcp + +import ( + "encoding/json" +) + +func (this *Config) UnmarshalJSON(data []byte) error { + type JsonConfig struct { + ConnectionReuse bool `json:"connectionReuse"` + } + jsonConfig := &JsonConfig{ + ConnectionReuse: true, + } + if err := json.Unmarshal(data, jsonConfig); err != nil { + return err + } + this.ConnectionReuse = jsonConfig.ConnectionReuse + + return nil +} diff --git a/transport/hub/connection.go b/transport/internet/tcp/connection.go similarity index 73% rename from transport/hub/connection.go rename to transport/internet/tcp/connection.go index 83f2e236b..9fc3ffedf 100644 --- a/transport/hub/connection.go +++ b/transport/internet/tcp/connection.go @@ -1,24 +1,31 @@ -package hub +package tcp import ( "errors" + "io" "net" "reflect" "time" - - "github.com/v2ray/v2ray-core/transport" ) var ( ErrInvalidConn = errors.New("Invalid Connection.") ) -type ConnectionHandler func(*Connection) - type ConnectionManager interface { Recycle(string, net.Conn) } +type RawConnection struct { + net.TCPConn +} + +func (this *RawConnection) Reusable() bool { + return false +} + +func (this *RawConnection) SetReusable(b bool) {} + type Connection struct { dest string conn net.Conn @@ -26,9 +33,18 @@ type Connection struct { reusable bool } +func NewConnection(dest string, conn net.Conn, manager ConnectionManager) *Connection { + return &Connection{ + dest: dest, + conn: conn, + listener: manager, + reusable: effectiveConfig.ConnectionReuse, + } +} + func (this *Connection) Read(b []byte) (int, error) { if this == nil || this.conn == nil { - return 0, ErrorClosedConnection + return 0, io.EOF } return this.conn.Read(b) @@ -36,20 +52,22 @@ func (this *Connection) Read(b []byte) (int, error) { func (this *Connection) Write(b []byte) (int, error) { if this == nil || this.conn == nil { - return 0, ErrorClosedConnection + return 0, io.ErrClosedPipe } return this.conn.Write(b) } func (this *Connection) Close() error { if this == nil || this.conn == nil { - return ErrorClosedConnection + return io.ErrClosedPipe } - if transport.IsConnectionReusable() && this.Reusable() { + if this.Reusable() { this.listener.Recycle(this.dest, this.conn) return nil } - return this.conn.Close() + err := this.conn.Close() + this.conn = nil + return err } func (this *Connection) LocalAddr() net.Addr { @@ -73,6 +91,9 @@ func (this *Connection) SetWriteDeadline(t time.Time) error { } func (this *Connection) SetReusable(reusable bool) { + if !effectiveConfig.ConnectionReuse { + return + } this.reusable = reusable } diff --git a/transport/hub/connection_cache.go b/transport/internet/tcp/connection_cache.go similarity index 99% rename from transport/hub/connection_cache.go rename to transport/internet/tcp/connection_cache.go index 55c4a6fa8..032a92df5 100644 --- a/transport/hub/connection_cache.go +++ b/transport/internet/tcp/connection_cache.go @@ -1,4 +1,4 @@ -package hub +package tcp import ( "net" diff --git a/transport/internet/tcp/dialer.go b/transport/internet/tcp/dialer.go new file mode 100644 index 000000000..e39827e7e --- /dev/null +++ b/transport/internet/tcp/dialer.go @@ -0,0 +1,49 @@ +package tcp + +import ( + "net" + + "github.com/v2ray/v2ray-core/common/log" + v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/transport/internet" +) + +var ( + globalCache = NewConnectionCache() +) + +func Dial(src v2net.Address, dest v2net.Destination) (internet.Connection, error) { + log.Info("Dailing TCP to ", dest) + if src == nil { + src = v2net.AnyIP + } + id := src.String() + "-" + dest.NetAddr() + var conn net.Conn + if dest.IsTCP() && effectiveConfig.ConnectionReuse { + conn = globalCache.Get(id) + } + if conn == nil { + var err error + conn, err = internet.DialToDest(src, dest) + if err != nil { + return nil, err + } + } + return NewConnection(id, conn, globalCache), nil +} + +func DialRaw(src v2net.Address, dest v2net.Destination) (internet.Connection, error) { + log.Info("Dailing Raw TCP to ", dest) + conn, err := internet.DialToDest(src, dest) + if err != nil { + return nil, err + } + return &RawConnection{ + TCPConn: *conn.(*net.TCPConn), + }, nil +} + +func init() { + internet.TCPDialer = Dial + internet.RawTCPDialer = DialRaw +} diff --git a/transport/internet/tcp/hub.go b/transport/internet/tcp/hub.go new file mode 100644 index 000000000..ae3b85e7a --- /dev/null +++ b/transport/internet/tcp/hub.go @@ -0,0 +1,159 @@ +package tcp + +import ( + "errors" + "net" + "sync" + "time" + + v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/transport/internet" +) + +var ( + ErrClosedListener = errors.New("Listener is closed.") +) + +type ConnectionWithError struct { + conn net.Conn + err error +} + +type TCPListener struct { + sync.Mutex + acccepting bool + listener *net.TCPListener + awaitingConns chan *ConnectionWithError +} + +func ListenTCP(address v2net.Address, port v2net.Port) (internet.Listener, error) { + listener, err := net.ListenTCP("tcp", &net.TCPAddr{ + IP: address.IP(), + Port: int(port), + }) + if err != nil { + return nil, err + } + l := &TCPListener{ + acccepting: true, + listener: listener, + awaitingConns: make(chan *ConnectionWithError, 32), + } + go l.KeepAccepting() + return l, nil +} + +func (this *TCPListener) Accept() (internet.Connection, error) { + for this.acccepting { + select { + case connErr, open := <-this.awaitingConns: + if !open { + return nil, ErrClosedListener + } + if connErr.err != nil { + return nil, connErr.err + } + return NewConnection("", connErr.conn, this), nil + case <-time.After(time.Second * 2): + } + } + return nil, ErrClosedListener +} + +func (this *TCPListener) KeepAccepting() { + for this.acccepting { + conn, err := this.listener.Accept() + this.Lock() + if !this.acccepting { + this.Unlock() + break + } + select { + case this.awaitingConns <- &ConnectionWithError{ + conn: conn, + err: err, + }: + default: + if conn != nil { + conn.Close() + } + } + + this.Unlock() + } +} + +func (this *TCPListener) Recycle(dest string, conn net.Conn) { + this.Lock() + defer this.Unlock() + if !this.acccepting { + return + } + select { + case this.awaitingConns <- &ConnectionWithError{conn: conn}: + default: + conn.Close() + } +} + +func (this *TCPListener) Addr() net.Addr { + return this.listener.Addr() +} + +func (this *TCPListener) Close() error { + this.Lock() + defer this.Unlock() + this.acccepting = false + this.listener.Close() + close(this.awaitingConns) + for connErr := range this.awaitingConns { + if connErr.conn != nil { + go connErr.conn.Close() + } + } + return nil +} + +type RawTCPListener struct { + accepting bool + listener *net.TCPListener +} + +func (this *RawTCPListener) Accept() (internet.Connection, error) { + conn, err := this.listener.AcceptTCP() + if err != nil { + return nil, err + } + return &RawConnection{ + TCPConn: *conn, + }, nil +} + +func (this *RawTCPListener) Addr() net.Addr { + return this.listener.Addr() +} + +func (this *RawTCPListener) Close() error { + this.accepting = false + this.listener.Close() + return nil +} + +func ListenRawTCP(address v2net.Address, port v2net.Port) (internet.Listener, error) { + listener, err := net.ListenTCP("tcp", &net.TCPAddr{ + IP: address.IP(), + Port: int(port), + }) + if err != nil { + return nil, err + } + return &RawTCPListener{ + accepting: true, + listener: listener, + }, nil +} + +func init() { + internet.TCPListenFunc = ListenTCP + internet.RawTCPListenFunc = ListenRawTCP +} diff --git a/transport/internet/tcp_hub.go b/transport/internet/tcp_hub.go new file mode 100644 index 000000000..b8d8ff31a --- /dev/null +++ b/transport/internet/tcp_hub.go @@ -0,0 +1,77 @@ +package internet + +import ( + "errors" + "net" + "sync" + + "github.com/v2ray/v2ray-core/common/log" + v2net "github.com/v2ray/v2ray-core/common/net" +) + +var ( + ErrorClosedConnection = errors.New("Connection already closed.") + + KCPListenFunc ListenFunc + TCPListenFunc ListenFunc + RawTCPListenFunc ListenFunc +) + +type ListenFunc func(address v2net.Address, port v2net.Port) (Listener, error) +type Listener interface { + Accept() (Connection, error) + Close() error + Addr() net.Addr +} + +type TCPHub struct { + sync.Mutex + listener Listener + connCallback ConnectionHandler + accepting bool +} + +func ListenTCP(address v2net.Address, port v2net.Port, callback ConnectionHandler, settings *StreamSettings) (*TCPHub, error) { + var listener Listener + var err error + if settings.IsCapableOf(StreamConnectionTypeKCP) { + listener, err = KCPListenFunc(address, port) + } else if settings.IsCapableOf(StreamConnectionTypeTCP) { + listener, err = TCPListenFunc(address, port) + } else { + listener, err = RawTCPListenFunc(address, port) + } + + if err != nil { + return nil, err + } + + hub := &TCPHub{ + listener: listener, + connCallback: callback, + } + + go hub.start() + return hub, nil +} + +func (this *TCPHub) Close() { + this.accepting = false + this.listener.Close() +} + +func (this *TCPHub) start() { + this.accepting = true + for this.accepting { + conn, err := this.listener.Accept() + + if err != nil { + if this.accepting { + log.Warning("Listener: Failed to accept new TCP connection: ", err) + } + continue + } + log.Info("Handling connection from ", conn.RemoteAddr()) + go this.connCallback(conn) + } +} diff --git a/transport/internet/udp/connection.go b/transport/internet/udp/connection.go new file mode 100644 index 000000000..94591961e --- /dev/null +++ b/transport/internet/udp/connection.go @@ -0,0 +1,30 @@ +package udp + +import ( + "net" + + v2net "github.com/v2ray/v2ray-core/common/net" + "github.com/v2ray/v2ray-core/transport/internet" +) + +type Connection struct { + net.UDPConn +} + +func (this *Connection) Reusable() bool { + return false +} + +func (this *Connection) SetReusable(b bool) {} + +func init() { + internet.UDPDialer = func(src v2net.Address, dest v2net.Destination) (internet.Connection, error) { + conn, err := internet.DialToDest(src, dest) + if err != nil { + return nil, err + } + return &Connection{ + UDPConn: *(conn.(*net.UDPConn)), + }, nil + } +} diff --git a/transport/hub/udp.go b/transport/internet/udp/udp.go similarity index 99% rename from transport/hub/udp.go rename to transport/internet/udp/udp.go index 2e69b8341..d16ea6981 100644 --- a/transport/hub/udp.go +++ b/transport/internet/udp/udp.go @@ -1,4 +1,4 @@ -package hub +package udp import ( "net" diff --git a/transport/hub/udp_server.go b/transport/internet/udp/udp_server.go similarity index 99% rename from transport/hub/udp_server.go rename to transport/internet/udp/udp_server.go index 720472802..ca162998b 100644 --- a/transport/hub/udp_server.go +++ b/transport/internet/udp/udp_server.go @@ -1,4 +1,4 @@ -package hub +package udp import ( "sync" diff --git a/transport/transport.go b/transport/transport.go deleted file mode 100644 index dee265fc4..000000000 --- a/transport/transport.go +++ /dev/null @@ -1,18 +0,0 @@ -package transport - -import "github.com/v2ray/v2ray-core/transport/hub/kcpv" - -var ( - connectionReuse = true - enableKcp = false - KcpConfig *kcpv.Config -) - -// IsConnectionReusable returns true if V2Ray is trying to reuse TCP connections. -func IsConnectionReusable() bool { - return connectionReuse -} - -func IsKcpEnabled() bool { - return enableKcp -}